Authored by qinchao

mysql监控

package com.monitor.influxdb.mapper;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Created by yoho on 2016/6/21.
*/
public interface MetricMapper {
Map<String, Map<String, String>> selectMysqlInfos(String influxDBName, List<String> ips);
Map<String, Map<String, String>> selectMysqlInfos(String influxDBName, Set<String> ipPorts);
}
... ...
package com.monitor.influxdb.mapper;
import java.util.Map;
import java.util.Set;
public interface MysqlMonitorMapper {
void insert(String host,String port,String hostAndPort,String metrics, int value);
Map<String, Map<String, String>> selectMysqlInfos(Set<String> ipPortList);
}
... ...
... ... @@ -10,10 +10,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* Created by craig.qin on 2018/2/12.
... ... @@ -26,18 +23,18 @@ public class MetricMapperImpl implements MetricMapper {
private InfluxDataReporter influxDataReporter;
@Override
public Map<String, Map<String, String>> selectMysqlInfos(String influxDBName ,List<String> ipList){
if(ipList == null || ipList.size() == 0){
public Map<String, Map<String, String>> selectMysqlInfos(String influxDBName ,Set<String> ipPortList){
if(ipPortList == null || ipPortList.size() == 0){
return null;
}
String ips = "";
for (String ip : ipList) {
ips = ips.concat("host=\'").concat(ip).concat("\'").concat(" or ");
for (String ipPort : ipPortList) {
ips = ips.concat("host=\'").concat(ipPort.split(":")[0]).concat("\'").concat(" or ");
}
ips = StringUtils.stripEnd(ips, " or ");
//取出每个ip最新的一条数据
String query_cmd = "select value from metric_mysql ";
query_cmd = query_cmd.concat("where (").concat(ips).concat(")").concat("and time > now() - 1m group by host,metric order by time desc limit 1 ");
query_cmd = query_cmd.concat("where (").concat(ips).concat(")").concat("and time > now() - 1m group by host,port,metric order by time desc limit 1 ");
Map<String,Map<String,String>> resultMap = new HashMap<>();
... ... @@ -56,20 +53,22 @@ public class MetricMapperImpl implements MetricMapper {
if(tags == null)
continue;
String host = tags.get("host");//ip
String metric = tags.get("metric");//ip
String port = tags.get("port");
String metric = tags.get("metric");
if(s.getValues() == null ) continue;
int idx=s.getColumns().indexOf("value");
String key=host+":"+port;
String metricValue= String.valueOf(s.getValues().get(0).get(idx));
if(!resultMap.containsKey(host)){
if(!resultMap.containsKey(key)){
Map<String,String> metricMap= new LinkedHashMap();
resultMap.put(host,metricMap);
resultMap.put(key,metricMap);
}
resultMap.get(host).put(metric,metricValue);
resultMap.get(key).put(metric,metricValue);
}
} catch (Exception e) {
log.error("batch influx getMaliousIpInfo failed ", e);
log.error("MetricMapperImpl failed ", e);
}
}
return resultMap;
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.MysqlMonitorMapper;
import org.apache.commons.lang.StringUtils;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class MysqlMonitorMapperImpl implements MysqlMonitorMapper {
@Autowired
private InfluxDataReporter influxDataReporter;
@Override
public void insert(String host,String port,String hostAndPort, String metrics, int value) {
Point point = Point.measurement("mysql_monitor")
.tag("host", host)
.tag("port", port)
.tag("hostAndPort", hostAndPort)
.tag("metrics", metrics)
.addField("value", value).build();
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.MONITOR_SYSTEM,point);
}
@Override
public Map<String, Map<String, String>> selectMysqlInfos(Set<String> ipPortList){
if(ipPortList == null || ipPortList.size() == 0){
return null;
}
String ipPorts = "";
for (String ipPort : ipPortList) {
ipPorts = ipPorts.concat("hostAndPort=\'").concat(ipPort).concat("\'").concat(" or ");
}
ipPorts = StringUtils.stripEnd(ipPorts, " or ");
//取出每个ip最新的一条数据
String query_cmd = "select value from mysql_monitor ";
query_cmd = query_cmd.concat("where (").concat(ipPorts).concat(")").concat("and time > now() - 1000m group by hostAndPort,metrics order by time desc limit 1 ");
Map<String,Map<String,String>> resultMap = new HashMap<>();
QueryResult result = influxDataReporter.queryResult(InfluxDBContants.AWS, query_cmd, InfluxDBContants.MONITOR_SYSTEM);
for (QueryResult.Result queryResult : result.getResults()) {
if (queryResult.getSeries() == null)
continue;
try {
List<QueryResult.Series> series = queryResult.getSeries();
if(series == null || series.size() == 0)
return null;
for(QueryResult.Series s : series){
Map<String,String> tags = s.getTags();
if(tags == null)
continue;
String hostAndPort = tags.get("hostAndPort");//ip
String metrics = tags.get("metrics");
if(s.getValues() == null ) continue;
int idx=s.getColumns().indexOf("value");
String key=hostAndPort;
String metricValue= String.valueOf(s.getValues().get(0).get(idx));
if(!resultMap.containsKey(key)){
Map<String,String> metricMap= new LinkedHashMap();
resultMap.put(key,metricMap);
}
resultMap.get(key).put(metrics,metricValue);
}
} catch (Exception e) {
}
}
return resultMap;
}
}
... ...
... ... @@ -36,35 +36,11 @@ public class MysqlMonitorCtrl {
BaseResponse response = new BaseResponse();
List<MysqlMobjectRep> mysqlMobjectReps= mobjectService.getMysqlMobject();
mysqlMonitorService.setMySQLMetric(mysqlMobjectReps);
//setMySQLConnectStatus(mysqlMobjectReps);
response.setData(mysqlMobjectReps);
return response;
}
//探测mysql的连接状态
public void setMySQLConnectStatus(List<MysqlMobjectRep> mysqlMobjectReps){
ExecutorService serversExecutor = Executors.newFixedThreadPool(mysqlMobjectReps.size()>=30?30:mysqlMobjectReps.size());
for (MysqlMobjectRep mysqlMobject : mysqlMobjectReps) {
serversExecutor.submit(new Runnable() {
@Override
public void run() {
String result = mysqlMonitorService.detectMysqlStatus(mysqlMobject.getIpAndPort());
mysqlMobject.setConnectStatus(result);
}
});
}
serversExecutor.shutdown();
try {
serversExecutor.awaitTermination(50, TimeUnit.SECONDS);
} catch (InterruptedException e) {
if (!serversExecutor.isShutdown()) {
serversExecutor.shutdownNow();
}
}
}
/**
* 获取所有mysql host
*
... ...
... ... @@ -2,6 +2,7 @@ package com.monitor.other.mysqlMonitor.service;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.MetricMapper;
import com.monitor.influxdb.mapper.MysqlMonitorMapper;
import com.monitor.other.sqlOperate.constant.SqlQueryConstant;
import com.response.MysqlMobjectRep;
import org.apache.commons.lang.StringUtils;
... ... @@ -21,51 +22,72 @@ public class MysqlMonitorServiceImpl {
@Autowired
private MetricMapper metricMapper;
@Autowired
private MysqlMonitorMapper mysqlMonitorMapper;
Logger log = LoggerFactory.getLogger(MysqlMonitorServiceImpl.class);
private final String detectSQL="select 1";
private final String connectMetric="connectstatus";
//设置超时时间,autoReconnect超时不再重试
private final String param=SqlQueryConstant.DB_PARAM+"&autoReconnect=false&connectTimeout=3000&socketTimeout=6000";
//private final String detectSQL="show databases";
public void setMySQLMetric(List<MysqlMobjectRep> mysqlMobjectReps){
Map<String,MysqlMobjectRep> aws_MysqlMobjectRepMap=new HashMap();
List<String> aws_ips=new ArrayList<String>();
Map<String,MysqlMobjectRep> qcloud_MysqlMobjectRepMap=new HashMap<>();
List<String> qcloud_ips=new ArrayList<String>();
Map<String,MysqlMobjectRep> all_ip_port_MysqlMobjectRepMap=new HashMap();
Map<String,MysqlMobjectRep> aws_ip_port_MysqlMobjectRepMap=new HashMap();
Map<String,MysqlMobjectRep> qcloud_ip_port_MysqlMobjectRepMap=new HashMap<>();
for (MysqlMobjectRep mysqlMobject : mysqlMobjectReps) {
if("aws".equalsIgnoreCase(mysqlMobject.getCloudType())){
aws_MysqlMobjectRepMap.put(mysqlMobject.getMoHostIp(),mysqlMobject);
aws_ips.add(mysqlMobject.getMoHostIp());
aws_ip_port_MysqlMobjectRepMap.put(mysqlMobject.getIpAndPort(),mysqlMobject);
}else{
qcloud_MysqlMobjectRepMap.put(mysqlMobject.getMoHostIp(),mysqlMobject);
qcloud_ips.add(mysqlMobject.getMoHostIp());
qcloud_ip_port_MysqlMobjectRepMap.put(mysqlMobject.getIpAndPort(),mysqlMobject);
}
all_ip_port_MysqlMobjectRepMap.put(mysqlMobject.getIpAndPort(),mysqlMobject);
}
if(aws_ips.size()>0){
Map<String, Map<String, String>> maps=metricMapper.selectMysqlInfos(InfluxDBContants.METRIC_AWS,aws_ips);
setMetricInfo(maps,aws_MysqlMobjectRepMap);
if(aws_ip_port_MysqlMobjectRepMap.size()>0){
Map<String, Map<String, String>> maps=metricMapper.selectMysqlInfos(InfluxDBContants.METRIC_AWS,aws_ip_port_MysqlMobjectRepMap.keySet());
setMetricInfo(maps,aws_ip_port_MysqlMobjectRepMap);
}
if(qcloud_ips.size()>0){
Map<String, Map<String, String>> maps=metricMapper.selectMysqlInfos(InfluxDBContants.METRIC_QCLOUD,qcloud_ips);
setMetricInfo(maps,qcloud_MysqlMobjectRepMap);
if(qcloud_ip_port_MysqlMobjectRepMap.size()>0){
Map<String, Map<String, String>> maps=metricMapper.selectMysqlInfos(InfluxDBContants.METRIC_QCLOUD,qcloud_ip_port_MysqlMobjectRepMap.keySet());
setMetricInfo(maps,qcloud_ip_port_MysqlMobjectRepMap);
}
//连接状态,从运维自己的influx查询
if(all_ip_port_MysqlMobjectRepMap.size()>0){
Map<String, Map<String, String>> maps=mysqlMonitorMapper.selectMysqlInfos(all_ip_port_MysqlMobjectRepMap.keySet());
setConnectStatus(maps,all_ip_port_MysqlMobjectRepMap);
}
}
private void setMetricInfo(Map<String, Map<String, String>> maps,Map<String,MysqlMobjectRep> mysqlMobjectRepMap){
if(maps!=null&&maps.size()>0){
for(String ip:maps.keySet()){
MysqlMobjectRep obj = mysqlMobjectRepMap.get(ip);
for(String ipAndPort:maps.keySet()){
MysqlMobjectRep obj = mysqlMobjectRepMap.get(ipAndPort);
if(obj!=null){
obj.getMysqlMetric().changeFromInfluxQueryResultMap(maps.get(ip));
String status=obj.getMysqlMetric().getStatus();
obj.getMysqlMetric().changeFromInfluxQueryResultMap(maps.get(ipAndPort));
}
}
}
}
//运维定时任务检测数据库状态
private void setConnectStatus(Map<String, Map<String, String>> maps,Map<String,MysqlMobjectRep> mysqlMobjectRepMap){
if(maps!=null&&maps.size()>0){
for(String ipAndPort:maps.keySet()){
Map<String, String> metrics=maps.get(ipAndPort);
MysqlMobjectRep obj = mysqlMobjectRepMap.get(ipAndPort);
if(obj!=null){
String connect=metrics.get(connectMetric);
int statusInt=0;
if(StringUtils.isNotBlank(status)){
if(connect!=null&&StringUtils.isNotBlank(connect)){
try{
Double d=Double.parseDouble(status);
Double d=Double.parseDouble(connect);
statusInt =d.intValue();
//System.out.println(statusInt);
}catch (Exception e){
... ... @@ -75,7 +97,7 @@ public class MysqlMonitorServiceImpl {
if(1==statusInt){
obj.setConnectStatus("OK");
}else{
obj.setConnectStatus("——");
obj.setConnectStatus("Fail");
}
}
}
... ... @@ -83,8 +105,8 @@ public class MysqlMonitorServiceImpl {
}
public String detectMysqlStatus(String ipAndPort){
String result="FAIL";
public int detectMysqlStatus(String ipAndPort){
int result=0;
try {
Class.forName(SqlQueryConstant.DRIVE_CLASS_MYSQL);
} catch (ClassNotFoundException e) {
... ... @@ -113,8 +135,7 @@ public class MysqlMonitorServiceImpl {
dbNameList.add(String.valueOf(value));
}
}
result="OK";
return result;
result=1;
} catch (SQLException e) {
log.error("MysqlMonitorServiceImpl detectMysqlStatus 查询数据库失败",e);
} catch (Exception e) {
... ... @@ -141,6 +162,9 @@ public class MysqlMonitorServiceImpl {
} catch (SQLException e) {
log.error("MysqlMonitorServiceImpl detectMysqlStatus finally 关闭connect异常", e);
}
//把结果写到influx中
String[] array_ipAndPort=ipAndPort.split(":");
mysqlMonitorMapper.insert(array_ipAndPort[0],array_ipAndPort[1],ipAndPort,connectMetric,result);
}
return result;
}
... ...
package com.monitor.other.mysqlMonitor.task;
import com.monitor.cmdb.service.IMObjectInfoService;
import com.monitor.other.dns.constant.InterVar;
import com.monitor.other.dns.service.IDNSMonitorService;
import com.monitor.other.mysqlMonitor.service.MysqlMonitorServiceImpl;
import com.response.MysqlMobjectRep;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* mysql的状态探测,探测数据,并保存的influxdb中
*/
@Component
public class MysqlMonitorTask {
public final String lockStr="MysqlMonitorTaskLock";
@Autowired
IMObjectInfoService mobjectService;
@Autowired
MysqlMonitorServiceImpl mysqlMonitorService;
@Scheduled(cron = "${cron_task_mysql_monit}")
public void monitor() {
synchronized (lockStr.intern()) {
List<MysqlMobjectRep> mysqlMobjectReps= mobjectService.getMysqlMobject();
writeMySQLConnectStatus(mysqlMobjectReps);
}
}
//探测mysql的连接状态
public void writeMySQLConnectStatus(List<MysqlMobjectRep> mysqlMobjectReps){
for (MysqlMobjectRep mysqlMobject : mysqlMobjectReps) {
new Thread(new Runnable(){
public void run(){
mysqlMonitorService.detectMysqlStatus(mysqlMobject.getIpAndPort());
}
}).start();
}
}
}
... ...
... ... @@ -12,6 +12,8 @@ cron_task_rabbit_scan=0 0/1 * * * ?
cron_task_dns_monit=0 1/5 * * * ?
cron_task_mysql_monit=0/30 * * * * ?
cron_task_clear_docker_image=0 0 7 * * ?
flag_task_clear_docker_image=1
... ...
... ... @@ -19,6 +19,8 @@ cron_task_rabbit_scan=0 0 0 * * ?
#cron_task_dns_monit=0 0/1 * * * ?
cron_task_dns_monit=0 0 0 * * ?
cron_task_mysql_monit=0 0 0 * * ?
cron_task_clear_docker_image=0 0 7 * * ?
flag_task_clear_docker_image=0
... ...