Authored by fu

上报告警

... ... @@ -299,7 +299,7 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
ex.printStackTrace(pw);
result.setMessage(sw.toString());
result.setCode("error");
result.setCode(ConstantEnum.JobProcessResultCode.ERROR_CODE);
SendResult2Server(result);
} finally {
pw.close();
... ... @@ -314,7 +314,7 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
});
}
logger.info("-----do other thing ...-----");
break;
default:
... ... @@ -332,8 +332,6 @@ public class InstantiationBeanPostProcessor implements ApplicationContextAware,
// 上报结果到jobManager中
private void SendResult2Server(ProcessResult result) {
logger.info(" send job result to jobManager.");
// TODO 任务执行失败,上报告警
serviceCaller.call("jobs.saveJobResult", result, void.class);
}
... ...
... ... @@ -95,4 +95,17 @@ public interface ConstantEnum {
}
}
public class JobProcessResultCode {
/**
* 任务处理成功标志
*/
public static final String SUCCESS_CODE = "success";
/**
* 任务处理失败标志
*/
public static final String ERROR_CODE = "error";
}
}
... ...
... ... @@ -3,3 +3,7 @@ zkAddress=172.16.6.85:2181
web.port=8080
yoho.logs.level=debug
limitrecord=5
alarm.influxdb.url=http://192.168.102.22:8086
alarm.influxdb.user=root
alarm.influxdb.password=root
\ No newline at end of file
... ...
/**
*
*/
package com.yoho.jobs.server.alarm;
import java.util.concurrent.TimeUnit;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
/**
* @author yanzhang.fu
*
*/
public class InfluxDbSource {
private String influxdbUrl;
private String influxdbName;
private String password;
private final InfluxDB influxDB;
public InfluxDbSource(String influxdbUrl, String influxdbName, String password) {
this.influxdbUrl = influxdbUrl;
this.influxdbName = influxdbName;
this.password = password;
influxDB = InfluxDBFactory.connect(influxdbUrl, influxdbName, password);
influxDB.enableBatch(1500, 50, TimeUnit.MILLISECONDS);
}
public InfluxDB getInfluxDb() {
return influxDB;
}
public String getInfluxdbUrl() {
return influxdbUrl;
}
public void setInfluxdbUrl(String influxdbUrl) {
this.influxdbUrl = influxdbUrl;
}
public String getInfluxdbName() {
return influxdbName;
}
public void setInfluxdbName(String influxdbName) {
this.influxdbName = influxdbName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
... ...
/**
*
*/
package com.yoho.jobs.server.alarm;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author yanzhang.fu
*
*/
public class SendAlarmTool {
private Logger logger = LoggerFactory.getLogger(SendAlarmTool.class);
private InfluxDbSource dbSource;
private static final String DATABASE = "yoho-monitor";
private static final String MEASUREMENT = "yoho_job_alarm";
public void sendAlarm(Map<String, String> tags, Map<String, Object> fields) {
logger.info("send job alarm");
try {
Builder builder = Point.measurement(MEASUREMENT).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
builder.tag(tags);
builder.fields(fields);
dbSource.getInfluxDb().write(DATABASE, "default", builder.build());
logger.info("send alarm {} success", this.MEASUREMENT);
} catch (Exception e) {
logger.error("send job alarm failed.", e);
}
}
public InfluxDbSource getDbSource() {
return dbSource;
}
public void setDbSource(InfluxDbSource dbSource) {
this.dbSource = dbSource;
}
}
... ...
... ... @@ -4,6 +4,8 @@
package com.yoho.jobs.server.controller;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Resource;
... ... @@ -11,9 +13,11 @@ import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import com.yoho.jobs.common.domain.ConstantEnum;
import com.yoho.jobs.common.domain.ProcessResult;
import com.yoho.jobs.dal.IJobResultMapper;
import com.yoho.jobs.dal.domain.ProcessResultVO;
import com.yoho.jobs.server.alarm.SendAlarmTool;
/**
* @author yanzhang.fu
... ... @@ -26,12 +30,22 @@ public class SaveJobInfoController {
@Resource
private IJobResultMapper jobResultMapper;
@Resource(name = "sendAlarmTool")
private SendAlarmTool sendAlarmTool;
@RequestMapping("/saveJobResult.do")
public void saveJobResult(@RequestBody ProcessResult jobInfo) {
ProcessResultVO result = new ProcessResultVO();
convert(result, jobInfo);
//TODO 若执行失败,则上报告警并入库
// TODO 若执行失败,则上报告警并入库
if (ConstantEnum.JobProcessResultCode.ERROR_CODE.equals(jobInfo.getCode())) {
// 上报告警
Map<String, String> tags = new HashMap<String, String>();
buildTags(result, tags);
Map<String, Object> fields = new HashMap<String, Object>();
buildFields(result, fields);
sendAlarmTool.sendAlarm(tags, fields);
}
jobResultMapper.insertJob(result);
}
... ... @@ -54,4 +68,15 @@ public class SaveJobInfoController {
result.setStatus(jobInfo.getJobInfo().getStatus());
result.setRecordDate(new Date());
}
private void buildTags(ProcessResultVO result, Map<String, String> tags) {
tags.put("jobname", result.getJobName());
tags.put("module", result.getModule());
}
private void buildFields(ProcessResultVO result, Map<String, Object> fields) {
fields.put("message", result.getMessage());
fields.put("desc", "exec job faild");
}
}
... ...
... ... @@ -137,7 +137,7 @@ public class ElectionService {
if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.EXEC.getValue()
|| jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.TRIGGER.getValue()) {
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
// jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
jobScheduleService.schedule(scheduleJobInfo);
}
}
... ... @@ -162,7 +162,7 @@ public class ElectionService {
.getJobsByModule(jobInfo.getModule()).get(jobInfo.getJobName());
if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.EXEC.getValue()) {
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.TRIGGER.getValue());
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
// jobInfo.setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
updateScheduleJobInfo.setJobInfo(jobInfo);
updateScheduleJobInfo.setFuture(null);
... ... @@ -170,9 +170,10 @@ public class ElectionService {
} else if (jobInfo.getOperator() == ConstantEnum.JobOperatorEnum.STOP.getValue()) {
// 当前任务是执行状态时,才可以停止,然后cancel之
if (jobInfo.getStatus() == ConstantEnum.JobStatusEnum.STARTING.getValue()) {
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
updateScheduleJobInfo.setJobInfo(jobInfo);
updateScheduleJobInfo.getFuture().cancel(false);
jobInfo.setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
// 重新更新到zk上
String jobPath = CLIENT_NODES + "/" + jobInfo.getModule() + "/jobinfo/"
+ jobInfo.getJobName();
... ...
... ... @@ -94,6 +94,7 @@ public class JobScheduleServiceImpl implements JobScheduleService {
} else {
return;
}
job.getJobInfo().setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
}
... ...
... ... @@ -173,7 +173,7 @@ public class JobServiceImpl implements JobService {
ScheduleJobInfo updateScheduleJobInfo = Container.CONTAINER.getJobsByModule(jobInfo.getModule())
.get(jobInfo.getJobName());
updateScheduleJobInfo.getJobInfo().setOperator(ConstantEnum.JobOperatorEnum.EXEC.getValue());
updateScheduleJobInfo.getJobInfo().setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
// updateScheduleJobInfo.getJobInfo().setStatus(ConstantEnum.JobStatusEnum.STARTING.getValue());
client.setData().forPath(jobPath,
JSON.toJSONString(updateScheduleJobInfo.getJobInfo()).getBytes(Charset.forName("UTF-8")));
}
... ... @@ -197,13 +197,13 @@ public class JobServiceImpl implements JobService {
} else {
// 存在
String jobPath = JOBINFO_PATH + "/" + jobInfo.getModule() + "/jobinfo/" + jobInfo.getJobName();
jobInfo.setOperator(ConstantEnum.JobOperatorEnum.STOP.getValue());
// jobInfo.setOperator(ConstantEnum.JobOperatorEnum.STOP.getValue());
if (null != updateScheduleJobInfo.getFuture()) {
updateScheduleJobInfo.getFuture().cancel(false);
}
updateScheduleJobInfo.getJobInfo().setOperator(ConstantEnum.JobOperatorEnum.STOP.getValue());
updateScheduleJobInfo.getJobInfo().setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
// updateScheduleJobInfo.getJobInfo().setStatus(ConstantEnum.JobStatusEnum.STOPPING.getValue());
client.setData().forPath(jobPath,
JSON.toJSONString(updateScheduleJobInfo.getJobInfo()).getBytes(Charset.forName("UTF-8")));
}
... ...
... ... @@ -36,4 +36,16 @@
<bean id="scheduleJobServer"
class="com.yoho.jobs.server.scheduler.impl.RandomScheduleJobServer"></bean>
<bean id="influxdbSource" class="com.yoho.jobs.server.alarm.InfluxDbSource">
<constructor-arg name="influxdbUrl"
value="${alarm.influxdb.url:http://influxdb.yohoops.org:8086}" />
<constructor-arg name="influxdbName" value="${alarm.influxdb.user:root}" />
<constructor-arg name="password"
value="${alarm.influxdb.password:root}" />
</bean>
<bean id="sendAlarmTool" class="com.yoho.jobs.server.alarm.SendAlarmTool">
<property name="dbSource" ref="influxdbSource" />
</bean>
</beans>
\ No newline at end of file
... ...
package org.yoho.jobs.server.alarm;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Result;
import org.influxdb.dto.Point.Builder;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import com.yoho.jobs.server.alarm.InfluxDbSource;
public class InfluxdbOpt {
private ApplicationContext ctx;
@Before
public void setUp() {
}
@Test
public void test() {
String url = "http://192.168.102.22:8086";
String usr = "root";
String pwd = "root";
InfluxDbSource dbsource = new InfluxDbSource(url, usr, pwd);
QueryResult result = dbsource.getInfluxDb().query(new Query("select * from monitor_alarm", "yoho-monitor"));
System.out.println(result.getResults().size());
System.out.println(result.getResults().get(0));
Builder builder = Point.measurement("yoho_job_alarm").time(System.currentTimeMillis(), TimeUnit.NANOSECONDS);
Map<String,String> tags = new HashMap<String,String>();
tags.put("jobname", "jobname");
tags.put("module", "order");
builder.tag(tags);
Map<String,Object> fields = new HashMap<String,Object>();
fields.put("message", "呵呵");
fields.put("desc", "exec job faild");
builder.fields(fields);
dbsource.getInfluxDb().write("yoho-monitor", "default", builder.build());
QueryResult result1 = dbsource.getInfluxDb().query(new Query("select * from yoho_job_alarm", "yoho-monitor"));
// System.out.println(result1.getResults().size());
System.out.println(result1.getResults().get(0).getSeries().get(0).getValues().size());
for(Result re : result1.getResults()) {
System.out.println(re);
}
}
}
... ...
... ... @@ -2,3 +2,7 @@ web.context=jobs
zkAddress=172.16.6.85:2181
web.port=8080
limitrecord=5
alarm.influxdb.url=http://192.168.102.22:8086
alarm.influxdb.user=root
alarm.influxdb.password=root
\ No newline at end of file
... ...