// ElectionService.java 8.39 KB
/**
 * 
 */
package com.yoho.jobs.server.election;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.yoho.core.common.utils.LocalIp;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.JobInfo;
import com.yoho.jobs.server.domain.ScheduleJobInfo;
import com.yoho.jobs.server.scheduler.Container;
import com.yoho.jobs.server.scheduler.JobScheduleService;
import com.yoho.jobs.server.scheduler.JobService;

/**
 * Master node election for the job manager.
 * <p>
 * The instance joins a Curator {@link LeaderSelector} on {@code ELECTION_PATH}.
 * When it is granted leadership it loads all jobs and clients into
 * {@link Container#CONTAINER}, starts watching the subtree under
 * {@code CLIENT_NODES}, and then blocks inside {@code takeLeadership} until
 * {@link #destroy()} releases the latch or the wait is interrupted.
 *
 * @author yanzhang.fu
 */
public class ElectionService {

	private final static Logger logger = LoggerFactory.getLogger(ElectionService.class);

	// ZooKeeper path used for the leader election
	private static final String ELECTION_PATH = "/job/jobmanager/election";

	// Root of the subtree that is watched for client and job nodes
	private static final String CLIENT_NODES = "/job/jobclient";

	// Charset used for node payloads, both when writing them and when logging them
	private static final Charset UTF8 = Charset.forName("UTF-8");

	private CuratorFramework client;

	private LeaderSelector leaderSelector;

	private JobService jobService;

	private JobScheduleService jobScheduleService;

	// Keeps takeLeadership blocked; released by destroy()
	private final CountDownLatch count = new CountDownLatch(1);

	/**
	 * Creates the leader selector and enables automatic re-queueing. Must be
	 * called after the client has been injected and before {@link #start()}.
	 */
	public void init() {
		leaderSelector = new LeaderSelector(client, ELECTION_PATH, new LeaderSelectorListenerAdapter() {

			@Override
			public void takeLeadership(CuratorFramework client) throws Exception {
				// Election won: load the state and start watching the clients
				logger.info("{} has leadership", LocalIp.getLocalIp());
				TreeCache clientCache = null;
				try {
					// Load every known job
					Map<String, List<JobInfo>> jobsInfo = jobService.getAllJobs();
					buildScheduleJobInfoContainer(jobsInfo);

					Container.CONTAINER.putAllClient(jobService.getAllClient());
					// Watch the client subtree
					clientCache = listenClient();
					// Jobs are not scheduled here; the execution node is only
					// chosen when a schedule is actually triggered.

					// Returning from this method gives up leadership, so block here
					count.await();
				} finally {
					// Stop watching once leadership ends; otherwise every
					// re-election would add one more cache and listener.
					if (clientCache != null) {
						clientCache.close();
					}
				}
			}

		});

		leaderSelector.autoRequeue();
	}

	/**
	 * Starts taking part in the election.
	 */
	public void start() {
		logger.info("start select master");
		leaderSelector.start();
	}

	/**
	 * Closes the leader selector.
	 */
	public void stop() {
		leaderSelector.close();
	}

	/**
	 * Closes the leader selector and releases the latch that keeps
	 * {@code takeLeadership} blocked. The latch is released even if closing
	 * the selector throws.
	 */
	public void destroy() {
		try {
			stop();
		} finally {
			count.countDown();
		}
	}

	/**
	 * Registers every job of every module in {@link Container#CONTAINER}, each
	 * wrapped in a {@link ScheduleJobInfo} without a future.
	 *
	 * @param jobsInfo jobs grouped by module name; {@code null} and modules
	 *                 with a {@code null} job list are ignored
	 */
	private void buildScheduleJobInfoContainer(Map<String, List<JobInfo>> jobsInfo) {
		if (jobsInfo == null) {
			return;
		}
		for (Map.Entry<String, List<JobInfo>> entry : jobsInfo.entrySet()) {
			List<JobInfo> moduleJobs = entry.getValue();
			if (moduleJobs == null) {
				continue;
			}
			String moduleName = entry.getKey();
			for (JobInfo jobInfo : moduleJobs) {
				ScheduleJobInfo scheduleJobInfo = newScheduleJobInfo(jobInfo);
				if (Container.CONTAINER.getAllJobs().containsKey(moduleName)) {
					Container.CONTAINER.getAllJobs().get(moduleName).put(jobInfo.getJobName(), scheduleJobInfo);
				} else {
					Map<String, ScheduleJobInfo> map = new HashMap<String, ScheduleJobInfo>();
					map.put(jobInfo.getJobName(), scheduleJobInfo);
					Container.CONTAINER.getAllJobs().put(moduleName, map);
				}
			}
		}
	}

