YohoInternalJob.java
7.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package com.yoho.quartz.job.inner;
import com.alibaba.fastjson.JSON;
import com.yoho.core.common.helpers.StackTraceHelper;
import com.yoho.error.event.JobAlarmEvent;
import com.yoho.quartz.alarm.JobAlarmEventService;
import com.yoho.quartz.contants.Contants;
import com.yoho.quartz.domain.JobDetail;
import com.yoho.quartz.domain.JobProcessResult;
import com.yoho.quartz.domain.JobResultCode;
import com.yoho.quartz.domain.JobRunResult;
import com.yoho.quartz.job.JobProcessorHolder;
import com.yoho.quartz.job.YhJob;
import com.yoho.quartz.utils.DateUtils;
import com.yoho.quartz.utils.StringOptUtil;
import com.yoho.quartz.utils.ZkPathOptUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.nio.charset.Charset;
import java.util.Date;
/**
 * Author:yanzhang.fu
 * Date:2017/12/6
 * Description: Framework-internal implementation of the Quartz {@link Job} interface. It is a proxy
 * in front of a {@link YhJob} implementation and is declared {@code final} so it cannot be extended.
 * Modified By:
 **/
public final class YohoInternalJob implements Job {

    private static final Logger logger = LoggerFactory.getLogger(YohoInternalJob.class);

    /** Charset used for every payload written through the Curator client; resolved once. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Curator client used to read the job meta node and to write the run result node. */
    @Autowired
    private CuratorFramework client;

    /** Service used to publish an alarm event when a job run fails. */
    @Autowired
    private JobAlarmEventService jobAlarmEventService;

    /**
     * Runs the {@link YhJob} processor named in the job data map and records the outcome.
     *
     * <p>Steps performed here:
     * <ol>
     *   <li>look up the processor and run it with the job context string;</li>
     *   <li>translate the returned {@link JobProcessResult} into a result string and description,
     *       publishing an alarm when the run failed or threw;</li>
     *   <li>always (in {@code finally}) build a {@link JobRunResult} and send it via
     *       {@link #sendJobRunLog(CuratorFramework, JobRunResult)}.</li>
     * </ol>
     * The original notes also mention deleting the trigger for retry runs; no such step exists in
     * this method.
     *
     * @param context Quartz execution context supplying the job detail, data map and scheduler
     * @throws JobExecutionException declared by the {@link Job} contract; this method catches every
     *         {@link Throwable} raised by the processor and does not throw it itself
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        logger.info("start to execute job...,context is {}", context);
        String exceptionDesc = "";
        // Pessimistic default: only an explicit outcome below replaces it.
        String result = "fail";
        String startTime = DateUtils.date2String(new Date(), DateUtils.DATE_TIME_FORMAT);
        try {
            // Read the execution context and the processor name from the job data map.
            String jobContext = context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.JOB_CONTEXT);
            String processerClazz = context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.PROCESSOR);
            YhJob processor = JobProcessorHolder.getProcessor(processerClazz);
            if (null == processor) {
                logger.info("start to execute job...,jobContext is {}", jobContext);
                logger.info("start to execute job...,processerClazz is {}", processerClazz);
                JobProcessorHolder.printHolder();
                // No processor registered under this name: treated as a job that was already deleted.
                result = "deleted job";
                return;
            }

            JobProcessResult jobProcessResult = processor.process(jobContext);
            // A missing result, or a result without a code, counts as success.
            if (jobProcessResult == null || jobProcessResult.getJobResultCode() == null) {
                result = "success";
                return;
            }

            JobResultCode resultCode = jobProcessResult.getJobResultCode();
            switch (resultCode) {
                case FAIL:
                    result = resultCode.getValue();
                    exceptionDesc = jobProcessResult.getDesc();
                    logger.error("execute job failed, the error info is {}", exceptionDesc);
                    // Alarm failures are contained in publishAlarm so they cannot overwrite
                    // the job's own failure description or trigger a second alarm.
                    publishAlarm(context, exceptionDesc);
                    break;
                case SUCCESS:
                    result = resultCode.getValue();
                    exceptionDesc = jobProcessResult.getDesc();
                    break;
                default:
                    // Any other code keeps the defaults ("fail", empty description).
                    break;
            }
        } catch (Throwable e) {
            // Record every failure, including runtime exceptions: cause line followed by the stack text.
            String exceptionTrace = StackTraceHelper.getStackMsg(e);
            String cause = e.toString();
            StringBuilder sb = new StringBuilder();
            sb.append(cause);
            sb.append("\n");
            sb.append(exceptionTrace);
            exceptionDesc = sb.toString();
            // Fill the placeholder explicitly and pass the throwable last so the stack trace is logged.
            logger.error("execute job failed, the exception is {}", cause, e);
            publishAlarm(context, exceptionDesc);
        } finally {
            // Record the run log regardless of how the job ended.
            try {
                JobRunResult runResult = buildResult(context, result, exceptionDesc, startTime);
                // Sent through the Curator client; per the original notes an operations-side
                // listener is expected to persist it.
                sendJobRunLog(client, runResult);
            } catch (Throwable e) {
                logger.error("insert jobRunlog failed,Exception is {}", e.toString(), e);
            }
        }
    }

    /**
     * Publishes a failure alarm without letting alarm problems affect the job outcome.
     *
     * <p>Nothing is published when {@link #buildEvent(JobExecutionContext, String)} returns
     * {@code null}; any {@link Exception} raised while building or publishing is logged and dropped.
     *
     * @param context       execution context the alarm event is built from
     * @param exceptionDesc failure description carried by the alarm
     */
    private void publishAlarm(JobExecutionContext context, String exceptionDesc) {
        try {
            JobAlarmEvent event = buildEvent(context, exceptionDesc);
            if (event == null) {
                // buildEvent has already logged why the event could not be created.
                return;
            }
            jobAlarmEventService.publishEnvent(event);
        } catch (Exception e) {
            logger.error("publish job alarm failed, exception is {}", e.toString(), e);
        }
    }

