// MycatZkCluster.java
package io.mycat.zkcluster;

import com.alibaba.fastjson.JSON;
import io.mycat.MycatServer;
import io.mycat.util.IPUtil;
import io.mycat.util.StringUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/**
 * Registers this Mycat server in ZooKeeper so that it becomes a member of a "yoho" cluster.
 *
 * <p>Cluster model (from the original author's notes): each Mycat instance joins the cluster by
 * registering itself in ZooKeeper; the application layer builds the cluster on top of that by
 * using multiple data sources.
 *
 * <p>The registration is an ephemeral node at
 * {@code /yoho-cobar/<clusterName>/<serverIP>:<serverPort>} whose data is a small JSON document
 * with the fields {@code serverIP}, {@code serverPort} and {@code type}.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized because {@code stop()} is also invoked
 * from a JVM shutdown hook thread.
 *
 * <p>Created by xueyin on 2018/11/27.
 */
public class MycatZkCluster {
    private static final Logger LOGGER = LoggerFactory.getLogger(MycatZkCluster.class);

    /** Root ZooKeeper path under which every cluster keeps its member nodes. */
    private static final String CLUSTER_PATH_BASE = "/yoho-cobar";

    private static final MycatZkCluster ZK_CLUSTER_INSTANCE = new MycatZkCluster();

    /** Live ZooKeeper client; {@code null} whenever this server is not registered. */
    private CuratorFramework curatorFramework = null;

    /** Ensures the JVM shutdown hook is installed only once, however often start() is called. */
    private boolean shutdownHookRegistered = false;

    /**
     * Immutable description of this server as published to ZooKeeper.
     * Static so that it does not carry a hidden reference to the enclosing instance.
     */
    private static final class MycatClusterNode {
        private final String serverIP;
        private final int serverPort;
        // NOTE(review): the meaning of "type" is not visible in this file; it is always 0 here.
        private final int type;

        private MycatClusterNode(String serverIP, int serverPort, int type) {
            this.serverIP = serverIP;
            this.serverPort = serverPort;
            this.type = type;
        }
    }

    private final MycatClusterNode mycatClusterNode;

    /** Captures the local IP and the configured server port of the running Mycat instance. */
    private MycatZkCluster() {
        mycatClusterNode = new MycatClusterNode(IPUtil.fetchLocalIP(),
                                        MycatServer.getInstance().getConfig().getSystem().getServerPort(),
                                        0);
    }

    /**
     * @return the process-wide singleton
     */
    public static MycatZkCluster getInstance() {
        return ZK_CLUSTER_INSTANCE;
    }

    /**
     * @return {@code true} when the cluster configuration enables cluster mode
     */
    public boolean isCluster() {
        return ZkClusterConfig.getInstance().getClusterMode();
    }

    /**
     * Registers this server in ZooKeeper when cluster mode is enabled; does nothing otherwise.
     *
     * @throws RuntimeException if the ZooKeeper connection or the node registration fails
     */
    public synchronized void start() {
        LOGGER.info("begin start mycat cluster.");
        if (!isCluster()) {
            LOGGER.warn("mycat running on standalone mode.");
            return;
        }

        regitClusterNode(mycatClusterNode);
        LOGGER.info("start mycat cluster success.");
    }

    /**
     * Removes this server's registration from ZooKeeper and closes the connection.
     * Safe to call when the server was never registered or has already been stopped.
     */
    public synchronized void stop() {
        LOGGER.info("begin stop mycat cluster.");
        // Remove the registration info from zk.
        unregitClusterNode(mycatClusterNode);

        LOGGER.info("stop mycat cluster success.");
    }

