MycatZkCluster.java
5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package io.mycat.zkcluster;
import com.alibaba.fastjson.JSON;
import io.mycat.MycatServer;
import io.mycat.util.IPUtil;
import io.mycat.util.StringUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* Created by xueyin on 2018/11/27.
* yoho集群实现方式: mycat通过向zk注册自己的方式加入到cluster
* 应用层通过多数据源方式来实现集群
*/
public class MycatZkCluster {
private static final Logger LOGGER = LoggerFactory.getLogger(MycatZkCluster.class);
private static final String CLUSTER_PATH_BASE = "/yoho-cobar";
private static MycatZkCluster ZK_CLUSTER_INSTANCE = new MycatZkCluster();
private CuratorFramework curatorFramework = null;
private class MycatClusterNode {
String serverIP;
int serverPort;
int type;
private MycatClusterNode(String serverIP, int serverPort, int type) {
this.serverIP = serverIP;
this.serverPort = serverPort;
this.type = type;
}
}
private MycatClusterNode mycatClusterNode;
private MycatZkCluster() {
mycatClusterNode = new MycatClusterNode(IPUtil.fetchLocalIP(),
MycatServer.getInstance().getConfig().getSystem().getServerPort(),
0);
}
public static MycatZkCluster getInstance() {
return ZK_CLUSTER_INSTANCE;
}
public boolean isCluster() {
return ZkClusterConfig.getInstance().getClusterMode();
}
public void start() {
LOGGER.info("begin start mycat cluster success.");
if (!ZkClusterConfig.getInstance().getClusterMode()) {
LOGGER.warn("mycat running on standalone mode.");
return;
}
regitClusterNode(mycatClusterNode);
LOGGER.info("start mycat cluster success.");
}
public void stop() {
LOGGER.info("begin stop mycat cluster success.");
// 从zk上删除注册信息
unregitClusterNode(mycatClusterNode);
LOGGER.info("stop mycat cluster success.");
}
private void regitClusterNode(MycatClusterNode node) {
// 创建zk连接
curatorFramework = createZKConnect();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
stop();
}
}));
// 创建cluster node
String nodePath = buildClusterNodePath(node);
String nodeData = buildClusterNodeData(node);
try {
if (curatorFramework.checkExists().forPath(nodePath) != null) {
curatorFramework.delete().deletingChildrenIfNeeded().forPath(nodePath);
}
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(nodePath, StringUtil.encode(nodeData, "uft-8"));
} catch (Exception e) {
LOGGER.error("cluster registe to zookeeper fail. ", e);
throw new RuntimeException("cluster registe to zookeeper fail.");
}
}
private CuratorFramework createZKConnect() {
String zkUrl = ZkClusterConfig.getInstance().getZkUrl();
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(zkUrl)
.connectionTimeoutMs(3000)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(100, 6))
.build();
curatorFramework.start();
try {
curatorFramework.blockUntilConnected(5, TimeUnit.SECONDS);
if (curatorFramework.getZookeeperClient().isConnected()) {
return curatorFramework;
}
}
catch (InterruptedException e) {
LOGGER.error("cluser connect zk exception.", e);
}
curatorFramework.close();
throw new RuntimeException("mycat zkcluster create connect fail.");
}
private void unregitClusterNode(MycatClusterNode node) {
if (curatorFramework == null) {
return;
}
String clusterPath = buildClusterNodePath(node);
try {
if (curatorFramework.checkExists().forPath(clusterPath) != null) {
curatorFramework.delete().deletingChildrenIfNeeded().forPath(clusterPath);
}
curatorFramework.close();
curatorFramework = null;
} catch (Exception e) {
LOGGER.error("cluster unregist to zookeeper fail.", e);
}
}
private String buildClusterNodePath(MycatClusterNode node) {
return CLUSTER_PATH_BASE + "/" + ZkClusterConfig.getInstance().getClusterName() +"/" + node.serverIP + ":" + node.serverPort;
}
private String buildClusterNodeData(MycatClusterNode node) {
return String.format("{\"serverIP\":\"%s\",\"serverPort\":%d,\"type\":%d}",
node.serverIP, node.serverPort, node.type);
}
}