    /**
     * Sends the run result as a JSON payload to the job's result path.
     *
     * <p>The job name is first read from the job's meta path and copied into {@code runResult}.
     * The result node is then updated if it exists, or created (persistent, with parents) if not.
     * Every {@link Exception} is logged and swallowed, so this method never throws one.
     *
     * @param client    Curator client used for all reads and writes
     * @param runResult run result to send; its job name is set by this method
     */
    private void sendJobRunLog(CuratorFramework client, JobRunResult runResult) {
        try {
            // Look up the job name from the job meta node.
            String jobMetaPath = StringOptUtil.append(Contants.ZkPath.JOB_PREFIX, "/", runResult.getModule(), "/", runResult.getJobGroup(), "/", runResult.getJobId(), Contants.ZkPath.JOB_META_POSTFIX);
            JobDetail jobDetail = JSON.parseObject(client.getData().forPath(jobMetaPath), JobDetail.class);
            if (jobDetail == null) {
                // Explicit check instead of relying on a NullPointerException; the write is still skipped.
                logger.error("send jobRunLog to zk failed. job meta is empty, path is {}, runResult is {}", jobMetaPath, runResult);
                return;
            }
            runResult.setJobName(jobDetail.getJobName());

            String path = StringOptUtil.append(Contants.ZkPath.JOB_PREFIX, "/", runResult.getModule(), "/", runResult.getJobGroup(), "/", runResult.getJobId(), Contants.ZkPath.JOB_RESULT_POSTFIX);
            // Serialise once; both branches send the same bytes.
            byte[] payload = JSON.toJSONString(runResult).getBytes(UTF8);
            if (ZkPathOptUtil.checkPathExist(client, path)) {
                client.setData().forPath(path, payload);
            } else {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, payload);
            }
        } catch (Exception e) {
            logger.error("send jobRunLog to zk failed. runResult is {}, message is {}", runResult, e.toString(), e);
        }
    }

    /**
     * Builds the run record for one execution from the Quartz context and the computed outcome.
     *
     * @param context       execution context supplying job key, data map values and scheduler identity
     * @param result        outcome string to store
     * @param exceptionDesc failure description stored as the record's message
     * @param startTime     formatted start time captured when execution began
     * @return the populated run result; end time and record date are taken from the current clock
     * @throws SchedulerException if the scheduler instance id or name cannot be read
     */
    private JobRunResult buildResult(JobExecutionContext context, String result, String exceptionDesc, String startTime) throws SchedulerException {
        JobRunResult runResult = new JobRunResult();
        runResult.setJobId(context.getJobDetail().getKey().getName());
        runResult.setJobGroup(context.getJobDetail().getKey().getGroup());
        runResult.setEndTime(DateUtils.long2String(System.currentTimeMillis(), DateUtils.DATE_TIME_FORMAT));
        runResult.setMessage(exceptionDesc);
        runResult.setModule(context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.MODULE_KEY));
        runResult.setProcessor(context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.PROCESSOR));
        runResult.setContext(context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.JOB_CONTEXT));
        runResult.setRecordDate(System.currentTimeMillis());
        runResult.setResult(result);
        runResult.setSchedulerInstanceId(context.getScheduler().getSchedulerInstanceId());
        runResult.setSchedulerName(context.getScheduler().getSchedulerName());
        runResult.setStartTime(startTime);
        return runResult;
    }

    /**
     * Builds the alarm event for a failed run.
     *
     * @param context       execution context supplying scheduler name and job key
     * @param exceptionDesc failure description carried by the event
     * @return the alarm event, or {@code null} when the scheduler name cannot be read
     */
    private JobAlarmEvent buildEvent(JobExecutionContext context, String exceptionDesc) {
        try {
            String scheduleName = context.getScheduler().getSchedulerName();
            String jobName = context.getJobDetail().getKey().getName();
            String jobGroup = context.getJobDetail().getKey().getGroup();
            // NOTE(review): scheduleName is passed as both the 1st and 4th argument; confirm against
            // the JobAlarmEvent constructor that the 4th argument is really meant to be the scheduler name.
            return new JobAlarmEvent(scheduleName, jobName, jobGroup, scheduleName, exceptionDesc);
        } catch (SchedulerException e) {
            logger.error("get scheduler failed. exception message is {}", e.toString(), e);
        }
        return null;
    }
}