Authored by fu

重构

/**
*
*/
package com.yoho.jobs.client.constant;
/**
* @author yanzhang.fu
*
*/
public interface JobConstant {
/**
* 任务状态
*
* @author yanzhang.fu
*
*/
enum JobStatus {
NOT_EXEC(0), EXECING(1), PAUSE(2), STOP(3);
int status;
private JobStatus(int status) {
this.status = status;
}
public int getStatus() {
return status;
}
}
/**
* 任务操作指令
*
* @author yanzhang.fu
*
*/
enum JobOperator {
START(0), PAUSE(1), STOP(2);
int operator;
private JobOperator(int operator) {
this.operator = operator;
}
public int getOperator() {
return operator;
}
}
/**
* 任务类型 (单节点(0),多节点(1))
*
* @author yanzhang.fu
*
*/
enum JobType {
SINGLE_NODE(0), MUTIL_NODE(1);
int type;
private JobType(int type) {
this.type = type;
}
public int getType() {
return type;
}
}
enum JobResult{
SUCCESS(0),FAIL(1);
int result;
private JobResult(int result) {
this.result = result;
}
public int getResult() {
return result;
}
}
}
... ... @@ -34,8 +34,10 @@ import com.yoho.core.rest.client.ServiceCaller;
import com.yoho.jobs.client.JobProcessor;
import com.yoho.jobs.client.YHJobDef;
import com.yoho.jobs.client.container.JobContainer;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.JobInfo;
import com.yoho.jobs.common.domain.ProcessResult;
import com.yoho.jobs.common.jobopt.JobRegisterService;
/**
*
... ... @@ -53,6 +55,9 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
private ThreadPoolTaskExecutor jobExecutor;
@Autowired
private JobRegisterService jobRegisterService;
private ApplicationContext ctx;
@Autowired
... ... @@ -219,12 +224,17 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
}
private void batchRegiste(List<JobInfo> jobInfos) throws Exception {
String jobPath = CLIENT_PATH_PREFIX + this.module + "/jobinfo";
// String jobPath = CLIENT_PATH_PREFIX + this.module + "/jobinfo";
// if (CollectionUtils.isNotEmpty(jobInfos)) {
// for (JobInfo jobInfo : jobInfos) {
// client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
// jobPath + "/" + jobInfo.getJobName(),
// JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
// }
// }
if (CollectionUtils.isNotEmpty(jobInfos)) {
for (JobInfo jobInfo : jobInfos) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
jobPath + "/" + jobInfo.getJobName(),
JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
jobRegisterService.registJob(jobInfo);
}
}
... ... @@ -257,7 +267,7 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
// jobManager 定时trigger任务
JobInfo jobInfo = JSON.parseObject(data.getData(), JobInfo.class);
logger.info("start to schedule job, job is {}", jobInfo);
if (jobInfo.getOperator() == 4) {
if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.TRIGGER.getValue()) {
// 异步并行执行 线程池
ListenableFuture<ProcessResult> listenableFuture = jobExecutor
... ... @@ -277,13 +287,13 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
public void onFailure(Throwable ex) {
// 任务处理器有异常,记录日志
logger.info("process job failed. jobinfo={} , Exception is {}", jobInfo, ex);
//上报结果
// 上报结果
ProcessResult result = new ProcessResult();
result.setJobInfo(jobInfo);
result.setMessage(ex.getMessage());
result.setCode("error");
SendResult2Server(result);
}
});
... ... @@ -306,8 +316,8 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
// 上报结果到jobManager中
private void SendResult2Server(ProcessResult result) {
logger.info(" send job result to jobManager.");
//TODO 任务执行失败,上报告警
// TODO 任务执行失败,上报告警
serviceCaller.call("jobs.saveJobResult", result, void.class);
}
... ...
... ... @@ -10,8 +10,8 @@ import org.slf4j.LoggerFactory;
import com.yoho.core.common.utils.DateUtil;
import com.yoho.core.common.utils.LocalIp;
import com.yoho.jobs.client.constant.JobConstant;
import com.yoho.jobs.client.container.JobContainer;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.JobInfo;
import com.yoho.jobs.common.domain.ProcessResult;
... ... @@ -38,7 +38,7 @@ public class JobTrigger implements Callable {
ProcessResult result = null;
String beginTime = DateUtil.getcurrentDateTime();
// 若果是单机任务,需要看当前节点是否是目标节点
if (JobConstant.JobType.SINGLE_NODE.getType() == jobInfo.getJobType()) {
if (ConstantEnum.JobType.SINGLE_NODE.getType() == jobInfo.getJobType()) {
if (LocalIp.getLocalIp().trim().equals(jobInfo.getProcessIp())) {
logger.info("job exec ,job name is {}", jobInfo.getJobName());
result = JobContainer.PROCESSORCONTAINER.getProcessor(jobInfo.getProcessor())
... ...
... ... @@ -20,5 +20,9 @@
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.yoho.core</groupId>
<artifactId>yoho-core-common</artifactId>
</dependency>
</dependencies>
</project>
... ...
/**
*
*/
package com.yoho.jobs.server.domain;
package com.yoho.jobs.common.domain;
/**
* @author yanzhang.fu
... ... @@ -74,11 +74,25 @@ public interface ConstantEnum {
public class JobResponse {
public static final String SUCCESS = "200";
public static final String SUCCESS_DESC = "success";
public static final String FAIL = "1";
}
enum JobType {
SINGLE_NODE(0), MUTIL_NODE(1);
int type;
private JobType(int type) {
this.type = type;
}
public int getType() {
return type;
}
}
}
... ...
/**
*
*/
package com.yoho.jobs.common.jobopt;
import com.yoho.jobs.common.domain.JobInfo;
/**
* @author yanzhang.fu
*
*/
public interface JobRegisterService {
void registJob(JobInfo jobInfo) throws Exception;
boolean checkJobExist(JobInfo jobInfo) throws Exception;
}
... ...
/**
*
*/
package com.yoho.jobs.common.jobopt.impl;
import java.nio.charset.Charset;
import javax.annotation.Resource;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.JobInfo;
import com.yoho.jobs.common.jobopt.JobRegisterService;
/**
* @author yanzhang.fu
*
*/
@Service("jobRegisterService")
public class JobRegisterServiceImpl implements JobRegisterService {
private final static Logger logger = LoggerFactory.getLogger(JobRegisterServiceImpl.class);
private static final String JOBINFO_PATH = "/job/jobclient";
@Resource(name = "curatorFramework")
private CuratorFramework client;
/*
* (non-Javadoc)
*
* @see
* com.yoho.jobs.common.jobopt.JobRegisterService#registJob(com.yoho.jobs.common
* .domain.JobInfo)
*/
@Override
public void registJob(JobInfo jobInfo) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("begin to registJob job , job is {}", jobInfo);
}
if (!checkJobExist(jobInfo)) {
String message = "the job already exist in zk,plz check it ,jobinfo is " + jobInfo;
logger.error("the job already exits in zk,plz check it ,jobinfo is {}", jobInfo);
throw new Exception(message);
} else {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.CREATE.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath,
JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
}
public boolean checkJobExist(JobInfo jobInfo) throws Exception {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
Stat stat = client.checkExists().forPath(jobPath);
if (stat == null) {
return false;
} else {
return true;
}
}
}
... ...
... ... @@ -40,17 +40,40 @@
<form action="/jobs/jomanager/createjob.do" class="form col-md-12 center-block" name="form">
<div class="form-group">
<input type="text" class="form-control input-lg" id="jobName" name="jobName" placeholder="任务名称">
<input type="text" class="form-control input-lg" id="jobName" name="jobName" placeholder="任务名称" required>
</div>
<div class="form-group">
<input type="text" class="form-control input-lg" id="jobGroup" name="jobGroup" default="default" placeholder="任务分组,默认default">
<input type="text" class="form-control input-lg" id="jobGroup" name="jobGroup" placeholder="任务分组,默认default">
</div>
<!-- <div class="form-group">
<input type="text" class="form-control input-lg" id="checkCode" name="checkCode" maxlength="4" placeholder="校验码">
<img src="common/checkCode.jsp" />
</div> -->
<div class="form-group">
<input type="button" value="立刻登录" id="loginBtn" class="btn btn-primary btn-lg btn-block" name="loginBtn"/>
<input type="text" class="form-control input-lg" id="module" name="module" placeholder="任务所属模块">
</div>
<div class="form-group">
<input type="text" class="form-control input-lg" id="processor" name="processor" placeholder="任务执行器">
</div>
<div class="form-group">
<input type="text" class="form-control input-lg" id="processip" name="processip" placeholder="任务执行ip">
</div>
<div class="form-group">
<select name="jobtype">
<option value="0">单节点执行</option>
<option value="1">多节点执行</option>
</select>
</div>
<div class="form-group">
<input type="text" class="form-control input-lg" id="cronexception" name="cronexception" placeholder="任务执行周期">
</div>
<div class="form-group">
<textarea rows="5" cols="65" name="context" placeholder="任务执行所需入参"></textarea>
</div>
<div class="form-group">
<select name="activemode">
<option value="0">单次任务</option>
<option value="1">周期任务</option>
</select>
</div>
<div class="form-group">
<input type="button" value="创建" id="createJobBtn" class="btn btn-primary btn-lg btn-block" name="createJobBtn"/>
<!-- <span><a href="#">找回密码</a></span> <span></span> -->
</div>
<div id="messageAlert"></div>
... ... @@ -63,36 +86,13 @@
</div>
<script>
$(function() {
$("#loginBtn").click(function() {
$("#createJobBtn").click(function() {
$(this).prop("disabled", "disabled");
var loginName = $("#loginName");
var loginPwd = $("#loginPwd");
//var checkCode = $("#checkCode");
if (loginName.val() == "") {
$("#messageAlert").alerts({
content : "请输入登录名。",
type : "danger"
});
$(this).removeAttr("disabled");
loginName.focus();
return;
}
if (loginPwd.val() == "") {
$("#messageAlert").alerts({
content : "请输入密码。",
type : "danger"
});
$(this).removeAttr("disabled");
loginPwd.focus();
return;
}
$.ajax({
url : "jobmanager/showalljob.do",
url : "/jobs/jobmanager/createjob.do",
data : {
loginName : loginName.val(),
loginPwd : loginPwd.val()
//checkCode : checkCode.val()
},
dataType : "json",
type : "POST",
... ... @@ -102,7 +102,7 @@
content : "登录失败",
type : "danger"
});
$("#loginBtn").removeAttr("disabled");
$("#createJobBtn").removeAttr("disabled");
return;
}
if (data.code != 200) {
... ... @@ -110,10 +110,10 @@
content : data.message,
type : "danger"
});
$("#loginBtn").removeAttr("disabled");
$("#createJobBtn").removeAttr("disabled");
return;
}
location.href = "html/marketingPushList.html";
location.href = "/jobs/html/showjob.html";
}
});
});
... ...
... ... @@ -13,8 +13,8 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.JobInfo;
import com.yoho.jobs.server.domain.ConstantEnum;
import com.yoho.jobs.server.domain.JobResponse;
import com.yoho.jobs.server.domain.PageResponse;
import com.yoho.jobs.server.domain.ShowAllJobResponse;
... ...
... ... @@ -7,6 +7,8 @@ import java.io.Serializable;
import org.apache.commons.lang3.StringUtils;
import com.yoho.jobs.common.domain.ConstantEnum;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
... ...
... ... @@ -21,8 +21,8 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.yoho.core.common.utils.LocalIp;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.JobInfo;
import com.yoho.jobs.server.domain.ConstantEnum;
import com.yoho.jobs.server.domain.ScheduleJobInfo;
import com.yoho.jobs.server.scheduler.Container;
import com.yoho.jobs.server.scheduler.JobScheduleService;
... ...
... ... @@ -12,14 +12,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import com.yoho.jobs.server.domain.ConstantEnum;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.server.domain.ScheduleJobInfo;
import com.yoho.jobs.server.scheduler.Container;
import com.yoho.jobs.server.scheduler.IScheduleJobServer;
import com.yoho.jobs.server.scheduler.JobScheduleService;
import com.yoho.jobs.server.utils.DateUtil;
/**
* @author yanzhang.fu
*
... ... @@ -31,6 +30,7 @@ public class JobScheduleServiceImpl implements JobScheduleService {
private CuratorFramework client;
private final static Logger logger = LoggerFactory.getLogger(JobScheduleServiceImpl.class);
/*
* (non-Javadoc)
*
... ... @@ -53,21 +53,21 @@ public class JobScheduleServiceImpl implements JobScheduleService {
return;
}
ThreadPoolTaskScheduler scheduler = job.getScheduler();
if( scheduler== null) {
scheduler = new ThreadPoolTaskScheduler();
if (scheduler == null) {
scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
job.setScheduler(scheduler);
}
String cronExpress = job.getJobInfo().getCronExpression();
// 选取执行节点
if (job.getJobInfo().getJobType() == 0) {
if (job.getJobInfo().getJobType() == ConstantEnum.JobType.SINGLE_NODE.getType()) {
String execIp = job.getJobInfo().getProcessIp();
if (StringUtils.isEmpty(execIp) || (Container.CONTAINER.getAllClient()
.containsKey(job.getJobInfo().getModule())
&& !Container.CONTAINER.getAllClient().get(job.getJobInfo().getModule()).contains(execIp))) {
String ip = scheduleJobServer.getServerIp(job.getJobInfo().getModule());
if(StringUtils.isEmpty(ip)) {
if (StringUtils.isEmpty(ip)) {
logger.error(" select host to exec job faild , plz check config on zk");
throw new Exception("select host faild.");
}
... ...
... ... @@ -11,14 +11,14 @@ import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.JobInfo;
import com.yoho.jobs.server.domain.ConstantEnum;
import com.yoho.jobs.common.jobopt.JobRegisterService;
import com.yoho.jobs.server.domain.ScheduleJobInfo;
import com.yoho.jobs.server.scheduler.Container;
import com.yoho.jobs.server.scheduler.JobService;
... ... @@ -34,6 +34,8 @@ public class JobServiceImpl implements JobService {
private CuratorFramework client;
private JobRegisterService jobRegisterService;
/*
* (non-Javadoc)
*
... ... @@ -129,23 +131,27 @@ public class JobServiceImpl implements JobService {
this.client = client;
}
public JobRegisterService getJobRegisterService() {
return jobRegisterService;
}
public void setJobRegisterService(JobRegisterService jobRegisterService) {
this.jobRegisterService = jobRegisterService;
}
@Override
public void createJob(JobInfo jobInfo) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("begin to create job , job is {}", jobInfo);
}
if (!checkJobExist(jobInfo)) {
String message = "the job already exist in zk,plz check it ,jobinfo is " + jobInfo;
logger.error("the job already exits in zk,plz check it ,jobinfo is {}", jobInfo);
throw new Exception(message);
} else {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.CREATE.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobPath,
JSON.toJSONString(jobInfo).getBytes(Charset.forName("UTF-8")));
}
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.CREATE.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
jobRegisterService.registJob(jobInfo);
ScheduleJobInfo scheduleJobInfo = new ScheduleJobInfo();
scheduleJobInfo.setFuture(null);
scheduleJobInfo.setJobInfo(jobInfo);
scheduleJobInfo.setScheduler(null);
Container.CONTAINER.addJobInfo(jobInfo.getModule(), scheduleJobInfo);
}
... ... @@ -155,7 +161,7 @@ public class JobServiceImpl implements JobService {
logger.debug("begin to startJob job , job is {}", jobInfo);
}
if (!checkJobExist(jobInfo)) {
if (!jobRegisterService.checkJobExist(jobInfo)) {
// 不存在
String message = "job not exist, job is " + jobInfo;
logger.error(message);
... ... @@ -184,7 +190,7 @@ public class JobServiceImpl implements JobService {
return;
}
if (!checkJobExist(jobInfo)) {
if (!jobRegisterService.checkJobExist(jobInfo)) {
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
... ... @@ -205,7 +211,7 @@ public class JobServiceImpl implements JobService {
public void updateJob(JobInfo jobInfo) throws Exception {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
if (!checkJobExist(jobInfo)) {
if (!jobRegisterService.checkJobExist(jobInfo)) {
// 当任务不存在时,create之
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.CREATE.getValue());
createJob(jobInfo);
... ... @@ -228,7 +234,7 @@ public class JobServiceImpl implements JobService {
if (logger.isDebugEnabled()) {
logger.debug("begin to retryJob job , job is {}", jobInfo);
}
if (!checkJobExist(jobInfo)) {
if (!jobRegisterService.checkJobExist(jobInfo)) {
String message = "job not exist, job is " + jobInfo;
logger.error(message);
throw new Exception(message);
... ... @@ -240,13 +246,4 @@ public class JobServiceImpl implements JobService {
}
}
private boolean checkJobExist(JobInfo jobInfo) throws Exception {
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
Stat stat = client.checkExists().forPath(jobPath);
if (stat == null) {
return false;
} else {
return true;
}
}
}
... ...
... ... @@ -25,6 +25,7 @@
</bean>
<bean id="jobService" class="com.yoho.jobs.server.scheduler.impl.JobServiceImpl">
<property name="client" ref="curatorFramework" />
<property name="jobRegisterService" ref="jobRegisterService"></property>
</bean>
<bean id="jobScheduleService"
... ...