// DistributedJob.java (5.13 KB)
package com.yoho.kisjob.jobnode.elastic;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.yoho.kisjob.common.constant.JobException;
import com.yoho.kisjob.common.constant.State;
import com.yoho.kisjob.common.locator.ServiceLocator;
import com.yoho.kisjob.common.meta.ExecutionMeta;
import com.yoho.kisjob.common.meta.JobGroupExecutionMeta;
import com.yoho.kisjob.common.meta.JobGroupMeta;
import com.yoho.kisjob.common.meta.Writable;
import com.yoho.kisjob.common.utils.Utils;
import com.yoho.kisjob.jobnode.context.JobContextHolder;
import com.yoho.kisjob.jobnode.core.AbstractJob;
import com.yoho.kisjob.reg.core.RegistryCenter;

/**
 * Distributed scheduled-job implementation.
 *
 * <p>On {@link #execute()} this node acts as the master for the current job instance: it asks the
 * configured {@link Mapper} to split the work into keyed groups, persists those groups through the
 * {@link RegistryCenter}, then polls the registry until no group is {@code Ready} or
 * {@code Running} any more. Groups whose executing node is reported unavailable are ended with an
 * error (fail-over is not supported yet).
 *
 * @author zhangfeng
 * @since 2016-07-17
 */
@Service
@Slf4j
public class DistributedJob extends AbstractJob {

	/** Delay, in milliseconds, between two polls of the group execution state. */
	private static final long POLL_INTERVAL_MILLIS = 10000L;

	@Autowired
	private RegistryCenter registerCenter;

	/**
	 * Runs the current job instance as master: groups the work with the mapper, registers the
	 * groups and blocks until every group has finished.
	 *
	 * @throws JobException if at least one group ended in {@code State.Failed}, or if the thread is
	 *                      interrupted while waiting for the groups
	 */
	@Override
	public void execute() {
		String nodeGroup = JobContextHolder.getContext().getNodeGroup();
		String jobName = JobContextHolder.getContext().getJobName();
		Long jobInstId = JobContextHolder.getContext().getJobInstanceId();
		log.info("Begin to execute distributed job. jobName: {}", jobName);

		// 1. Initialize: clear the groups registered for this job.
		// TODO: decide how to resume after a previous master died mid-run; for now such runs are
		// simply treated as failed.
		registerCenter.clearJobGroups(nodeGroup, jobName);

		// 2. Master-side grouping: the mapper emits (key, value) pairs, values sharing a key
		// end up in the same group.
		final Map<String, JobGroupMeta> groups = new HashMap<String, JobGroupMeta>();
		Mapper mapper = ServiceLocator.findService(JobContextHolder.getContext().getMapperBean());
		mapper.map(new OutputCollector() {
			@Override
			public void collect(String key, Writable value) {
				groups.computeIfAbsent(key, JobGroupMeta::new).addGroupItem(value);
			}
		});

		// 3. Persist the grouping result.
		registerCenter.registerJobGroups(jobInstId, groups);

		// 4. Poll until every group has left the Ready/Running states.
		List<JobGroupExecutionMeta> list = registerCenter.getJobGroupExecutionMeta(nodeGroup, jobName, jobInstId);
		while (!isAllGroupCompleted(list)) {
			processInstanceGroup(list);
			waitAllGroupsCompleted();
			list = registerCenter.getJobGroupExecutionMeta(nodeGroup, jobName, jobInstId);
		}

		// 5. Evaluate the outcome: any failed group fails the whole job.
		Set<State> resultStates = getAllGroupState(list);
		if (resultStates.contains(State.Failed)) {
			throw new JobException("There has failed job groups, please check.");
		}

		log.info("End to execute distributed job. jobName: {}", jobName);
	}