    /**
     * Opens a ZooKeeper connection and publishes {@code node} as an ephemeral node.
     *
     * @param node the server description to publish
     * @throws RuntimeException if connecting or creating the node fails; the new client is closed
     *                          before the exception propagates
     */
    private void regitClusterNode(MycatClusterNode node) {
        // A repeated start() must not leak the client opened by the previous one.
        closeQuietly(curatorFramework);
        curatorFramework = null;

        // Create the zk connection.
        CuratorFramework client = createZKConnect();
        registerShutdownHookOnce();

        // Create the cluster node.
        try {
            String nodePath = buildClusterNodePath(node);
            String nodeData = buildClusterNodeData(node);
            // A node left over from a previous session would make create() fail, so remove it.
            if (client.checkExists().forPath(nodePath) != null) {
                client.delete().deletingChildrenIfNeeded().forPath(nodePath);
            }
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(nodePath, nodeData.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            LOGGER.error("cluster registe to zookeeper fail. ", e);
            closeQuietly(client);
            throw new RuntimeException("cluster registe to zookeeper fail.", e);
        }

        // Publish the client only after a successful registration.
        curatorFramework = client;
    }

    /** Installs the JVM shutdown hook that calls {@link #stop()}, at most once per process. */
    private void registerShutdownHookOnce() {
        if (shutdownHookRegistered) {
            return;
        }
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                stop();
            }
        }));
        shutdownHookRegistered = true;
    }

    /**
     * Builds and starts a Curator client for the configured ZooKeeper URL and waits up to
     * five seconds for it to connect.
     *
     * @return a started, connected client
     * @throws RuntimeException if the connection is not established in time or the wait is
     *                          interrupted; the client is closed first
     */
    private CuratorFramework createZKConnect() {
        String zkUrl = ZkClusterConfig.getInstance().getZkUrl();

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zkUrl)
                .connectionTimeoutMs(3000)
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(100, 6))
                .build();
        client.start();

        InterruptedException interruption = null;
        try {
            if (client.blockUntilConnected(5, TimeUnit.SECONDS)) {
                return client;
            }
        } catch (InterruptedException e) {
            LOGGER.error("cluser connect zk exception.", e);
            interruption = e;
        }

        closeQuietly(client);
        if (interruption != null) {
            // Restore the flag only after close() so the close itself is not cut short.
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException("mycat zkcluster create connect fail.", interruption);
    }

    /**
     * Deletes the node published for {@code node} (if it still exists) and closes the client.
     * Failures while deleting are logged, not propagated; the client is closed regardless.
     *
     * @param node the server description whose node should be removed
     */
    private void unregitClusterNode(MycatClusterNode node) {
        CuratorFramework client = curatorFramework;
        if (client == null) {
            return;
        }
        // Clear the field first so a failure below cannot leave a closed client behind.
        curatorFramework = null;

        try {
            String clusterPath = buildClusterNodePath(node);
            if (client.checkExists().forPath(clusterPath) != null) {
                client.delete().deletingChildrenIfNeeded().forPath(clusterPath);
            }
        } catch (Exception e) {
            LOGGER.error("cluster unregist to zookeeper fail.", e);
        } finally {
            closeQuietly(client);
        }
    }

    /** Closes {@code client} if it is non-null, logging instead of propagating any failure. */
    private static void closeQuietly(CuratorFramework client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (RuntimeException e) {
            LOGGER.warn("close zookeeper client fail.", e);
        }
    }

    /**
     * @param node the server description
     * @return {@code /yoho-cobar/<clusterName>/<serverIP>:<serverPort>}
     */
    private String buildClusterNodePath(MycatClusterNode node) {
        return CLUSTER_PATH_BASE + "/" + ZkClusterConfig.getInstance().getClusterName() +"/" + node.serverIP + ":" + node.serverPort;
    }

    /**
     * @param node the server description
     * @return the JSON payload stored in the node
     */
    private String buildClusterNodeData(MycatClusterNode node) {
        // Locale.ROOT keeps the digits ASCII whatever the JVM's default locale is.
        return String.format(Locale.ROOT, "{\"serverIP\":\"%s\",\"serverPort\":%d,\"type\":%d}",
                node.serverIP, node.serverPort, node.type);
    }
}