YohoInternalJob.java 7.59 KB
package com.yoho.quartz.job.inner;

import com.alibaba.fastjson.JSON;
import com.yoho.core.common.helpers.StackTraceHelper;
import com.yoho.error.event.JobAlarmEvent;
import com.yoho.quartz.alarm.JobAlarmEventService;
import com.yoho.quartz.contants.Contants;
import com.yoho.quartz.domain.JobDetail;
import com.yoho.quartz.domain.JobProcessResult;
import com.yoho.quartz.domain.JobResultCode;
import com.yoho.quartz.domain.JobRunResult;
import com.yoho.quartz.job.JobProcessorHolder;
import com.yoho.quartz.job.YhJob;
import com.yoho.quartz.utils.DateUtils;
import com.yoho.quartz.utils.StringOptUtil;
import com.yoho.quartz.utils.ZkPathOptUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.nio.charset.Charset;
import java.util.Date;

/**
 * Author:yanzhang.fu
 * Date:2017/12/6
 * Description: 框架内部实现quartz的Job接口,实际上是YhJob的实现类的一个代理,不允许被扩展
 * Modified By:
 **/
public final class YohoInternalJob implements Job {

    private static final Logger logger = LoggerFactory.getLogger(YohoInternalJob.class);

    @Autowired
    private CuratorFramework client;
    @Autowired
    private JobAlarmEventService jobAlarmEventService;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        /**
         * 主要干如下几件事:
         * 1、执行具体业务逻辑
         * 2、返回执行结果,入库job_run_log,有失败,上报告警。
         *3、若为重试操作,则删除trigger。
         */
        logger.info("start to execute job...,context is {}", context);
        String exceptionDesc = "";
        String result = "fail";
        String startTime = DateUtils.date2String(new Date(), DateUtils.DATE_TIME_FORMAT);
        try {
            //获取执行的上下文
            String jobContext = context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.JOB_CONTEXT);
            String processerClazz = context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.PROCESSOR);
            YhJob processor = JobProcessorHolder.getProcessor(processerClazz);
            if (null == processor) {
                logger.info("start to execute job...,jobContext is {}", jobContext);
                logger.info("start to execute job...,processerClazz is {}", processerClazz);
                JobProcessorHolder.printHolder();
                //已经删除的job
                result = "deleted job";
                return;
            }
            JobProcessResult jobProcessResult = processor.process(jobContext);
            if (jobProcessResult == null||jobProcessResult.getJobResultCode() == null)
            {
                result = "success";
                return;
            }
            switch (jobProcessResult.getJobResultCode()) {
                case FAIL:
                    result = jobProcessResult.getJobResultCode().getValue();
                    exceptionDesc = jobProcessResult.getDesc();
                    logger.error("execute job failed, the error info is {}", exceptionDesc);
                    jobAlarmEventService.publishEnvent(buildEvent(context, exceptionDesc));
                    break;
                case SUCCESS:
                    result = jobProcessResult.getJobResultCode().getValue();
                    exceptionDesc = jobProcessResult.getDesc();
                    break;
            }
        } catch (Throwable e) {
            //执行异常处理,记录异常,包括runtimeexception
            String exceptionTrace = StackTraceHelper.getStackMsg(e);
            String cause = e.toString();
            StringBuilder sb = new StringBuilder();
            sb.append(cause);
            sb.append("\n");
            sb.append(exceptionTrace);
            exceptionDesc = sb.toString();
            logger.error("execute job failed, the exception is {}", e);
            jobAlarmEventService.publishEnvent(buildEvent(context, exceptionDesc));
        } finally {
            //任务执行日志入库
            try {
                JobRunResult runResult = buildResult(context, result, exceptionDesc, startTime);
                //写到zk上,然后由运维侧监听器入库。需要获取zk的客户端实例
                sendJobRunLog(client, runResult);
            } catch (Throwable e) {
                logger.error("insert jobRunlog failed,Exception is {}", e);
            }

        }

    }

    private void sendJobRunLog(CuratorFramework client, JobRunResult runResult) {
        try {
            //获取jobname
            String jobMetaPath = StringOptUtil.append(Contants.ZkPath.JOB_PREFIX, "/", runResult.getModule(), "/", runResult.getJobGroup(), "/", runResult.getJobId(), Contants.ZkPath.JOB_META_POSTFIX);
            JobDetail jobDetail = JSON.parseObject(client.getData().forPath(jobMetaPath), JobDetail.class);
            runResult.setJobName(jobDetail.getJobName());
            String path = StringOptUtil.append(Contants.ZkPath.JOB_PREFIX, "/", runResult.getModule(), "/", runResult.getJobGroup(), "/", runResult.getJobId(), Contants.ZkPath.JOB_RESULT_POSTFIX);
            if (ZkPathOptUtil.checkPathExist(client, path)) {
                client.setData().forPath(path, JSON.toJSONString(runResult).getBytes(Charset.forName("UTF-8")));
            } else {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, JSON.toJSONString(runResult).getBytes(Charset.forName("UTF-8")));
            }
        } catch (Exception e) {
            logger.error("send jobRunLog to zk failed. runResult is {}, message is {}", runResult, e);
        }
    }

    private JobRunResult buildResult(JobExecutionContext context, String result, String exceptionDesc, String startTime) throws SchedulerException {
        JobRunResult runResult = new JobRunResult();
        runResult.setJobId(context.getJobDetail().getKey().getName());
        runResult.setJobGroup(context.getJobDetail().getKey().getGroup());
        runResult.setEndTime(DateUtils.long2String(System.currentTimeMillis(), DateUtils.DATE_TIME_FORMAT));
        runResult.setMessage(exceptionDesc);
        runResult.setModule(context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.MODULE_KEY));
        runResult.setProcessor(context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.PROCESSOR));
        runResult.setContext(context.getJobDetail().getJobDataMap().getString(Contants.JobDataMapKey.JOB_CONTEXT));
        runResult.setRecordDate(System.currentTimeMillis());
        runResult.setResult(result);
        runResult.setSchedulerInstanceId(context.getScheduler().getSchedulerInstanceId());
        runResult.setSchedulerName(context.getScheduler().getSchedulerName());
        runResult.setStartTime(startTime);
        return runResult;
    }

    private JobAlarmEvent buildEvent(JobExecutionContext context, String exceptionDesc) {
        try {
            String scheduleName = context.getScheduler().getSchedulerName();
            String jobName = context.getJobDetail().getKey().getName();
            String jobGroup = context.getJobDetail().getKey().getGroup();
            return new JobAlarmEvent(scheduleName, jobName, jobGroup, scheduleName, exceptionDesc);
        } catch (SchedulerException e) {
            logger.error("get scheduler failed. exception message is {}", e);
        }
        return null;
    }

}