	/**
	 * Collects the distinct states of the given groups.
	 *
	 * @param list group execution metadata; may be null or empty
	 * @return the set of states present in {@code list}, empty if there are no groups
	 */
	private Set<State> getAllGroupState(List<JobGroupExecutionMeta> list) {
		Set<State> result = new HashSet<State>();
		if (Utils.isEmpty(list)) {
			return result;
		}

		for (JobGroupExecutionMeta meta : list) {
			result.add(meta.getGroupState());
		}
		return result;
	}

	/**
	 * Ends, with an error, every unfinished group whose executing node is reported unavailable.
	 *
	 * @param list group execution metadata of the current job instance
	 */
	private void processInstanceGroup(List<JobGroupExecutionMeta> list) {
		Long jobInstId = JobContextHolder.getContext().getJobInstanceId();
		for (JobGroupExecutionMeta meta : list) {
			if (!meta.isNodeAvailable() && isUnfinished(meta.getGroupState())) {
				// The node running this group is unavailable.
				// TODO: support fail-over to another node; for now such groups are all marked failed.
				registerCenter.endExecuteGroup(jobInstId, meta.getGroupIndex(), new JobException(
						"The job node %s is unavailable.", meta.getNodeIdentifier()));
			}
		}
	}

	/**
	 * Tells whether every group has finished.
	 *
	 * @param list group execution metadata; may be null or empty
	 * @return {@code true} if {@code list} is empty or no group is {@code Ready}/{@code Running}
	 */
	private boolean isAllGroupCompleted(List<JobGroupExecutionMeta> list) {
		if (Utils.isEmpty(list)) {
			return true;
		}

		for (JobGroupExecutionMeta meta : list) {
			if (isUnfinished(meta.getGroupState())) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Tells whether a group in the given state is still pending or executing.
	 *
	 * @param state the group state
	 * @return {@code true} for {@code State.Ready} and {@code State.Running}
	 */
	private static boolean isUnfinished(State state) {
		return state == State.Ready || state == State.Running;
	}

	/**
	 * Sleeps for one poll interval ({@value #POLL_INTERVAL_MILLIS} ms) before the group states are
	 * checked again.
	 *
	 * @throws JobException if the thread is interrupted while sleeping; the interrupt flag is
	 *                      restored before throwing
	 */
	private void waitAllGroupsCompleted() {
		try {
			log.debug("Waiting for all group execution completed.");
			Thread.sleep(POLL_INTERVAL_MILLIS);
		} catch (InterruptedException e) {
			// Restore the interrupt status so code further up the stack can still observe it.
			Thread.currentThread().interrupt();
			throw new JobException("The distribution job has been interrupted.", e);
		}
	}

	/**
	 * Summarizes the group progress of the current job instance.
	 *
	 * @return an {@link ExecutionMeta} built from the total group count, the number of
	 *         {@code Completed} groups and the number of {@code Failed} groups, or {@code null}
	 *         when no group execution data is registered
	 */
	@Override
	public ExecutionMeta query() {
		String nodeGroup = JobContextHolder.getContext().getNodeGroup();
		String jobName = JobContextHolder.getContext().getJobName();
		Long jobInstId = JobContextHolder.getContext().getJobInstanceId();
		List<JobGroupExecutionMeta> list = registerCenter.getJobGroupExecutionMeta(nodeGroup, jobName, jobInstId);
		if (Utils.isEmpty(list)) {
			return null;
		}

		int succeedCount = 0;
		int failedCount = 0;
		for (JobGroupExecutionMeta meta : list) {
			State state = meta.getGroupState();
			if (state == State.Completed) {
				succeedCount++;
			} else if (state == State.Failed) {
				failedCount++;
			}
		}

		return new ExecutionMeta(list.size(), succeedCount, failedCount);
	}

	@Override
	public void pause() {
		// TODO: support pause, resume and cancel for distributed jobs.
		super.pause();
	}

	@Override
	public void resume() {
		// TODO: support pause, resume and cancel for distributed jobs.
		super.resume();
	}

	@Override
	public void cancel() {
		// TODO: support pause, resume and cancel for distributed jobs.
		super.cancel();
	}
}