Authored by Gino Zhang

Master变更之后将原先执行中的任务设置为失败状态

@@ -6,3 +6,4 @@ job/.classpath @@ -6,3 +6,4 @@ job/.classpath
6 job/.project 6 job/.project
7 web/.classpath 7 web/.classpath
8 web/.project 8 web/.project
  9 +build/to_remote/war/yoho-kisjob-web.war
@@ -62,6 +62,13 @@ public class NodeUtil { @@ -62,6 +62,13 @@ public class NodeUtil {
62 return localIpAddress; 62 return localIpAddress;
63 } 63 }
64 64
  65 + public static boolean isCurrentNode(String nodeIdentifier) {
  66 + if (nodeIdentifier == null) {
  67 + return false;
  68 + }
  69 + return nodeIdentifier.equals(getCurrentNodeIp());
  70 + }
  71 +
65 private static boolean isPublicIpAddress(final InetAddress ipAddress) { 72 private static boolean isPublicIpAddress(final InetAddress ipAddress) {
66 return !ipAddress.isSiteLocalAddress() && !ipAddress.isLoopbackAddress() && !isV6IpAddress(ipAddress); 73 return !ipAddress.isSiteLocalAddress() && !ipAddress.isLoopbackAddress() && !isV6IpAddress(ipAddress);
67 } 74 }
@@ -467,6 +467,14 @@ public class DefaultRegistryCenter implements RegistryCenter { @@ -467,6 +467,14 @@ public class DefaultRegistryCenter implements RegistryCenter {
467 String jobName = map.get("jobName"); 467 String jobName = map.get("jobName");
468 JobMeta jobMeta = jobMetaService.queryByKey(nodeGroup, jobName); 468 JobMeta jobMeta = jobMetaService.queryByKey(nodeGroup, jobName);
469 Long jobInstanceId = Long.valueOf(zkHelper.get(event.getData().getPath())); 469 Long jobInstanceId = Long.valueOf(zkHelper.get(event.getData().getPath()));
  470 + KisJobInstance jobInstance = jobInstanceService.queryJobInstById(jobInstanceId);
  471 + if (jobInstance == null || jobInstance.getInstanceStatus().intValue() != State.Running.getCode()) {
  472 + // 如果实例不存在或者不是running状态的 就不通知调度中心去执行
  473 + log.info(
  474 + "The job instance is not running and skip execution groups. jobName: {}, jobInstanceId: {}",
  475 + jobName, jobInstanceId);
  476 + return;
  477 + }
470 List<Integer> groupIndexs = jobInstGroupService.queryJobInstanceGroupIndexs(jobInstanceId); 478 List<Integer> groupIndexs = jobInstGroupService.queryJobInstanceGroupIndexs(jobInstanceId);
471 Assert.notEmpty(groupIndexs); 479 Assert.notEmpty(groupIndexs);
472 Map<Integer, String> groupDispatchResult = new HashMap<Integer, String>(); 480 Map<Integer, String> groupDispatchResult = new HashMap<Integer, String>();
@@ -2,7 +2,6 @@ package com.yoho.kisjob.reg.persistence.service; @@ -2,7 +2,6 @@ package com.yoho.kisjob.reg.persistence.service;
2 2
3 import java.util.Date; 3 import java.util.Date;
4 import java.util.List; 4 import java.util.List;
5 -import java.util.Map;  
6 5
7 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.stereotype.Service; 7 import org.springframework.stereotype.Service;
@@ -11,12 +10,15 @@ import org.springframework.util.Assert; @@ -11,12 +10,15 @@ import org.springframework.util.Assert;
11 import com.yoho.kisjob.common.constant.JobException; 10 import com.yoho.kisjob.common.constant.JobException;
12 import com.yoho.kisjob.common.constant.State; 11 import com.yoho.kisjob.common.constant.State;
13 import com.yoho.kisjob.common.meta.ExecutionMeta; 12 import com.yoho.kisjob.common.meta.ExecutionMeta;
  13 +import com.yoho.kisjob.common.meta.JobMeta;
  14 +import com.yoho.kisjob.common.meta.JobType;
14 import com.yoho.kisjob.common.utils.NodeUtil; 15 import com.yoho.kisjob.common.utils.NodeUtil;
15 import com.yoho.kisjob.common.utils.RandomUtils; 16 import com.yoho.kisjob.common.utils.RandomUtils;
16 import com.yoho.kisjob.common.utils.Utils; 17 import com.yoho.kisjob.common.utils.Utils;
17 import com.yoho.kisjob.reg.persistence.service.jdbc.JdbcService; 18 import com.yoho.kisjob.reg.persistence.service.jdbc.JdbcService;
18 import com.yoho.kisjob.reg.persistence.vo.KisJob; 19 import com.yoho.kisjob.reg.persistence.vo.KisJob;
19 import com.yoho.kisjob.reg.persistence.vo.KisJobInstance; 20 import com.yoho.kisjob.reg.persistence.vo.KisJobInstance;
  21 +import com.yoho.kisjob.reg.persistence.vo.KisJobInstanceGroup;
20 22
21 /** 23 /**
22 * 定时任务实例持久化服务类。 24 * 定时任务实例持久化服务类。
@@ -34,6 +36,9 @@ public class JobInstanceService { @@ -34,6 +36,9 @@ public class JobInstanceService {
34 @Autowired 36 @Autowired
35 private JobInstGroupService jobInstGroupService; 37 private JobInstGroupService jobInstGroupService;
36 38
  39 + @Autowired
  40 + private JobMetaService jobMetaService;
  41 +
37 public Long addNewInstance(String nodeGroup, String jobName) { 42 public Long addNewInstance(String nodeGroup, String jobName) {
38 List<KisJob> list = jdbcService.query(new KisJob(nodeGroup, jobName)); 43 List<KisJob> list = jdbcService.query(new KisJob(nodeGroup, jobName));
39 Assert.notEmpty(list, "For " + jobName); 44 Assert.notEmpty(list, "For " + jobName);
@@ -81,21 +86,37 @@ public class JobInstanceService { @@ -81,21 +86,37 @@ public class JobInstanceService {
81 condition.setInstanceStatus(State.Running.getCode()); 86 condition.setInstanceStatus(State.Running.getCode());
82 List<KisJobInstance> list = jdbcService.query(condition); 87 List<KisJobInstance> list = jdbcService.query(condition);
83 if (Utils.isNotEmpty(list)) { 88 if (Utils.isNotEmpty(list)) {
  89 +
  90 + JobMeta jobMeta = jobMetaService.queryByJobId(jobId);
  91 + JobType jobType = jobMeta != null ? jobMeta.getJobType() : null;
84 for (KisJobInstance jobInstance : list) { 92 for (KisJobInstance jobInstance : list) {
85 - if (NodeUtil.getCurrentNodeIp().equals(jobInstance.getNodeIdentifier())) { 93 + if (NodeUtil.isCurrentNode(jobInstance.getNodeIdentifier())) {
86 continue; 94 continue;
87 } 95 }
88 96
89 - Map<Integer, State> groupStatusMap = jobInstGroupService.queryJobInstanceGroupStatus(jobInstance  
90 - .getInstanceId());  
91 - for (Map.Entry<Integer, State> entry : groupStatusMap.entrySet()) {  
92 - if (entry.getValue() == State.Ready || entry.getValue() == State.Running) {  
93 - jobInstGroupService.endExecuteGroup(jobInstance.getInstanceId(), entry.getKey(),  
94 - new JobException("Execute group failed for master crashed."));  
95 - } 97 + if (jobType == JobType.ElastaicJob) {
  98 + updateUncompletedJobInstGroups(jobInstance);
96 } 99 }
  100 +
97 updateInstanceResult(jobInstance.getInstanceId(), null, new JobException( 101 updateInstanceResult(jobInstance.getInstanceId(), null, new JobException(
98 - "Execute instance failed for master crashed.")); 102 + "Execute instance failed for master %s crashed.", jobInstance.getNodeIdentifier()));
  103 + }
  104 + }
  105 + }
  106 +
  107 + public void updateUncompletedJobInstGroups(KisJobInstance jobInstance) {
  108 + State groupState;
  109 + List<KisJobInstanceGroup> groups = jobInstGroupService.queryJobInstanceGroups(new KisJobInstanceGroup(
  110 + jobInstance.getInstanceId()));
  111 + if (Utils.isNotEmpty(groups)) {
  112 + // TODO:暂时先把master上面的分组更新成失败的 这个后续需要优化
  113 + for (KisJobInstanceGroup group : groups) {
  114 + groupState = State.build(group.getGroupState());
  115 + if ((groupState == State.Ready || groupState == State.Running)
  116 + && jobInstance.getNodeIdentifier().equals(group.getNodeIdentifier())) {
  117 + jobInstGroupService.endExecuteGroup(jobInstance.getInstanceId(), group.getGroupIndex(),
  118 + new JobException("Execute group failed for master %s crashed.", group.getNodeIdentifier()));
  119 + }
99 } 120 }
100 } 121 }
101 } 122 }