Authored by zhengyouwei

任务配置管理

Showing 19 changed files with 869 additions and 2 deletions
package com.monitor.common.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* Created by zhengyouwei on 2016/8/12.
*/
@Component
public class ApplicationUtil implements ApplicationContextAware {
// Spring应用上下文环境
private static ApplicationContext applicationContext;
/**
* 实现ApplicationContextAware接口的回调方法,设置上下文环境
*/
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
ApplicationUtil.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 获取对象 这里重写了bean方法,起主要作用
*/
public static Object getBean(String beanId) throws BeansException {
return applicationContext.getBean(beanId);
}
/**
* 获取对象 这里重写了bean方法,起主要作用
*/
public static Object getBean(Class T) throws BeansException {
return applicationContext.getBean(T);
}
}
... ...
... ... @@ -13,7 +13,7 @@ public class InfluxDBQuery {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected QueryResult query(String influxDBName, String command, String database){
public QueryResult query(String influxDBName, String command, String database){
Query query = new Query(command, database);
try {
QueryResult queryResult = inluxDBSingle.getInfluxDBByName(influxDBName)
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDBQuery;
import org.springframework.stereotype.Service;
/**
* Created by zhengyouwei on 2016/8/12.
*/
@Service("CommonQuery")
public class CommonQuery extends InfluxDBQuery{
}
... ...
package com.model;
import lombok.Data;
/**
* Created by zhengyouwei on 2016/8/10.
*/
@Data
public class TaskModel {
private int id;
/**
* 查询的列
*/
private String fields;
/**
* 查询使用的tag
*/
private String tags;
/**
* 查询的influxdbs
*/
private String influxdbs;
/**
* 查询的influxdb 数据库
*/
private String database;
/**
* sql语句
*/
private String influxSql;
/**
* 告警号码
*/
private String mobile;
/**
* 告警短信${}表示替换词
*/
private String sms;
/**
* alarm type
*/
private String type;
/**
*执行间隔
*/
private int interval;
/**
* 休息开始时间 H
*/
private int relaxStartTime;
/**
* 休息结束时间 H
*/
private int relaxEndTime;
/**
* 1 一直执行任务 2 有休息
*/
private int timeFlag;
/**
* 告警方式
* 1:单个influxdb 未出出现想要的数据
* 2:多个influxdb 合起来未出现想要的数据
* 3:单个influxdb 出现次数超过一定值
* 4:出现一个就告警,单条记录单独告警
*/
private int alarmType;
/**
* 数量告警
*/
private int alarmNumFlag;
/**
* 是否在运行中 1 : 运行中
*/
private int isRun;
/**
* 是否需要监控 1: 需要监控
*/
private int isOn;
/**
* 持续需要告警的次数
*/
private int alarmTimes;
/**
*上次执行时间
*/
private String lastTime;
/**
* 描述
*/
private String description;
}
... ...
package com.monitor.mysql.mapper;
import com.model.TaskModel;
import com.monitor.model.domain.PageBean;
import java.util.List;
/**
* Created by zhengyouwei on 2016/7/21.
*/
public interface TaskSheduleMapper {
List<TaskModel> selectAll();
TaskModel selectById(int id);
List<TaskModel> selectByPage(PageBean page);
int deleteById(int id);
int updateByid(TaskModel taskModel);
int insert(TaskModel taskModel);
int selectCount();
int updateLastTime(int id);
}
... ...
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.monitor.mysql.mapper.TaskSheduleMapper">
<resultMap id="BaseResultMap" type="com.model.TaskModel">
<id column="id" property="id" jdbcType="INTEGER"/>
<result column="alarm_interval" property="interval" jdbcType="INTEGER"/>
<result column="relax_start_time" property="relaxStartTime" jdbcType="INTEGER"/>
<result column="relax_end_time" property="relaxEndTime" jdbcType="INTEGER"/>
<result column="time_flag" property="timeFlag" jdbcType="INTEGER"/>
<result column="alarm_type" property="alarmType" jdbcType="INTEGER"/>
<result column="alarm_num_flag" property="alarmNumFlag" jdbcType="INTEGER"/>
<result column="is_run" property="isRun" jdbcType="INTEGER"/>
<result column="is_on" property="isOn" jdbcType="INTEGER"/>
<result column="continuous_alarm_times" property="alarmTimes" jdbcType="INTEGER"/>
<result column="fields" property="fields" jdbcType="VARCHAR"/>
<result column="tags" property="tags" jdbcType="VARCHAR"/>
<result column="influxdbs" property="influxdbs" jdbcType="VARCHAR"/>
<result column="influx_sql" property="influxSql" jdbcType="VARCHAR"/>
<result column="mobile" property="mobile" jdbcType="VARCHAR"/>
<result column="sms" property="sms" jdbcType="VARCHAR"/>
<result column="type" property="type" jdbcType="VARCHAR"/>
<result column="inf_database" property="database" jdbcType="VARCHAR"/>
<result column="description" property="description" jdbcType="VARCHAR"/>
<result column="last_time" property="lastTime"/>
</resultMap>
<select id="selectById" resultMap="BaseResultMap" parameterType="java.lang.Integer">
select
*
from task_shedule
where id = #{id,jdbcType=INTEGER}
</select>
<select id="selectAll" resultMap="BaseResultMap">
select
*
from task_shedule
</select>
<select id="selectByPage" resultMap="BaseResultMap">
select
*
from task_shedule
order by id
limit #{startIndex},#{pageSize}
</select>
<select id="selectCount" resultType="java.lang.Integer">
select
count(1)
from task_shedule
</select>
<delete id="deleteById" parameterType="java.lang.Integer">
delete from task_shedule
where id = #{id,jdbcType=INTEGER}
</delete>
<update id="updateByid" parameterType="com.model.TaskModel">
update task_shedule
set alarm_interval = #{interval,jdbcType=INTEGER},
relax_start_time = #{relaxStartTime,jdbcType=INTEGER},
time_flag = #{timeFlag,jdbcType=INTEGER},
relax_end_time = #{relaxEndTime,jdbcType=INTEGER},
alarm_type = #{alarmType,jdbcType=INTEGER},
alarm_num_flag = #{alarmNumFlag,jdbcType=INTEGER},
is_on = #{isOn,jdbcType=INTEGER},
fields = #{fields,jdbcType=VARCHAR},
tags = #{tags,jdbcType=VARCHAR},
influxdbs = #{influxdbs,jdbcType=VARCHAR},
influx_sql = #{influxSql,jdbcType=VARCHAR},
mobile = #{mobile,jdbcType=VARCHAR},
sms = #{sms,jdbcType=VARCHAR},
type = #{type,jdbcType=VARCHAR},
description = #{description,jdbcType=VARCHAR},
inf_database = #{database,jdbcType=VARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
<insert id="insert" parameterType="com.model.TaskModel">
insert into task_shedule
(alarm_interval,
relax_start_time,
time_flag,
relax_end_time,
alarm_num_flag,
alarm_type,
is_on,
fields,
tags,
influxdbs,
influx_sql,
mobile,
sms,
type ,
inf_database,
description)
values
(#{interval,jdbcType=INTEGER},
#{relaxStartTime,jdbcType=INTEGER},
#{timeFlag,jdbcType=INTEGER},
#{relaxEndTime,jdbcType=INTEGER},
#{alarmNumFlag,jdbcType=INTEGER},
#{alarmType,jdbcType=INTEGER},
#{isOn,jdbcType=INTEGER},
#{fields,jdbcType=VARCHAR},
#{tags,jdbcType=VARCHAR},
#{influxdbs,jdbcType=VARCHAR},
#{influxSql,jdbcType=VARCHAR},
#{mobile,jdbcType=VARCHAR},
#{sms,jdbcType=VARCHAR},
#{type,jdbcType=VARCHAR},
#{database,jdbcType=VARCHAR},
#{description,jdbcType=VARCHAR}
)
</insert>
<update id="updateLastTime" parameterType="java.lang.Integer">
update task_shedule
set last_time = now()
where id = #{id,jdbcType=INTEGER}
</update>
</mapper>
\ No newline at end of file
... ...
... ... @@ -20,6 +20,11 @@
</dependency>
<dependency>
<groupId>monitor-service</groupId>
<artifactId>monitor-service-mysql</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
... ...
package com.monitor.other.task;
import com.model.TaskModel;
import java.util.List;
import java.util.Map;
/**
* Created by zhengyouwei on 2016/8/11.
*/
public interface AlarmTypeHandler {
void handle(TaskModel taskModel, Map<String, List<List<Map<String, Object>>>> result);
}
... ...
package com.monitor.other.task;
import com.model.TaskModel;
import com.monitor.common.util.ApplicationUtil;
import com.monitor.mysql.mapper.TaskSheduleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
/**
* Created by zhengyouwei on 2016/8/17.
*/
public class NewTask implements Runnable{
Logger logger = LoggerFactory.getLogger(NewTask.class);
@Override
public void run() {
try {
TaskSheduleMapper taskSheduleMapper = (TaskSheduleMapper) ApplicationUtil.getBean(TaskSheduleMapper.class);
TaskScheduler taskScheduler = (TaskScheduler) ApplicationUtil.getBean(TaskScheduler.class);
List<TaskModel> list = taskSheduleMapper.selectAll();
for (TaskModel taskModel : list){
if(taskModel.getIsOn() != 1){
continue;
}
if (!TaskStroe.isOn(taskModel.getId())){
SingleTask singleTask = new SingleTask(taskModel.getId());
ScheduledFuture scheduledFuture = taskScheduler.scheduleAtFixedRate(singleTask, taskModel.getInterval() * 60 * 1000);
TaskStroe.addTask(taskModel.getId(),scheduledFuture);
}
}
}catch (Exception e){
logger.error("new Task ",e);
}
}
}
... ...
package com.monitor.other.task;
import org.springframework.scheduling.TaskScheduler;
import javax.annotation.PostConstruct;
/**
* Created by zhengyouwei on 2016/8/10.
*/
public class SheduleTask {
private TaskScheduler taskScheduler;
@PostConstruct
public void startTasks() {
taskScheduler.scheduleAtFixedRate(new NewTask(),1* 60 * 1000);//启动发现新任务线程
}
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}
}
... ...
package com.monitor.other.task;
import com.model.TaskModel;
import com.monitor.common.util.ApplicationUtil;
import com.monitor.influxdb.InfluxDBQuery;
import com.monitor.influxdb.mapper.impl.CommonQuery;
import com.monitor.influxdb.util.QueryResultUtil;
import com.monitor.mysql.mapper.TaskSheduleMapper;
import org.apache.commons.lang.StringUtils;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.*;
/**
* Created by zhengyouwei on 2016/8/11.
*/
public class SingleTask implements Runnable {
Logger logger = LoggerFactory.getLogger(SingleTask.class);
private int id;
public SingleTask(int id) {
this.id = id;
}
@Override
public void run() {
try {
TaskSheduleMapper taskSheduleMapper = (TaskSheduleMapper)ApplicationUtil.getBean(TaskSheduleMapper.class);
TaskModel taskModel = taskSheduleMapper.selectById(id);
if (taskModel == null) {
return;
}
if (taskModel.getIsOn() != 1) {//监控关闭
TaskStroe.removeTask(id);//关闭任务
return;
}
if (taskModel.getTimeFlag() == 2) { //判断是否在执行时间
Calendar calendar = Calendar.getInstance();
int hour = calendar.get(Calendar.HOUR_OF_DAY);
if (hour >= taskModel.getRelaxStartTime() && hour < taskModel.getRelaxEndTime()) {//在休息时间
return;
}
}
//获取数据
Map<String, List<List<Map<String, Object>>>> result = getResult(taskModel);
taskSheduleMapper.updateLastTime(id);//更新时间
//按照告警类型进行处理
AlarmTypeHandler alarmTypeHandler = getHandler(taskModel.getAlarmType());
alarmTypeHandler.handle(taskModel,result);
} catch (Exception e) {
logger.error("excute task error,id = " + id, e);
}
}
public AlarmTypeHandler getHandler(int alarmType){
return (AlarmTypeHandler) ApplicationUtil.getApplicationContext().getBean("handlerType_" + alarmType);
}
/**
* 获取数据
* @param taskModel
* @return
*/
public Map<String, List<List<Map<String, Object>>>> getResult(TaskModel taskModel) {
String[] influxdbs = taskModel.getInfluxdbs().split(",");
String[] fields = taskModel.getFields().split(",");
String[] tags = null;
if(StringUtils.isNotBlank(taskModel.getTags())){
tags = taskModel.getTags().split(",");
}
Map<String, List<List<Map<String, Object>>>> resultMap = new HashMap<>();
for (String influxdb : influxdbs) {
QueryResult queryResult = ((CommonQuery)ApplicationUtil.getBean(CommonQuery.class)).query(influxdb, taskModel.getInfluxSql(), taskModel.getDatabase());
List<QueryResult.Series> seriesList = QueryResultUtil.getSeries(queryResult);
if (seriesList == null) {
continue;
}
List<List<Map<String, Object>>> seriesResultList = new ArrayList<>();
for (QueryResult.Series series : seriesList) {
List<Map<String, Object>> valueResultList = new ArrayList<>();
List<List<Object>> values = series.getValues();
for (List<Object> objects : values) {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
map.put(fields[i], objects.get(i + 1));
}
if(tags != null){
for (int i = 0; i < tags.length; i++) {
map.put(tags[i], series.getTags().get(tags[i]));
}
}
valueResultList.add(map);
}
seriesResultList.add(valueResultList);
}
resultMap.put(influxdb, seriesResultList);
}
return resultMap;
}
}
... ...
package com.monitor.other.task;
import com.model.TaskModel;
import com.monitor.model.domain.PageBean;
import com.monitor.model.page.PageRequest;
import com.monitor.model.page.PageResponse;
import com.monitor.model.response.BaseResponse;
import com.monitor.mysql.mapper.TaskSheduleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.List;
/**
* Created by zhengyouwei on 2016/8/15.
*/
@Controller
@RequestMapping("taskConfigure")
public class TaskConfigureCtrl {
Logger log = LoggerFactory.getLogger(TaskConfigureCtrl.class);
@Autowired
private TaskSheduleMapper taskSheduleMapper;
@RequestMapping("/getTaskConfigure")
@ResponseBody
public BaseResponse<PageResponse<TaskModel>> getTaskConfigure(@RequestBody PageRequest request) {
try {
// 组装分页对象
PageBean page = PageBean.initPageInfo(request.getCurrentPage(),
request.getPageSize(), request);
// 先查询符合条件的总数量
int total = taskSheduleMapper.selectCount();
// 数量为0 直接返回
if (total == 0) {
// 返回初始page对象
return null;
}
// 获取列表
List<TaskModel> taskModels = taskSheduleMapper.selectByPage(page);
if (CollectionUtils.isEmpty(taskModels)) {
return null;
}
PageResponse<TaskModel> response = new PageResponse<TaskModel>();
response.setCurrentPage(request.getCurrentPage());
response.setPageSize(request.getPageSize());
response.setTotal(total);
response.setRows(taskModels);
return new BaseResponse<PageResponse<TaskModel>>(response);
} catch (Exception e) {
log.error("getTaskConfigure error", e);
return new BaseResponse<>(e.getMessage());
}
}
@RequestMapping("/saveTaskModel")
@ResponseBody
public BaseResponse<Integer> saveTaskModel(@RequestBody TaskModel taskModel) {
try {
int result = 0;
if (taskModel.getId() > 0) {
result = taskSheduleMapper.updateByid(taskModel);
} else {
result = taskSheduleMapper.insert(taskModel);
}
return new BaseResponse<Integer>(result);
} catch (Exception e) {
log.error("saveTaskModel error", e);
return new BaseResponse<>(e.getMessage());
}
}
@RequestMapping("/delTaskModel")
@ResponseBody
public BaseResponse<Integer> delTaskModel(int id) {
try {
int result = taskSheduleMapper.deleteById(id);
return new BaseResponse<Integer>(result);
} catch (Exception e) {
log.error("delTaskModel error", e);
return new BaseResponse<>(e.getMessage());
}
}
@RequestMapping("/getTaskModelById")
@ResponseBody
public BaseResponse<TaskModel> getTaskModelById(int id){
try {
TaskModel taskModel = taskSheduleMapper.selectById(id);
return new BaseResponse<TaskModel>(taskModel);
} catch (Exception e) {
log.error("getTaskModelById error", e);
return new BaseResponse<>(e.getMessage());
}
}
}
... ...
package com.monitor.other.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
/**
* Created by zhengyouwei on 2016/8/17.
*/
public class TaskStroe {
private static Logger logger = LoggerFactory.getLogger(TaskStroe.class);
private static Map<Integer, ScheduledFuture> map = new ConcurrentHashMap<>();
public static void addTask(Integer id, ScheduledFuture scheduledFuture) {
try {
if (map.containsKey(id)) {
map.get(id).cancel(false);
}
map.put(id, scheduledFuture);
} catch (Exception e) {
logger.error("addTask failed,id=" + id, e);
}
}
public static void removeTask(Integer id) {
try {
if (map.containsKey(id)) {
map.get(id).cancel(false);
map.remove(id);
}
} catch (Exception e) {
logger.error("removeTask failed,id=" + id, e);
}
}
public static boolean isOn(Integer integer){
return map.containsKey(integer);
}
}
... ...
package com.monitor.other.task.handler;
import com.model.TaskModel;
import com.monitor.common.service.AlarmMsgService;
import com.monitor.other.task.AlarmTypeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by zhengyouwei on 2016/8/12.
*/
public abstract class BaseHandler implements AlarmTypeHandler {
@Autowired
private AlarmMsgService alarmMsgService;
protected void sendSms(TaskModel taskModel, Map<String, Object> map){
String newSms = replcaeSms(taskModel.getSms(), map);
alarmMsgService.sendSms(taskModel.getType(),newSms,taskModel.getMobile());
}
private String replcaeSms(String sms, Map<String, Object> map) {
if (map == null){
return sms;
}
Pattern p = Pattern.compile("\\$\\{[a-zA-Z_]+?\\}"); // 正则表达式
while (true) {
Matcher m = p.matcher(sms); // 操作的字符串
if (m.find()) {
String a = m.group(0);
a = a.substring(2, a.length() - 1);
sms = m.replaceFirst(String.valueOf(map.get(a)));
} else {
break;
}
}
return sms;
}
}
... ...
package com.monitor.other.task.handler;
import com.model.TaskModel;
import com.monitor.influxdb.InfluxDBQuery;
import com.monitor.other.task.AlarmTypeHandler;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by zhengyouwei on 2016/8/11.
* <p>
* 单个influxdb,只要未出现想要的数据,就告警
*/
@Service("handlerType_1")
public class HandlerType_1 extends BaseHandler {
@Override
public void handle(TaskModel taskModel, Map<String, List<List<Map<String, Object>>>> resultMap) {
String[] influxdbs = taskModel.getInfluxdbs().split(",");
for (String influx : influxdbs) {
if (!resultMap.containsKey(influx)) {//没有取到数据,告警
Map<String, Object> map = new HashMap<>();
map.put("influxdb", influx);
sendSms(taskModel, map);
}
}
}
}
... ...
package com.monitor.other.task.handler;
import com.model.TaskModel;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by zhengyouwei on 2016/8/11.
* <p>
* 2:多个influxdb 合起来未出现想要的数据
*/
@Service("handlerType_2")
public class HandlerType_2 extends BaseHandler {
@Override
public void handle(TaskModel taskModel, Map<String, List<List<Map<String, Object>>>> resultMap) {
String[] influxdbs = taskModel.getInfluxdbs().split(",");
int count = 0;
for (String influx : influxdbs) {
if (!resultMap.containsKey(influx)) {//没有取到数据,告警
continue;
}else {
count ++;
}
}
if (count == 0){
sendSms(taskModel, null);
}
}
}
... ...
package com.monitor.other.task.handler;
import com.model.TaskModel;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* Created by zhengyouwei on 2016/8/11.
* <p>
* 3:单个influxdb 出现次数超过一定值
*/
@Service("handlerType_3")
public class HandlerType_3 extends BaseHandler {
@Override
public void handle(TaskModel taskModel, Map<String, List<List<Map<String, Object>>>> resultMap) {
String[] influxdbs = taskModel.getInfluxdbs().split(",");
for (String influx : influxdbs) {
if (!resultMap.containsKey(influx)) {//没有取到数据
continue;
}else {
List<List<Map<String, Object>>> lists = resultMap.get(influx);
for(List<Map<String, Object>> mapList : lists){
for (Map<String, Object> map : mapList){
double count = (double) map.get("count");
if(count > taskModel.getAlarmNumFlag()){
map.put("influxdb",influx);
sendSms(taskModel, map);
}
}
}
}
}
}
}
... ...
package com.monitor.other.task.handler;
import com.model.TaskModel;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* Created by zhengyouwei on 2016/8/11.
* <p>
* 出现一个就告警,单条记录单独告警
*/
@Service("handlerType_4")
public class HandlerType_4 extends BaseHandler {
@Override
public void handle(TaskModel taskModel, Map<String, List<List<Map<String, Object>>>> resultMap) {
String[] influxdbs = taskModel.getInfluxdbs().split(",");
for (String influx : influxdbs) {
if (!resultMap.containsKey(influx)) {//没有取到数据
continue;
}else {
List<List<Map<String, Object>>> lists = resultMap.get(influx);
for(List<Map<String, Object>> mapList : lists){
for (Map<String, Object> map : mapList){
map.put("influxdb",influx);
sendSms(taskModel,map);
}
}
}
}
}
}
... ...
... ... @@ -8,4 +8,7 @@
<constructor-arg name = "url" value="${dns.config.url}"/>
<constructor-arg name = "dnsApiUrl" value="${dns.config.api.url}"/>
</bean>
</beans>
<bean id="sheduletask" class="com.monitor.other.task.SheduleTask">
<property name="taskScheduler" ref="scheduler"/>
</bean></beans>
... ...