Authored by fu

代码重复

... ... @@ -13,3 +13,4 @@ yoho-jobs-client/.project
yoho-jobs-client/target/
yoho-jobs-server/target/
yoho-jobs-dal/target/
yoho-jobs-server/.eclipse-pmd
... ...
package com.yoho.jobs.client;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}
... ... @@ -27,7 +27,7 @@ public class TestSchedule {
int i = 0;
while (!future.isDone()) {
try {
if (i < 70) {
if (i < 7) {
Thread.currentThread().sleep(1000l);
i++;
} else {
... ...
... ... @@ -12,6 +12,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.yoho.jobs.server.domain.ConstantEnum;
import com.yoho.jobs.server.domain.JobInfo;
import com.yoho.jobs.server.domain.JobResponse;
import com.yoho.jobs.server.domain.ShowAllJobResponse;
... ... @@ -31,11 +32,9 @@ public class JobServiceController {
JobResponse response = new JobResponse();
try {
jobService.createJob(jobInfo);
response.setCode("0");
response.setDesc("success");
buildJobResponse(ConstantEnum.JobResponse.SUCCESS, ConstantEnum.JobResponse.SUCCESS_DESC, response);
} catch (Exception e) {
response.setCode("1");
response.setDesc(e.getLocalizedMessage());
buildJobResponse(ConstantEnum.JobResponse.FAIL, e.getMessage(), response);
}
return response;
}
... ... @@ -45,11 +44,9 @@ public class JobServiceController {
JobResponse response = new JobResponse();
try {
jobService.stopJob(jobInfo);
response.setCode("0");
response.setDesc("success");
buildJobResponse(ConstantEnum.JobResponse.SUCCESS, ConstantEnum.JobResponse.SUCCESS_DESC, response);
} catch (Exception e) {
response.setCode("1");
response.setDesc(e.getLocalizedMessage());
buildJobResponse(ConstantEnum.JobResponse.FAIL, e.getMessage(), response);
}
return response;
}
... ... @@ -59,11 +56,9 @@ public class JobServiceController {
JobResponse response = new JobResponse();
try {
jobService.startJob(jobInfo);
response.setCode("0");
response.setDesc("success");
buildJobResponse(ConstantEnum.JobResponse.SUCCESS, ConstantEnum.JobResponse.SUCCESS_DESC, response);
} catch (Exception e) {
response.setCode("1");
response.setDesc(e.getLocalizedMessage());
buildJobResponse(ConstantEnum.JobResponse.FAIL, e.getMessage(), response);
}
return response;
... ... @@ -74,11 +69,9 @@ public class JobServiceController {
JobResponse response = new JobResponse();
try {
jobService.updateJob(jobInfo);
response.setCode("0");
response.setDesc("success");
buildJobResponse(ConstantEnum.JobResponse.SUCCESS, ConstantEnum.JobResponse.SUCCESS_DESC, response);
} catch (Exception e) {
response.setCode("1");
response.setDesc(e.getLocalizedMessage());
buildJobResponse(ConstantEnum.JobResponse.FAIL, e.getMessage(), response);
}
return response;
... ... @@ -94,7 +87,7 @@ public class JobServiceController {
response.setDesc("success");
} catch (Exception e) {
response.setCode("1");
response.setDesc(e.getLocalizedMessage());
response.setDesc(e.getMessage());
}
return response;
}
... ... @@ -104,13 +97,17 @@ public class JobServiceController {
JobResponse response = new JobResponse();
try {
jobService.retryJob(jobInfo);
response.setCode("0");
response.setDesc("success");
buildJobResponse(ConstantEnum.JobResponse.SUCCESS, ConstantEnum.JobResponse.SUCCESS_DESC, response);
} catch (Exception e) {
response.setCode("1");
response.setDesc(e.getLocalizedMessage());
buildJobResponse(ConstantEnum.JobResponse.FAIL, e.getMessage(), response);
}
return response;
}
private void buildJobResponse(String code, String message, JobResponse response) {
response.setCode(code);
response.setDesc(message);
}
}
... ...
... ... @@ -53,7 +53,7 @@ public interface ConstantEnum {
}
public enum JobOperatorEnum {
EXEC(0, "执行"), UPDATE(1, "更新"), STOP(2, "停止"), CREATE(3, "新建"),TRIGGER(4,"任务触发");
EXEC(0, "执行"), UPDATE(1, "更新"), STOP(2, "停止"), CREATE(3, "新建"), TRIGGER(4, "任务触发");
private int value;
private String name;
... ... @@ -72,4 +72,13 @@ public interface ConstantEnum {
}
public class JobResponse {
public static final String SUCCESS = "0";
public static final String SUCCESS_DESC = "success";
public static final String FAIL = "1";
}
}
... ...
... ... @@ -5,6 +5,8 @@ package com.yoho.jobs.server.domain;
import java.util.concurrent.ScheduledFuture;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
... ... @@ -22,5 +24,7 @@ public class ScheduleJobInfo {
// 定时任务返回对象
private ScheduledFuture<?> future;
ThreadPoolTaskScheduler scheduler;
}
... ...
... ... @@ -63,10 +63,10 @@ public class ElectionService {
Container.CONTAINER.putAllClient(jobService.getAllClient());
// 监听客户端
// listenClientNode(jobsInfo);
// listenClientNode(jobsInfo);
listenClient();
// 调度开始,只有在触发调度的时候再选取执行节点
jobScheduleService.batchSchedule(Container.CONTAINER.getAllJobs());
// jobScheduleService.batchSchedule(Container.CONTAINER.getAllJobs());
count.await();
}
... ... @@ -91,122 +91,6 @@ public class ElectionService {
}
private void listenClientNode(Map<String, List<JobInfo>> jobsInfo) throws Exception {
for (Map.Entry<String, List<JobInfo>> entry : jobsInfo.entrySet()) {
buildWatch(entry.getKey());
watchJobInfo(entry.getKey());
}
}
private void buildWatch(String module) throws Exception {
TreeCache treeCache = new TreeCache(client, CLIENT_NODES + module + "/clientnode");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data != null) {
String path = data.getPath();
String clientIp = path.substring(path.lastIndexOf("/") + 1);
switch (event.getType()) {
case NODE_ADDED:
logger.info("register new client, the client is {}", String.valueOf(data.getData()));
Container.CONTAINER.addClient(module, clientIp);
break;
case NODE_REMOVED:
// 有客户端下线,需要将下线的客户端上执行的任务切换到其他节点上
logger.info("remove node {}", data.getPath());
// String path = data.getPath();
// String clientIp = path.substring(path.lastIndexOf("/") + 1);
Container.CONTAINER.removeClient(module, clientIp);
// 重新选取节点
break;
case NODE_UPDATED:
break;
default:
break;
}
} else {
logger.info("data is null : " + event.getType());
}
}
});
// 开始监听
treeCache.start();
}
private void watchJobInfo(String module) throws Exception {
TreeCache treeCache = new TreeCache(client, CLIENT_NODES + module + "/jobInfo");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data != null) {
JobInfo jobInfo = JSON.parseObject(data.getData(), JobInfo.class);
switch (event.getType()) {
// TODO 若先启动服务端,后启动客户端,会不会有问题---有问题
case NODE_ADDED:
logger.info("create new Job, the Job is {}", String.valueOf(data.getData()));
ScheduleJobInfo scheduleJobInfo = new ScheduleJobInfo();
scheduleJobInfo.setJobInfo(jobInfo);
scheduleJobInfo.setFuture(null);
Container.CONTAINER.addJobInfo(jobInfo.getModule(), scheduleJobInfo);
// if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.EXEC.getValue()) {
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
jobScheduleService.schedule(scheduleJobInfo);
// }
break;
case NODE_REMOVED:
break;
case NODE_UPDATED:
logger.info("update Job, the Job is {}", String.valueOf(data.getData()));
// 如果是任务的启动则需要schedule
ScheduleJobInfo updateScheduleJobInfo = Container.CONTAINER.getJobsByModule(jobInfo.getModule())
.get(jobInfo.getJobName());
if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.EXEC.getValue()) {
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
updateScheduleJobInfo.setJobInfo(jobInfo);
updateScheduleJobInfo.setFuture(null);
jobScheduleService.schedule(updateScheduleJobInfo);
} else if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.STOP.getValue()) {
// 当前任务是执行状态时,才可以停止,然后cancel之
if (jobInfo.getStatus() == ConstantEnum.JobStatusEnum.STARTING.getValue()) {
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
updateScheduleJobInfo.setJobInfo(jobInfo);
updateScheduleJobInfo.getFuture().cancel(false);
// 重新更新到zk上
String jobPath = CLIENT_NODES + jobInfo.getModule() + "/jobinfo/"
+ jobInfo.getJobName();
try {
client.setData().forPath(jobPath,
JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
} catch (Exception e) {
logger.error("stop job faild , job is {}, exception {}", jobInfo, e);
}
}
} else if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.UPDATE.getValue()) {
updateScheduleJobInfo.setJobInfo(jobInfo);
}
break;
default:
break;
}
} else {
logger.info("data is null : " + event.getType());
}
}
});
// 开始监听
treeCache.start();
}
private void buildScheduleJobInfoContainer(Map<String, List<JobInfo>> jobsInfo) {
for (Map.Entry<String, List<JobInfo>> entry : jobsInfo.entrySet()) {
String moduleName = entry.getKey();
... ... @@ -250,17 +134,18 @@ public class ElectionService {
scheduleJobInfo.setJobInfo(jobInfo);
scheduleJobInfo.setFuture(null);
Container.CONTAINER.addJobInfo(jobInfo.getModule(), scheduleJobInfo);
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
jobScheduleService.schedule(scheduleJobInfo);
if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.EXEC.getValue()
|| jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.TRIGGER.getValue()) {
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
jobScheduleService.schedule(scheduleJobInfo);
}
}
break;
case NODE_REMOVED:
if (isClientInfoType(pathType)) {
// 有客户端下线,需要将下线的客户端上执行的任务切换到其他节点上
logger.info("remove node {}", data.getPath());
// String path = data.getPath();
// String clientIp = path.substring(path.lastIndexOf("/") + 1);
Container.CONTAINER.removeClient(parseModule(path),
path.substring(path.lastIndexOf("/") + 1));
// 重新选取节点
... ... @@ -289,7 +174,7 @@ public class ElectionService {
updateScheduleJobInfo.setJobInfo(jobInfo);
updateScheduleJobInfo.getFuture().cancel(false);
// 重新更新到zk上
String jobPath = CLIENT_NODES + jobInfo.getModule() + "/jobinfo/"
String jobPath = CLIENT_NODES + "/" + jobInfo.getModule() + "/jobinfo/"
+ jobInfo.getJobName();
try {
client.setData().forPath(jobPath,
... ... @@ -332,7 +217,7 @@ public class ElectionService {
}
private String parseModule(String path) {
String s = path.substring(CLIENT_NODES.length()+1);
String s = path.substring(CLIENT_NODES.length() + 1);
String module = s.substring(0, s.indexOf("/"));
return module;
}
... ...
... ... @@ -48,8 +48,13 @@ public class JobScheduleServiceImpl implements JobScheduleService {
if (job.getJobInfo().getOperator() != ConstantEnum.JobOperatorEnum.TRIGGER.getValue()) {
return;
}
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
ThreadPoolTaskScheduler scheduler = job.getScheduler();
if( scheduler== null) {
scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
job.setScheduler(scheduler);
}
String cronExpress = job.getJobInfo().getCronExpression();
// 选取执行节点
if (job.getJobInfo().getJobType() == 0) {
... ...
... ... @@ -19,6 +19,8 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.yoho.jobs.server.domain.ConstantEnum;
import com.yoho.jobs.server.domain.JobInfo;
import com.yoho.jobs.server.domain.ScheduleJobInfo;
import com.yoho.jobs.server.scheduler.Container;
import com.yoho.jobs.server.scheduler.JobService;
/**
... ... @@ -132,22 +134,17 @@ public class JobServiceImpl implements JobService {
if (logger.isDebugEnabled()) {
logger.debug("begin to create job , job is {}", jobInfo);
}
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.CREATE.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
try {
Stat stat = client.checkExists().forPath(jobPath);
if (stat != null) {
String message = "the job already exist in zk,plz check it ,jobinfo is " + jobInfo;
logger.error("the job already exits in zk,plz check it ,jobinfo is {}", jobInfo);
throw new Exception(message);
} else {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath,
JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
} catch (Exception e) {
logger.error("System error.", e);
throw e;
if (!checkJobExist(jobInfo)) {
String message = "the job already exist in zk,plz check it ,jobinfo is " + jobInfo;
logger.error("the job already exits in zk,plz check it ,jobinfo is {}", jobInfo);
throw new Exception(message);
} else {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.CREATE.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath,
JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
}
... ... @@ -157,22 +154,22 @@ public class JobServiceImpl implements JobService {
if (logger.isDebugEnabled()) {
logger.debug("begin to startJob job , job is {}", jobInfo);
}
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.EXEC.getValue());
try {
Stat stat = client.checkExists().forPath(jobPath);
if (stat == null) {
// 不存在
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
} else {
// 存在
client.setData().forPath(jobPath, JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
} catch (Exception e) {
throw e;
if (!checkJobExist(jobInfo)) {
// 不存在
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
} else {
// 存在
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.EXEC.getValue());
ScheduleJobInfo updateScheduleJobInfo = Container.CONTAINER.getJobsByModule(jobInfo.getModule())
.get(jobInfo.getJobName());
updateScheduleJobInfo.getJobInfo().setOperator(ConstantEnum.JobOperatorEnum.EXEC.getValue());
updateScheduleJobInfo.getJobInfo().setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
client.setData().forPath(jobPath,
JSON.toJSONString(updateScheduleJobInfo.getJobInfo()).getBytes(Charset.forName("UTF-8")));
}
}
... ... @@ -184,22 +181,22 @@ public class JobServiceImpl implements JobService {
if (jobInfo.getStatus() == ConstantEnum.JobStatusEnum.STOPPING.getValue()) {
return;
}
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.STOP.getValue());
try {
Stat stat = client.checkExists().forPath(jobPath);
if (stat == null) {
// 不存在
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
} else {
// 存在
client.setData().forPath(jobPath, JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
} catch (Exception e) {
throw e;
if (!checkJobExist(jobInfo)) {
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
} else {
// 存在
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.STOP.getValue());
ScheduleJobInfo updateScheduleJobInfo = Container.CONTAINER.getJobsByModule(jobInfo.getModule())
.get(jobInfo.getJobName());
updateScheduleJobInfo.getFuture().cancel(false);
updateScheduleJobInfo.getJobInfo().setOperator(ConstantEnum.JobOperatorEnum.STOP.getValue());
updateScheduleJobInfo.getJobInfo().setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
client.setData().forPath(jobPath,
JSON.toJSONString(updateScheduleJobInfo.getJobInfo()).getBytes(Charset.forName("UTF-8")));
}
}
... ... @@ -207,8 +204,7 @@ public class JobServiceImpl implements JobService {
public void updateJob(JobInfo jobInfo) throws Exception {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
Stat stat = client.checkExists().forPath(jobPath);
if (stat == null) {
if (!checkJobExist(jobInfo)) {
// 当任务不存在时,create之
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.CREATE.getValue());
createJob(jobInfo);
... ... @@ -231,23 +227,25 @@ public class JobServiceImpl implements JobService {
if (logger.isDebugEnabled()) {
logger.debug("begin to retryJob job , job is {}", jobInfo);
}
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
try {
Stat stat = client.checkExists().forPath(jobPath);
if (stat == null) {
// 不存在
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
} else {
// 存在
client.setData().forPath(jobPath, JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
} catch (Exception e) {
if (!checkJobExist(jobInfo)) {
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
} else {
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
// 存在
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
client.setData().forPath(jobPath, JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
}
throw e;
private boolean checkJobExist(JobInfo jobInfo) throws Exception {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
Stat stat = client.checkExists().forPath(jobPath);
if (stat == null) {
return false;
} else {
return true;
}
}
}
... ...
... ... @@ -42,6 +42,7 @@ public class JobTrigger implements Runnable {
String jobPath = CLIENT_PATH_PREFIX + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
try {
client.setData().forPath(jobPath, JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
System.out.println("执行 "+jobInfo.getModule()+":"+jobInfo.getJobName());
} catch (Exception e) {
logger.error("trigger job faild , job is {}, exception {}", jobInfo, e);
//TODO 记录错误日志,上报告警
... ...