	/**
	 * Wraps a job in a {@link ScheduleJobInfo} whose future is {@code null}.
	 */
	private static ScheduleJobInfo newScheduleJobInfo(JobInfo jobInfo) {
		ScheduleJobInfo scheduleJobInfo = new ScheduleJobInfo();
		scheduleJobInfo.setJobInfo(jobInfo);
		scheduleJobInfo.setFuture(null);
		return scheduleJobInfo;
	}

	/**
	 * Starts a {@link TreeCache} on {@code CLIENT_NODES} and dispatches its
	 * add / remove / update events to the matching handler.
	 *
	 * @return the started cache; the caller is responsible for closing it
	 * @throws Exception if the cache cannot be started
	 */
	private TreeCache listenClient() throws Exception {
		TreeCache treeCache = new TreeCache(client, CLIENT_NODES);
		treeCache.getListenable().addListener(new TreeCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
				ChildData data = event.getData();
				if (data == null) {
					logger.info("data is null : {}", event.getType());
					return;
				}
				String pathType = getPathType(data.getPath());
				switch (event.getType()) {
				case NODE_ADDED:
					onNodeAdded(data, pathType);
					break;
				case NODE_REMOVED:
					onNodeRemoved(data, pathType);
					break;
				case NODE_UPDATED:
					onNodeUpdated(client, data, pathType);
					break;
				default:
					break;
				}
			}
		});
		// Start watching
		treeCache.start();
		return treeCache;
	}

	/**
	 * Handles a created node: registers a new client, or registers a new job
	 * and schedules it when its operator is EXEC or TRIGGER.
	 */
	private void onNodeAdded(ChildData data, String pathType) throws Exception {
		String path = data.getPath();
		if (isClientInfoType(pathType)) {
			// The last path segment is the client ip
			String clientIp = path.substring(path.lastIndexOf("/") + 1);
			logger.info("register new client, the client is {}", toText(data.getData()));
			Container.CONTAINER.addClient(parseModule(path), clientIp);
		} else if (isJobInfoType(pathType)) {
			JobInfo jobInfo = parseJobInfo(data);
			if (jobInfo == null) {
				return;
			}
			logger.info("create new Job, the Job is {}", toText(data.getData()));
			ScheduleJobInfo scheduleJobInfo = newScheduleJobInfo(jobInfo);
			Container.CONTAINER.addJobInfo(jobInfo.getModule(), scheduleJobInfo);
			// NOTE(review): these == comparisons are only safe if getOperator()/getValue()
			// are primitives; confirm the types in JobInfo / ConstantEnum.
			if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.EXEC.getValue()
					|| jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.TRIGGER.getValue()) {
				jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
				jobScheduleService.schedule(scheduleJobInfo);
			}
		}
	}

	/**
	 * Handles a deleted node: a removed client node is dropped from the
	 * container. Other node types are ignored.
	 */
	private void onNodeRemoved(ChildData data, String pathType) throws Exception {
		if (!isClientInfoType(pathType)) {
			return;
		}
		// A client went offline. Moving its jobs to another node is not done here.
		String path = data.getPath();
		logger.info("remove node {}", path);
		Container.CONTAINER.removeClient(parseModule(path), path.substring(path.lastIndexOf("/") + 1));
	}

	/**
	 * Handles a changed node. Only job nodes are processed:
	 * <ul>
	 * <li>EXEC: the job is switched to TRIGGER and scheduled;</li>
	 * <li>STOP: if the job status is STARTING, its future (if any) is cancelled
	 * and the STOPPING status is written back to ZooKeeper;</li>
	 * <li>UPDATE: the stored job info is replaced.</li>
	 * </ul>
	 *
	 * @param curator the client delivered with the event, used for the write-back
	 */
	private void onNodeUpdated(CuratorFramework curator, ChildData data, String pathType) throws Exception {
		if (!isJobInfoType(pathType)) {
			// Client node updates need no handling
			return;
		}
		logger.info("update Job, the Job is {}", toText(data.getData()));
		JobInfo jobInfo = parseJobInfo(data);
		if (jobInfo == null) {
			return;
		}
		Map<String, ScheduleJobInfo> moduleJobs = Container.CONTAINER.getJobsByModule(jobInfo.getModule());
		ScheduleJobInfo updateScheduleJobInfo = moduleJobs == null ? null : moduleJobs.get(jobInfo.getJobName());
		if (updateScheduleJobInfo == null) {
			logger.warn("ignore update of unknown job, module is {}, job is {}", jobInfo.getModule(),
					jobInfo.getJobName());
			return;
		}

		// NOTE(review): these == comparisons are only safe if the getters return
		// primitives; confirm the types in JobInfo / ConstantEnum.
		if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.EXEC.getValue()) {
			// Starting a job requires a schedule
			jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
			updateScheduleJobInfo.setJobInfo(jobInfo);
			updateScheduleJobInfo.setFuture(null);
			jobScheduleService.schedule(updateScheduleJobInfo);
		} else if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.STOP.getValue()) {
			// A job can only be stopped while it is in the STARTING status
			if (jobInfo.getStatus() == ConstantEnum.JobStatusEnum.STARTING.getValue()) {
				updateScheduleJobInfo.setJobInfo(jobInfo);
				// The future is null for a job that was never scheduled on this node
				if (updateScheduleJobInfo.getFuture() != null) {
					updateScheduleJobInfo.getFuture().cancel(false);
				}
				jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
				// Write the new status back to ZooKeeper
				String jobPath = CLIENT_NODES + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
				try {
					curator.setData().forPath(jobPath, JSON.toJSONString(jobInfo).getBytes(UTF8));
				} catch (Exception e) {
					logger.error("stop job faild , job is {}, exception {}", jobInfo, e);
				}
			}
		} else if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.UPDATE.getValue()) {
			updateScheduleJobInfo.setJobInfo(jobInfo);
		}
	}

	/**
	 * Parses the payload of a job node.
	 *
	 * @return the parsed job, or {@code null} (after logging a warning) when
	 *         the node has no payload or the payload parses to nothing
	 */
	private JobInfo parseJobInfo(ChildData data) {
		byte[] payload = data.getData();
		if (payload == null || payload.length == 0) {
			logger.warn("ignore job node without data, path is {}", data.getPath());
			return null;
		}
		JobInfo jobInfo = JSON.parseObject(payload, JobInfo.class);
		if (jobInfo == null) {
			logger.warn("ignore job node with unparseable data, path is {}", data.getPath());
		}
		return jobInfo;
	}

	/**
	 * Decodes a node payload for logging; a missing payload is rendered as
	 * {@code "null"}.
	 */
	private static String toText(byte[] payload) {
		return payload == null ? "null" : new String(payload, UTF8);
	}

	/**
	 * Returns the name of the parent segment of {@code path}, e.g.
	 * {@code "clientnode"} for {@code ".../clientnode/1.2.3.4"}. Returns an
	 * empty string when the path contains no slash.
	 */
	private String getPathType(String path) {
		int lastSlash = path.lastIndexOf('/');
		if (lastSlash < 0) {
			return "";
		}
		String parent = path.substring(0, lastSlash);
		return parent.substring(parent.lastIndexOf('/') + 1);
	}

	private boolean isClientInfoType(String pathType) {
		return "clientnode".equalsIgnoreCase(pathType);
	}

	private boolean isJobInfoType(String pathType) {
		return "jobinfo".equalsIgnoreCase(pathType);
	}

	/**
	 * Returns the first path segment below {@code CLIENT_NODES}, which is used
	 * as the module name.
	 */
	private String parseModule(String path) {
		String relative = path.substring(CLIENT_NODES.length() + 1);
		return relative.substring(0, relative.indexOf('/'));
	}

	public CuratorFramework getClient() {
		return client;
	}

	public void setClient(CuratorFramework client) {
		this.client = client;
	}

	public JobService getJobService() {
		return jobService;
	}

	public void setJobService(JobService jobService) {
		this.jobService = jobService;
	}

	public JobScheduleService getJobScheduleService() {
		return jobScheduleService;
	}

	public void setJobScheduleService(JobScheduleService jobScheduleService) {
		this.jobScheduleService = jobScheduleService;
	}

}