Authored by qinchao

influx 写入重构:升级 influxdb-java 客户端至 2.7,写入路径统一改为通过 InfluxDataReporter(替换对 InfluxDBSingle 的直接 write 调用)

Showing 19 changed files with 191 additions and 225 deletions
... ... @@ -2,7 +2,6 @@ package com.monitor.influxdb;
import org.apache.commons.lang.StringUtils;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.CommonAlarmDataMapper;
import org.apache.commons.lang.StringUtils;
... ... @@ -19,18 +19,18 @@ public class CommonAlarmDataMapperImpl implements CommonAlarmDataMapper {
Logger logger = LoggerFactory.getLogger("commonAlarmDataLogger");
@Autowired
private InfluxDBSingle influxDBSingle;
private InfluxDataReporter influxDataReporter;
@Override
public void write(Point point, String duration) {
public void write(Point point, String retentionPolicy) {
if(point == null){
return;
}
if(StringUtils.isBlank(duration)){
duration = "default";
if(StringUtils.isBlank(retentionPolicy)){
retentionPolicy = "default";
}
try{
influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().write(InfluxDBContants.SOME_OHTER_DATA, duration, point);
influxDataReporter.report(InfluxDBContants.ALARM,InfluxDBContants.SOME_OHTER_DATA,point,retentionPolicy);
}catch(Exception e){
logger.info(" - CommonAlarmDataMapperImpl - write - error", e);
}
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.IDNSMonitorMapper;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -17,10 +15,7 @@ import org.springframework.stereotype.Service;
public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
@Autowired
private InfluxDBSingle influxDBSingle;
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
/**
* host dns info insert
... ... @@ -30,10 +25,6 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
*/
@Override
public void insert(String host, String uid, String status, String info) {
BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.MONITOR_SYSTEM)
.retentionPolicy("default").build();
Point point = Point.measurement("dns_monitor")
.tag("host", host)
.tag("uid", uid)
... ... @@ -41,9 +32,7 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
.tag("status", status)
.addField("info", info).build();
batchPoints.point(point);
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(batchPoints);
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.MONITOR_SYSTEM,point);
}
/**
... ... @@ -56,9 +45,9 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
*/
@Override
public void insert(String host,String uid, String ip, String status, String failedType, String info) {
BatchPoints batchPoints = BatchPoints
/*BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.MONITOR_SYSTEM)
.retentionPolicy("default").build();
.retentionPolicy("default").build();*/
Point point;
... ... @@ -81,9 +70,9 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
.addField("info", info).build();
}
batchPoints.point(point);
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(batchPoints);
//batchPoints.point(point);
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.MONITOR_SYSTEM,point);
}
/**
... ... @@ -100,7 +89,7 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
String command = "SELECT time, host, uid, type, ip, status, failedType, info FROM dns_monitor"
+ " WHERE host = '" + host + "' AND uid = '" + uid + "' AND status = '" + status + "'"
+ " ORDER BY time DESC LIMIT " + limitCount + " OFFSET " + offsetCount;
return commonQuery.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
return influxDataReporter.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
}
/**
... ... @@ -116,7 +105,7 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
String command = "SELECT time, host, uid, type, ip, status, failedType, info FROM dns_monitor"
+ " WHERE host = '" + host + "' AND uid = '" + uid + "'"
+ " ORDER BY time DESC LIMIT " + limitCount + " OFFSET " + offsetCount;
return commonQuery.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
return influxDataReporter.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
}
/**
... ... @@ -130,7 +119,7 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
public QueryResult selectCount(String host, String uid, String status) {
String command = "SELECT COUNT(info) FROM dns_monitor WHERE host = '" + host + "'"
+ " AND uid = '" + uid + "' AND status = '" + status + "'";
return commonQuery.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
return influxDataReporter.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
}
/**
... ... @@ -143,6 +132,6 @@ public class DNSMonitorMapperImpl implements IDNSMonitorMapper{
public QueryResult selectCount(String host, String uid) {
String command = "SELECT COUNT(info) FROM dns_monitor WHERE host = '" + host + "'"
+ " AND uid = '" + uid + "'";
return commonQuery.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
return influxDataReporter.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.MONITOR_SYSTEM);
}
}
... ...
... ... @@ -14,16 +14,16 @@ import org.springframework.stereotype.Service;
public class DeviceActiveMapperImpl implements DeviceActiveMapper {
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
@Override
public QueryResult selectCountBefore(int minute) {
String command = "select count(muid) from yoho_device_active where time > now() - " + minute + "m group by interfaceType";
return commonQuery.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.YOMO_MONITOR);
return influxDataReporter.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.YOMO_MONITOR);
}
@Override
public QueryResult selectCountYestoday(int minute) {
String command = "select count(muid) from yoho_device_active where time < now() - 1d and time > now() - (1d + " + minute + "m) group by interfaceType";
return commonQuery.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.YOMO_MONITOR); }
return influxDataReporter.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.YOMO_MONITOR); }
}
... ...
... ... @@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
public class GatewayAccessEventsMapperImpl implements GatewayAccessEventsMapper {
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
/**
* 查询type的tag-value
... ... @@ -23,7 +23,7 @@ public class GatewayAccessEventsMapperImpl implements GatewayAccessEventsMapper
@Override
public QueryResult selectTagValuesType(String influxDBName) {
String command = "SHOW TAG VALUES FROM gateway_access_events WITH KEY = event";
return commonQuery.queryResult(influxDBName, command, InfluxDBContants.YOHO_EVENT_SAMPLE);
return influxDataReporter.queryResult(influxDBName, command, InfluxDBContants.YOHO_EVENT_SAMPLE);
}
}
... ...
package com.monitor.influxdb.mapper.impl;
import com.alibaba.fastjson.JSONObject;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.IJavaApiStaticsMapper;
import com.monitor.influxdb.util.DateFormatUtil;
import com.monitor.influxdb.util.QueryResultUtil;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.mapper.IJavaApiStaticsMapper;
import com.monitor.model.domain.JavaApiStaticsModel;
import com.monitor.model.request.JavaApiHisReq;
import com.monitor.model.request.JavaApiStatusReq;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -29,8 +28,9 @@ import java.util.concurrent.TimeUnit;
public class JavaApiStaticsMapper implements IJavaApiStaticsMapper {
Logger log = LoggerFactory.getLogger(JavaApiStaticsMapper.class);
@Autowired
private InfluxDBSingle influxDBSingle;
private InfluxDataReporter influxDataReporter;
Random random = new Random();
... ... @@ -62,11 +62,10 @@ public class JavaApiStaticsMapper implements IJavaApiStaticsMapper {
Point point = pointBuilder.build();
// batchPoints.point(point);
influxDataReporter.report(InfluxDBContants.ALARM,InfluxDBContants.APP_ALARM,point);
influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB()
.write(InfluxDBContants.APP_ALARM, "default", point);
/*influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB()
.write(InfluxDBContants.APP_ALARM, "default", point);*/
}
@Override
... ... @@ -77,13 +76,16 @@ public class JavaApiStaticsMapper implements IJavaApiStaticsMapper {
sql += " and api_id='" + api_id + "'";
sql += " and mobj_id='" + mobj_id + "'";
sql += " order by time desc limit 1";
Query query = new Query(sql, InfluxDBContants.APP_ALARM);
QueryResult result = null;
try {
result = influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().query(query);
//Query query = new Query(sql, InfluxDBContants.APP_ALARM);
QueryResult result = influxDataReporter.queryResult(InfluxDBContants.ALARM,sql,InfluxDBContants.APP_ALARM);
/*try {
//result = influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().query(query);
} catch (Exception e) {
log.warn("influxdb query error ", e);
return null;
}*/
if(result==null){
return null;
}
JavaApiStaticsModel javaApiStaticsModel = new JavaApiStaticsModel();
javaApiStaticsModel.setServiceId(api_id);
... ... @@ -130,23 +132,6 @@ public class JavaApiStaticsMapper implements IJavaApiStaticsMapper {
*/
@Override
public List<JavaApiStaticsModel> selectlatestJavaApiStaticsList(String influxDBName, List<JavaApiStatusReq> paramList) {
List<JavaApiStaticsModel> javaApiStaticsModels = new ArrayList<JavaApiStaticsModel>();
// List<JavaApiStatusReq> splitReqList = new ArrayList<JavaApiStatusReq>();
// for (JavaApiStatusReq param : paramList) {
// splitReqList.add(param);
// if (splitReqList.size() >= 15) {
// javaApiStaticsModels.addAll(selectlatestJavaApiStaticsList_batch_split(influxDBName, splitReqList));
// splitReqList.clear();
// }
// }
// if (splitReqList.size() > 0) {
// javaApiStaticsModels.addAll(selectlatestJavaApiStaticsList_batch_split(influxDBName, splitReqList));
// }
// return javaApiStaticsModels;
return selectlatestJavaApiStaticsList_batch_split(influxDBName, paramList);
}
... ... @@ -164,14 +149,19 @@ public class JavaApiStaticsMapper implements IJavaApiStaticsMapper {
sb.append(" and mobj_id='" + mobj_id + "'");
sb.append(" order by time desc limit 1;");
}
log.info(sb.toString());
Query query = new Query(sb.toString(), InfluxDBContants.APP_ALARM);
QueryResult result = influxDataReporter.queryResult(InfluxDBContants.ALARM,sb.toString(),InfluxDBContants.APP_ALARM);
/*Query query = new Query(sb.toString(), InfluxDBContants.APP_ALARM);
QueryResult result = null;
try {
result = influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().query(query);
} catch (Exception e) {
log.warn("influx db select error ,", e);
return javaApiStaticsModels;
}*/
if(result==null){
return javaApiStaticsModels;
}
for (QueryResult.Result queryResult : result.getResults()) {
... ... @@ -246,16 +236,22 @@ public class JavaApiStaticsMapper implements IJavaApiStaticsMapper {
}
sql += " order by time desc LIMIT " + limitCount + " OFFSET " + offsetCount;
log.info("sql:{}", sql);
Query query = new Query(sql, InfluxDBContants.APP_ALARM);
QueryResult result = null;
List<JavaApiStaticsModel> list = new ArrayList<JavaApiStaticsModel>();
QueryResult result = influxDataReporter.queryResult(InfluxDBContants.ALARM,sql,InfluxDBContants.APP_ALARM);
if(result==null){
return list;
}
/*Query query = new Query(sql, InfluxDBContants.APP_ALARM);
QueryResult result = null;
try {
result = influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().query(query);
} catch (Exception e) {
log.warn("influxdb query error ", e);
return list;
}
}*/
QueryResult.Result rel = result.getResults().get(0);
... ... @@ -340,15 +336,19 @@ public class JavaApiStaticsMapper implements IJavaApiStaticsMapper {
sql += " and is_exception= 'false' ";
}
QueryResult result = influxDataReporter.queryResult(InfluxDBContants.ALARM,sql,InfluxDBContants.APP_ALARM);
if(result==null){
return 0;
}
Query query = new Query(sql, InfluxDBContants.APP_ALARM);
/* Query query = new Query(sql, InfluxDBContants.APP_ALARM);
QueryResult result = null;
try {
result = influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().query(query);
} catch (Exception e) {
log.warn("influxdb query error ", e);
return 0;
}
}*/
return QueryResultUtil.getCount(result);
}
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.JavaProjectMapper;
import com.monitor.influxdb.util.QueryResultUtil;
import com.monitor.model.domain.JavaProjectStatus;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ... @@ -21,13 +18,10 @@ import java.util.concurrent.TimeUnit;
*/
@Component
public class JavaProjectMapperImpl implements JavaProjectMapper {
Logger log = LoggerFactory.getLogger(JavaProjectMapperImpl.class);
//Logger log = LoggerFactory.getLogger(JavaProjectMapperImpl.class);
@Autowired
private InfluxDBSingle influxDBSingle;
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
Random random = new Random();
... ... @@ -42,14 +36,15 @@ public class JavaProjectMapperImpl implements JavaProjectMapper {
.addField("lastStart", javaProjectStatus.getLastStart() == null ? "1" : javaProjectStatus.getLastStart())
.time(System.currentTimeMillis() * 1000000 + random.nextInt(999999), TimeUnit.NANOSECONDS)
.build();
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(InfluxDBContants.MONITOR_SYSTEM, "default", point);
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.MONITOR_SYSTEM,point);
/* influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(InfluxDBContants.MONITOR_SYSTEM, "default", point);*/
}
@Override
public List<JavaProjectStatus> getByProject(String project) {
String cpmmond = "select cloud,ip,lastStart,status from java_project_status where time > now() - 6m and \"name\" = '" + project + "' order by time desc";
QueryResult result = commonQuery.queryResult(InfluxDBContants.AWS, cpmmond, InfluxDBContants.MONITOR_SYSTEM);
QueryResult result = influxDataReporter.queryResult(InfluxDBContants.AWS, cpmmond, InfluxDBContants.MONITOR_SYSTEM);
List<List<Object>> lists = QueryResultUtil.getValues(result);
List<JavaProjectStatus> javaProjectStatusList = new ArrayList<>();
Set<String> stringSet = new HashSet<>();
... ...
... ... @@ -2,7 +2,6 @@ package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.MaliciousIpMapper;
import org.apache.commons.lang.StringUtils;
... ... @@ -24,10 +23,7 @@ public class MaliciousIpMapperImpl implements MaliciousIpMapper {
Logger log = LoggerFactory.getLogger(MaliciousIpMapperImpl.class);
@Autowired
private InfluxDBSingle influxDBSingle;
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
private final String mip_rpname="rp_thirtyweek";///恶意ip默认保留30个星期,半年左右
... ... @@ -39,29 +35,36 @@ public class MaliciousIpMapperImpl implements MaliciousIpMapper {
*/
@Override
public void insertMip(String influxDBName, String ip){
BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).retentionPolicy(mip_rpname)
.build();
Point point = Point.measurement("monitor_malicousips")
.tag("ip",ip)
.addField("value", ip).build();
batchPoints.point(point);
influxDBSingle.getInfluxDBByName(influxDBName).getInfluxDB()
.write(batchPoints);
/* BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).retentionPolicy(mip_rpname)
.build();
batchPoints.point(point);*/
influxDataReporter.report(influxDBName,InfluxDBContants.YOMO_MONITOR,point,mip_rpname);
/* influxDBSingle.getInfluxDBByName(influxDBName).getInfluxDB()
.write(batchPoints);*/
}
//duration 是influxdb关键字,因此字段名称取为dur,保留策略: rp_thirtyweek
//由于不是默认的保留策略,因此执行查询时,select * from rp_thirtyweek.monitor_malicousips where value= '127.0.0.1';
public void insertMipObj(String influxDBName, String ip,String reason,String duration,String hbaseTime,String createTimeStr){
BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).retentionPolicy(mip_rpname)
.build();
Point point = Point.measurement("monitor_malicousips")
.tag("ip",ip)
.addField("value", ip).addField("dur",duration).addField("reason",reason).addField("hbasetime",hbaseTime).addField("createtime",createTimeStr).build();
batchPoints.point(point);
influxDBSingle.getInfluxDBByName(influxDBName).getInfluxDB()
.write(batchPoints);
/*BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).retentionPolicy(mip_rpname)
.build();
batchPoints.point(point);*/
influxDataReporter.report(influxDBName,InfluxDBContants.YOMO_MONITOR,point,mip_rpname);
/*influxDBSingle.getInfluxDBByName(influxDBName).getInfluxDB()
.write(batchPoints);*/
}
/**
... ... @@ -85,7 +88,7 @@ public class MaliciousIpMapperImpl implements MaliciousIpMapper {
//取出每个ip最新的一条数据
query_cmd = query_cmd.concat("where (").concat(ips).concat(")").concat("group by ip order by time desc limit 1;");
QueryResult result = commonQuery.queryResult(influxDBStr, query_cmd, InfluxDBContants.YOMO_MONITOR);
QueryResult result = influxDataReporter.queryResult(influxDBStr, query_cmd, InfluxDBContants.YOMO_MONITOR);
for (QueryResult.Result queryResult : result.getResults()) {
if (queryResult.getSeries() == null)
continue;
... ...
... ... @@ -2,10 +2,8 @@ package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.MonitorAlarmMapper;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -15,10 +13,7 @@ import org.springframework.stereotype.Service;
public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
@Autowired
private InfluxDBSingle influxDBSingle;
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
/**
* 插入告警短信
... ... @@ -30,19 +25,23 @@ public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
*/
@Override
public void insertAlarmMsg(String influxDBName, String alarmType, String snsContent, String alarmInfo, String sendStatus,String result,String mobile) {
BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).tag("event", "alarm").retentionPolicy("default")
.build();
Point point = Point.measurement("monitor_alarm")
.tag("type", alarmType)
.tag("send_status", sendStatus)
.tag("event", "alarm")
.addField("sms_content", snsContent)
.addField("result", result)
.addField("mobile", mobile)
.addField("alarm_info", alarmInfo).build();
influxDataReporter.report(influxDBName,InfluxDBContants.YOMO_MONITOR,point);
/* BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).tag("event", "alarm").retentionPolicy("default")
.build();
batchPoints.point(point);
influxDBSingle.getInfluxDBByName(influxDBName).getInfluxDB()
.write(batchPoints);
.write(batchPoints);*/
}
/**
... ... @@ -55,7 +54,7 @@ public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
public QueryResult selectTagValuesType(String influxDBName) {
String command = "SHOW TAG VALUES FROM monitor_alarm WITH KEY = type";
return commonQuery.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
return influxDataReporter.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
}
/**
... ... @@ -70,7 +69,7 @@ public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
String command = "SELECT sms_content, info, send_status FROM monitor_alarm WHERE event = 'alarm'" +
" AND time > now() - " + minute + "m" +
" AND type = '" + type + "' ORDER BY time DESC";
return commonQuery.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
return influxDataReporter.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
}
/**
... ... @@ -86,7 +85,7 @@ public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
String command = "SELECT sms_content, info, send_status FROM monitor_alarm WHERE event = 'alarm'" +
" AND type = '" + type + "' ORDER BY time DESC LIMIT " + limitCount +
" OFFSET " + offsetCount;
return commonQuery.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
return influxDataReporter.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
}
/**
... ... @@ -100,7 +99,7 @@ public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
public QueryResult selectAlarmMsgCount(String influxDBName, String type) {
String command = "SELECT COUNT(sms_content) FROM monitor_alarm " +
"WHERE event = 'alarm' AND type = '" + type + "'";
return commonQuery.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
return influxDataReporter.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
}
@Override
... ... @@ -116,7 +115,7 @@ public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
" and sms_content != 'YOHOBUY实时店铺监控系统:spark数据没写入' " +
" and sms_content != 'MonitAlarmInfo(host=10.67.1.139, service=SparkJobRunner, info=spark任务异常)' ";
// String command = "SELECT count(sms_content) FROM monitor_alarm where time > now() - "+ min+"m";
return commonQuery.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
return influxDataReporter.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
}
... ... @@ -134,7 +133,7 @@ public class MonitorAlarmMapperImpl implements MonitorAlarmMapper {
" and sms_content != 'YOHOBUY实时店铺监控系统:spark数据没写入' " +
" and sms_content != 'MonitAlarmInfo(host=10.67.1.139, service=SparkJobRunner, info=spark任务异常)' ";
// String command = "SELECT count(sms_content) FROM monitor_alarm where time > now() - "+ min+"m";
return commonQuery.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
return influxDataReporter.queryResult(influxDBName, command, InfluxDBContants.YOMO_MONITOR);
}
}
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.NodeHostMapper;
import org.influxdb.dto.Point;
... ... @@ -14,7 +14,7 @@ import java.util.concurrent.TimeUnit;
public class NodeHostMapperImpl implements NodeHostMapper {
@Autowired
private InfluxDBSingle influxDBSingle;
private InfluxDataReporter influxDataReporter;
private Random random = new Random();
... ... @@ -29,8 +29,9 @@ public class NodeHostMapperImpl implements NodeHostMapper {
.addField("projectCpu", projectCpu)
.time(System.currentTimeMillis() * 1000000 + random.nextInt(999999), TimeUnit.NANOSECONDS)
.build();
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(InfluxDBContants.YOHO_JMX, "seven_days", point);
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.YOHO_JMX,point,"seven_days");
/*influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(InfluxDBContants.YOHO_JMX, "seven_days", point);*/
}
}
... ...
... ... @@ -11,7 +11,7 @@ import org.springframework.stereotype.Service;
public class PaymentMapperImpl implements PaymentMapper {
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
/**
* 查询最近时间内支付宝回调次数
* @param minute
... ... @@ -23,7 +23,7 @@ public class PaymentMapperImpl implements PaymentMapper {
String command = "SELECT count(ip) FROM payment " +
"WHERE time > now() - " + minute + "m " +
"AND event = 'payment' AND type = 'alipay'";
return commonQuery.queryResult(InfluxDBContants.ORDER, command, InfluxDBContants.YOHO_ORDER);
return influxDataReporter.queryResult(InfluxDBContants.ORDER, command, InfluxDBContants.YOHO_ORDER);
}
/**
... ... @@ -38,7 +38,7 @@ public class PaymentMapperImpl implements PaymentMapper {
String command = "SELECT count(ip) FROM payment " +
"WHERE time > now() - " + minute + "m " +
"AND event = 'payment' AND type = 'alipay'";
return commonQuery.queryResult(InfluxDBContants.Q_CLOUD, command, InfluxDBContants.YOHO_ORDER);
return influxDataReporter.queryResult(InfluxDBContants.Q_CLOUD, command, InfluxDBContants.YOHO_ORDER);
}
}
... ...
... ... @@ -25,7 +25,7 @@ import java.util.*;
public class ServiceAccessMapperImpl implements ServiceAccessMapper {
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
private String contextFilter = "(context = 'gateway' or context = 'order' or context = 'promotion' or context = 'product' or context = 'message' or context = 'sns' or context = 'users' or context = 'resources' or context = 'brower')";
... ... @@ -44,7 +44,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
}else{
sql = String.format("select count(cost),mean(cost),sum(cost) from service_access where context = '%s' and time > '%s' and time < '%s' group by hostAddress",req.getServiceName(),req.getStartTime(),req.getEndTime());
}
log.info("getBaseDataByContext sql:" + sql);
// log.info("getBaseDataByContext sql:" + sql);
map.putAll(getDataByContext(InfluxDBContants.AWS,sql));
map.putAll(getDataByContext(InfluxDBContants.Q_CLOUD,sql));
... ... @@ -53,7 +53,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
private Map<String,NewJavaApiInfoRep> getDataByContext(String source,String sql) {
Map<String,NewJavaApiInfoRep> map = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -82,7 +82,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
}else{
sql = String.format("select count(cost) from service_access where context='%s' and cost > 200 and time > '%s' and time < '%s' group by hostAddress",req.getServiceName(),req.getStartTime(),req.getEndTime());
}
log.info("getTimeoutCount sql:" + sql);
// log.info("getTimeoutCount sql:" + sql);
map.putAll(getTimeoutCount(InfluxDBContants.AWS,sql));
map.putAll(getTimeoutCount(InfluxDBContants.Q_CLOUD,sql));
... ... @@ -91,7 +91,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
private Map<String,Integer> getTimeoutCount(String source,String sql) {
Map<String,Integer> map = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -112,7 +112,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
public Map<String,List> getJavaApiGraphByServiceType(String context){
Map<String,List> map = new HashMap();
String sql = String.format("SELECT mean(cost) FROM service_access WHERE time > now() - 10m and context = '%s' GROUP BY hostAddress,time(1m) fill(null)",context);
log.info("getJavaApiGraphByServiceType sql:" + sql);
// log.info("getJavaApiGraphByServiceType sql:" + sql);
map.putAll(getJavaApiGraphByServiceType(InfluxDBContants.AWS,sql));
map.putAll(getJavaApiGraphByServiceType(InfluxDBContants.Q_CLOUD,sql));
return map;
... ... @@ -122,7 +122,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
private Map<String,List> getJavaApiGraphByServiceType(String source,String sql) {
Map<String,List> resultMap = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -165,10 +165,10 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
@Override
public Map<String,List> getServiceNameJavaApiGraph(String source){
String sql = String.format("SELECT mean(cost) FROM service_access WHERE %s and time > now() - 10m GROUP BY context,time(1m) fill(null)",contextFilter);
log.info("getServiceNameJavaApiGraph sql:" + sql);
// log.info("getServiceNameJavaApiGraph sql:" + sql);
Map<String,List> resultMap = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -229,8 +229,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
}
}
log.info("getDataByContextAndIP sql:" + sql);
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -284,7 +283,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
private Map<String,Integer> getTimeoutCountByContextAndIp(String source,String sql) {
Map<String,Integer> map = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -304,7 +303,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
Map<String,NewJavaApiInfoRep> map = new HashMap<>();
String sql = String.format("select count(cost),mean(cost),sum(cost) as sum from service_access where context = '%s' and event = '%s' and time > '%s' and time < '%s' group by hostAddress",req.getServiceName(),req.getApiName(),req.getStartTime(),req.getEndTime());
log.info("getDataByContextAndApiName sql:" + sql);
// log.info("getDataByContextAndApiName sql:" + sql);
map.putAll(getDataByContext(InfluxDBContants.AWS,sql));
map.putAll(getDataByContext(InfluxDBContants.Q_CLOUD,sql));
return map;
... ... @@ -318,7 +317,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
public Map<String,Integer> getTimeoutCountByContextAndApiName(NewJavaApiInfoReq req) {
Map<String,Integer> map = new HashMap();
String sql = String.format("select count(cost) from service_access where context='%s' and event = '%s' and cost > 200 and time > '%s' and time < '%s' group by hostAddress",req.getServiceName(),req.getApiName(),req.getStartTime(),req.getEndTime());
log.info("getTimeoutCountByContextAndApiName sql:" + sql);
// log.info("getTimeoutCountByContextAndApiName sql:" + sql);
map.putAll(getTimeoutCount(InfluxDBContants.AWS,sql));
map.putAll(getTimeoutCount(InfluxDBContants.Q_CLOUD,sql));
return map;
... ... @@ -368,7 +367,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
private List<String> getTimeoutInfo(String source,String sql) {
List<String> list = new ArrayList<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -394,7 +393,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
String sql1 = String.format("select count(cost) from service_access where %s and time > '%s' and time < '%s' and cost > %d group by hostAddress,context;",contextFilter,startDateStr,endDateStr,costThreshold);
String sql2 = String.format("select count(cost),mean(cost) from service_access where %s and time > '%s' and time < '%s' group by hostAddress,context",contextFilter,startDateStr,endDateStr);
String sql = sql1 + sql2;
log.info("getAlarmData sql is " + sql);
// log.info("getAlarmData sql is " + sql);
List<Map> list = new ArrayList<>();
list.addAll(getAlarmData(InfluxDBContants.AWS,sql,countThreshold));
list.addAll(getAlarmData(InfluxDBContants.Q_CLOUD,sql,countThreshold));
... ... @@ -404,7 +403,7 @@ public class ServiceAccessMapperImpl implements ServiceAccessMapper {
private List<Map> getAlarmData(String source,String sql,Integer countThreshold){
List<Map> returnList = new ArrayList<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOHO_EVENT);
QueryResult.Result rel1 = queryResult.getResults().get(0);
QueryResult.Result rel2 = queryResult.getResults().get(1);
List<QueryResult.Series> listSeries1 = rel1.getSeries();
... ...
... ... @@ -21,7 +21,7 @@ import java.util.*;
@Component
public class ServiceServerExceptionMapperImpl implements ServiceServerExceptionMapper {
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
private String contextFilter = "(context = 'gateway' or context = 'order' or context = 'promotion' or context = 'product' or context = 'message' or context = 'sns' or context = 'users' or context = 'resources' or context = 'brower')";
... ... @@ -44,7 +44,7 @@ public class ServiceServerExceptionMapperImpl implements ServiceServerException
private Map<String,Integer> getErrorCount(String source,String sql) {
Map<String,Integer> map = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -72,7 +72,7 @@ public class ServiceServerExceptionMapperImpl implements ServiceServerException
private Map<String,List> getJavaApiGraphByServiceType(String source,String sql) {
Map<String,List> resultMap = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -114,8 +114,8 @@ public class ServiceServerExceptionMapperImpl implements ServiceServerException
public Map<String,List> getServiceNameJavaApiGraph(String source){
Map<String,List> resultMap = new HashMap();
String sql = String.format("select count(stack) from service_server_exception where %s and time > now() - 10m GROUP BY context,time(1m) fill(null)",contextFilter);
log.info("getServiceNameJavaApiGraph sql:" + sql);
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
//log.info("getServiceNameJavaApiGraph sql:" + sql);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -182,7 +182,7 @@ public class ServiceServerExceptionMapperImpl implements ServiceServerException
private Map<String,Integer> getErrorCountByContextAndIp(String source,String sql) {
Map<String,Integer> map = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ... @@ -252,7 +252,7 @@ public class ServiceServerExceptionMapperImpl implements ServiceServerException
private List<String> queryErrorInfoList(String source,String sql) {
List<String> list = new ArrayList<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.YOMO_MONITOR);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.SmsUpMapper;
import com.monitor.model.domain.QcloudSmsUp;
import com.monitor.model.domain.QcloudVoiceUpBody;
import org.apache.commons.lang.StringUtils;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -18,45 +16,39 @@ import org.springframework.stereotype.Component;
*/
@Component
public class SmsUpMapperImpl implements SmsUpMapper {
@Autowired
private InfluxDBSingle influxDBSingle;
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
@Override
public void insert(QcloudVoiceUpBody qcloudSmsUpBody) {
BatchPoints batchPoints = BatchPoints
/* BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).retentionPolicy("default")
.build();
try {
Point point = Point.measurement("voice_up")
.addField("result", qcloudSmsUpBody.getResult())
.addField("callid", StringUtils.isBlank(qcloudSmsUpBody.getCallid()) ? " " : qcloudSmsUpBody.getCallid())
.addField("mobile", StringUtils.isBlank(qcloudSmsUpBody.getMobile())?" ":qcloudSmsUpBody.getMobile())
.addField("nationcode", StringUtils.isBlank(qcloudSmsUpBody.getNationcode())?" ":qcloudSmsUpBody.getNationcode())
.addField("call_from", StringUtils.isBlank(qcloudSmsUpBody.getCall_from())?" ":qcloudSmsUpBody.getCall_from())
.addField("start_calltime", StringUtils.isBlank(qcloudSmsUpBody.getStart_calltime())?" ":qcloudSmsUpBody.getStart_calltime())
.addField("accept_time", StringUtils.isBlank(qcloudSmsUpBody.getAccept_time()) ? " " : qcloudSmsUpBody.getAccept_time())
.addField("fee", StringUtils.isBlank(qcloudSmsUpBody.getFee()) ? " " : qcloudSmsUpBody.getFee()).build();
batchPoints.point(point);
.build();*/
//try {
Point point = Point.measurement("voice_up")
.addField("result", qcloudSmsUpBody.getResult())
.addField("callid", StringUtils.isBlank(qcloudSmsUpBody.getCallid()) ? " " : qcloudSmsUpBody.getCallid())
.addField("mobile", StringUtils.isBlank(qcloudSmsUpBody.getMobile())?" ":qcloudSmsUpBody.getMobile())
.addField("nationcode", StringUtils.isBlank(qcloudSmsUpBody.getNationcode())?" ":qcloudSmsUpBody.getNationcode())
.addField("call_from", StringUtils.isBlank(qcloudSmsUpBody.getCall_from())?" ":qcloudSmsUpBody.getCall_from())
.addField("start_calltime", StringUtils.isBlank(qcloudSmsUpBody.getStart_calltime())?" ":qcloudSmsUpBody.getStart_calltime())
.addField("accept_time", StringUtils.isBlank(qcloudSmsUpBody.getAccept_time()) ? " " : qcloudSmsUpBody.getAccept_time())
.addField("fee", StringUtils.isBlank(qcloudSmsUpBody.getFee()) ? " " : qcloudSmsUpBody.getFee()).build();
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.YOMO_MONITOR,point);
/*batchPoints.point(point);
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(batchPoints);
}
.write(batchPoints);*/
/* }
catch (Exception e)
{
e.printStackTrace();
}
}*/
}
@Override
// Persists an inbound SMS ("sms_up") callback as a single InfluxDB point.
// NOTE(review): this span still interleaves the pre-refactor lines (the
// uncommented BatchPoints build just below) with the new reporter-based code;
// the stale lines should be removed once the diff is resolved.
public void insert(QcloudSmsUp qcloudSmsUp) {
BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).retentionPolicy("default")
.build();
Point point = Point.measurement("sms_up")
.addField("sign", StringUtils.isBlank(qcloudSmsUp.getSign()) ? " " : qcloudSmsUp.getSign())
.addField("text", StringUtils.isBlank(qcloudSmsUp.getText()) ? " " : qcloudSmsUp.getText())
... ... @@ -64,16 +56,22 @@ public class SmsUpMapperImpl implements SmsUpMapper {
.addField("time", StringUtils.isBlank(qcloudSmsUp.getTime())?" ":qcloudSmsUp.getTime())
// BUG(review): duplicate field key "time" — this second addField("time", ...)
// carries the nationcode value and overwrites the getTime() field above.
// The key here should almost certainly be "nationcode".
.addField("time", StringUtils.isBlank(qcloudSmsUp.getNationcode())?" ":qcloudSmsUp.getNationcode())
.addField("extend", StringUtils.isBlank(qcloudSmsUp.getExtend())?" ":qcloudSmsUp.getExtend()).build();
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.YOMO_MONITOR,point);
/*BatchPoints batchPoints = BatchPoints
.database(InfluxDBContants.YOMO_MONITOR).retentionPolicy("default")
.build();
batchPoints.point(point);
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(batchPoints);
.write(batchPoints);*/
}
@Override
public QueryResult select(int min) {
    // Count of "sms_up" rows received within the last `min` minutes.
    // The stale pre-refactor return statement (commonQuery.queryResult)
    // duplicated in the diff is removed; only the reporter path remains.
    String command = "SELECT count(text) FROM sms_up where time > now() - " + min + "m";
    return influxDataReporter.queryResult(InfluxDBContants.AWS, command, InfluxDBContants.YOMO_MONITOR);
}
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.UserOperateMapper;
import com.monitor.model.domain.UserOperate;
... ... @@ -22,7 +22,7 @@ public class UserOperateMapperImpl implements UserOperateMapper {
Logger log = LoggerFactory.getLogger(UserOperateMapperImpl.class);
@Autowired
private InfluxDBSingle influxDBSingle;
private InfluxDataReporter influxDataReporter;
Random random = new Random();
... ... @@ -35,7 +35,8 @@ public class UserOperateMapperImpl implements UserOperateMapper {
.tag("user",userOperate.getUser())
.time(System.currentTimeMillis() * 1000000 + random.nextInt(999999), TimeUnit.NANOSECONDS)
.build();
influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB()
.write(InfluxDBContants.SOME_OHTER_DATA, "default", point);
influxDataReporter.report(InfluxDBContants.ALARM,InfluxDBContants.SOME_OHTER_DATA,point);
/*influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB()
.write(InfluxDBContants.SOME_OHTER_DATA, "default", point);*/
}
}
... ...
... ... @@ -19,7 +19,7 @@ import java.util.Map;
public class VMInfoMapperImpl implements VMInfoMapper {
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
@Override
public Map getVMInfo() {
... ... @@ -32,7 +32,7 @@ public class VMInfoMapperImpl implements VMInfoMapper {
private Map getVMInfo(String source,String sql){
Map map = new HashMap<>();
QueryResult queryResult = commonQuery.queryResult(source, sql, InfluxDBContants.ZABBIX);
QueryResult queryResult = influxDataReporter.queryResult(source, sql, InfluxDBContants.ZABBIX);
QueryResult.Result rel = queryResult.getResults().get(0);
List<QueryResult.Series> listSeries = rel.getSeries();
if(listSeries == null)
... ...
package com.monitor.influxdb.mapper.impl;
import com.monitor.influxdb.InfluxDBModel;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.influxdb.mapper.IZkMapper;
import com.monitor.influxdb.model.ZkInfo;
... ... @@ -11,7 +9,6 @@ import com.monitor.influxdb.util.QueryResultUtil;
import com.monitor.model.domain.PageBean;
import org.apache.commons.lang.StringUtils;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -32,25 +29,23 @@ public class ZkMapper implements IZkMapper {
Logger log = LoggerFactory.getLogger(ZkMapper.class);
@Autowired
private InfluxDBSingle influxDBSingle;
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
Random random = new Random();
@Override
public void insert(ZkInfo zkInfo) {
    // Record a zookeeper liveness sample. The timestamp carries a random
    // sub-millisecond nanosecond offset — presumably so that two samples
    // written in the same millisecond (same tags) do not overwrite each other
    // in InfluxDB; TODO(review): confirm that is the intent of the random suffix.
    Point point = Point.measurement(InfluxDBContants.ZOOKEEPER_ALARM)
            .addField("hostIp", zkInfo.getHostIp())
            .addField("cloudType", zkInfo.getCloudType())
            .addField("isLive", zkInfo.getIsLive())
            .time(System.currentTimeMillis() * 1000000 + random.nextInt(999999), TimeUnit.NANOSECONDS)
            .build();
    // Write through the shared reporter (replaces the old direct
    // InfluxDBSingle write that was left here commented out).
    influxDataReporter.report(InfluxDBContants.ALARM, InfluxDBContants.MIDDLEWARE_ALARM, point);
}
... ... @@ -80,8 +75,8 @@ public class ZkMapper implements IZkMapper {
command.append(" where isLive="+isLive);
}
}
InfluxDBModel influxDBModel= influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM);
QueryResult result= commonQuery.queryResult(influxDBModel.getName(), command.toString(), InfluxDBContants.MIDDLEWARE_ALARM);
// InfluxDBModel influxDBModel= influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM);
QueryResult result= influxDataReporter.queryResult(InfluxDBContants.ALARM, command.toString(), InfluxDBContants.MIDDLEWARE_ALARM);
return QueryResultUtil.getCount(result);
}
... ... @@ -112,8 +107,8 @@ public class ZkMapper implements IZkMapper {
}
}
command.append(" ORDER BY time DESC LIMIT " + page.getPageSize() + " OFFSET " + page.getStartIndex());
InfluxDBModel influxDBModel= influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM);
QueryResult result= commonQuery.queryResult(influxDBModel.getName(), command.toString(), InfluxDBContants.MIDDLEWARE_ALARM);
// InfluxDBModel influxDBModel= influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM);
QueryResult result= influxDataReporter.queryResult(InfluxDBContants.ALARM, command.toString(), InfluxDBContants.MIDDLEWARE_ALARM);
return QueryResultUtil.getValues(result);
}
... ... @@ -129,9 +124,12 @@ public class ZkMapper implements IZkMapper {
sb.append(" and cloudType=" + cloudType);
sb.append(" order by time desc limit 1;");
}
Query query = new Query(sb.toString(), InfluxDBContants.MIDDLEWARE_ALARM);
QueryResult result = influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().query(query);
QueryResult result= influxDataReporter.queryResult(InfluxDBContants.ALARM, sb.toString(), InfluxDBContants.MIDDLEWARE_ALARM);
/* Query query = new Query(sb.toString(), InfluxDBContants.MIDDLEWARE_ALARM);
QueryResult result = influxDBSingle.getInfluxDBByName(InfluxDBContants.ALARM).getInfluxDB().query(query);*/
for (QueryResult.Result queryResult : result.getResults()) {
if (queryResult.getSeries() == null)
... ... @@ -164,7 +162,7 @@ public class ZkMapper implements IZkMapper {
String query_cmd = "select time,context,event,hostAddress,total_cost from service_call_stat where time > "+timeBegin +" and time <= "+timeEnd;
log.info(" getServiceCallstat begin: ");
log.info(" getServiceCallstat sql: " + query_cmd);
QueryResult result = commonQuery.queryResult(influxDBStr, query_cmd, InfluxDBContants.YOHO_EVENT_SAMPLE);
QueryResult result = influxDataReporter.queryResult(influxDBStr, query_cmd, InfluxDBContants.YOHO_EVENT_SAMPLE);
for (QueryResult.Result queryResult : result.getResults()) {
if (queryResult.getSeries() == null)
continue;
... ...
... ... @@ -22,11 +22,9 @@ import java.util.List;
public class InfluxDBSimpleTest {
Logger logger = LoggerFactory.getLogger("frw");
@Autowired
private InfluxDBSingle influxDBSingle;
@Autowired
private InfluxDataReporter commonQuery;
private InfluxDataReporter influxDataReporter;
@Before
public void init() {
... ... @@ -56,7 +54,7 @@ public class InfluxDBSimpleTest {
}*/
String sql="select \"name\",number from test";
String db="gml_test";
QueryResult queryResult = commonQuery.queryResult("test", sql, db);
QueryResult queryResult = influxDataReporter.queryResult("test", sql, db);
logger.info("{}",queryResult);
List<QueryResult.Series> seriesList = QueryResultUtil.getSeries(queryResult);
if (seriesList == null) {
... ...
package com.monitor.javaserver.ctrl;
import com.monitor.influxdb.InfluxDBSingle;
import com.monitor.influxdb.InfluxDataReporter;
import com.monitor.influxdb.contants.InfluxDBContants;
import com.monitor.javaserver.service.NewJavaApiInfoService;
import com.monitor.model.request.NewJavaApiInfoReq;
... ... @@ -20,7 +20,9 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.DecimalFormat;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by wangning on 2016/10/20.
... ... @@ -36,7 +38,7 @@ public class NewJavaApiInfoCtrl {
NewJavaApiInfoService newJavaApiInfoService;
@Autowired
private InfluxDBSingle influxDBSingle;
private InfluxDataReporter influxDataReporter;
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
... ... @@ -175,21 +177,6 @@ public class NewJavaApiInfoCtrl {
return rep;
}
@RequestMapping("/test")
@ResponseBody
public BaseResponse test() {
... ... @@ -210,8 +197,8 @@ public class NewJavaApiInfoCtrl {
.addField("CPU_UTIL_USER",CPU_UTIL_USER).addField("MEMORY_SIZE_AVAILABLE",MEMORY_SIZE_AVAILABLE).addField("MEMORY_SIZE_TOTAL",MEMORY_SIZE_TOTAL)
.addField("NET_IF_IN",NET_IF_IN).addField("NET_IF_OUT",NET_IF_OUT)
.build();
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(InfluxDBContants.ZABBIX, "default", point);
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.ZABBIX,point);
}
return null;
}
... ... @@ -230,8 +217,8 @@ public class NewJavaApiInfoCtrl {
.addField("cost", RandomUtils.nextInt(199) + 2000).addField("ip",ip)
.addField("stack","Total Delay [2005ms] /gateway\n +---[2005ms] - com.yoho.core.common.monitor.ThreadProfileInterceptor.preHandle - [enter:1477966424008,exit:1477966426013]\n +---[2005ms] - app.passport.signinAES. - [enter:1477966424008,exit:1477966426013]\n +---[1ms] - com.yoho.core.redis.YHValueOperations.increment - [enter:1477966424008,exit:1477966424009]\n +---[1ms] - com.yoho.core.redis.YHValueOperations.get - [enter:1477966424011,exit:1477966424012]\n +---[2000ms] - com.yoho.core.rest.client.HystrixServiceCaller.usersingle.login - [enter:1477966424012,exit:1477966426012]\n")
.build();
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(InfluxDBContants.YOHO_EVENT, "default", point);
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.YOHO_EVENT,point);
}
return null;
}
... ... @@ -250,8 +237,7 @@ public class NewJavaApiInfoCtrl {
.addField("ip",ip)
.addField("stack","org.apache.catalina\n.connector.ClientAbortException: java.io.IOException: 断开的管道\n\tat org.apache.catalina.connector.OutputBuffer\n.realWriteBytes(OutputBuffer.java:393)\n\tat org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk\n.java:426)\n\tat org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:342)\n\tat org\n.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:317)\n\tat org.apache.catalina.connector\n.CoyoteOutputStream.flush(CoyoteOutputStream.java:110)\n\tat com.fasterxml.jackson.core.json.UTF8JsonGenerator\n.flush(UTF8JsonGenerator.java:1022)\n\tat com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter\n.java:891)\n\tat org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter.writeInternal\n(AbstractJackson2HttpMessageConverter.java:264)\n\tat org.springframework.http.converter.AbstractGenericHttpMessageConverter\n.write(AbstractGenericHttpMessageConverter.java:100)\n\tat org.springframework.web.servlet.mvc.method\n.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor\n.java:202)\n\tat org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor\n.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:133)\n\tat org.springframework\n.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.handleReturnValue(RequestResponseBodyMethodProcessor\n.java:165)\n\tat org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue\n(HandlerMethodReturnValueHandlerComposite.java:80)\n\tat org.springframework.web.servlet.mvc.method.annotation\n.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:126)\n\tat org.springframework\n.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter\n.java:806)\n\tat 
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal\n(RequestMappingHandlerAdapter.java:729)\n\tat org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter\n.handle(AbstractHandlerMethodAdapter.java:85)\n\tat org.springframework.web.servlet.DispatcherServlet\n.doDispatch(DispatcherServlet.java:959)\n\tat org.springframework.web.servlet.DispatcherServlet.doService\n(DispatcherServlet.java:893)\n\tat org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet\n.java:970)\n\tat org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)\n\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:648)\n\tat org.springframework.web.servlet\n.FrameworkServlet.service(FrameworkServlet.java:846)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet\n.java:729)\n\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain\n.java:291)\n\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java\n:206)\n\tat org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)\n\tat org.apache.catalina\n.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)\n\tat org.apache.catalina\n.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)\n\tat org.springframework.web\n.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:85)\n\tat org.springframework\n.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n\tat org.apache.catalina.core\n.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)\n\tat org.apache.catalina.core\n.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)\n\tat org.apache.catalina.core.StandardWrapperValve\n.invoke(StandardWrapperValve.java:217)\n\tat org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve\n.java:106)\n\tat 
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502\n)\n\tat org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142)\n\tat org.apache\n.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)\n\tat org.apache.catalina.core.StandardEngineValve\n.invoke(StandardEngineValve.java:88)\n\tat org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter\n.java:518)\n\tat org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java\n:1091)\n\tat org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java\n:673)\n\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500)\n\tat\n org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456)\n\tat java.util.concurrent\n.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker\n.run(ThreadPoolExecutor.java:617)\n\tat org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run\n(TaskThread.java:61)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.io.IOException: 断开\n的管道\n\tat sun.nio.ch.FileDispatcherImpl.write0(Native Method)\n\tat sun.nio.ch.SocketDispatcher.write\n(SocketDispatcher.java:47)\n\tat sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)\n\tat sun.nio\n.ch.IOUtil.write(IOUtil.java:65)\n\tat sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471\n)\n\tat org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:124)\n\tat org.apache.tomcat.util\n.net.NioBlockingSelector.write(NioBlockingSelector.java:101)\n\tat org.apache.tomcat.util.net.NioSelectorPool\n.write(NioSelectorPool.java:172)\n\tat org.apache.coyote.http11.InternalNioOutputBuffer.writeToSocket\n(InternalNioOutputBuffer.java:139)\n\tat org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer\n.java:197)\n\tat 
org.apache.coyote.http11.InternalNioOutputBuffer.access$000(InternalNioOutputBuffer\n.java:41)\n\tat org.apache.coyote.http11.InternalNioOutputBuffer$SocketOutputBuffer.doWrite(InternalNioOutputBuffer\n.java:320)\n\tat org.apache.coyote.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java\n:118)\n\tat org.apache.coyote.http11.AbstractOutputBuffer.doWrite(AbstractOutputBuffer.java:256)\n\tat\n org.apache.coyote.Response.doWrite(Response.java:501)\n\tat org.apache.catalina.connector.OutputBuffer\n.realWriteBytes(OutputBuffer.java:388)\n\t... 47 more\n")
.build();
influxDBSingle.getInfluxDBByName(InfluxDBContants.AWS).getInfluxDB()
.write(InfluxDBContants.YOMO_MONITOR, "default", point);
influxDataReporter.report(InfluxDBContants.AWS,InfluxDBContants.YOMO_MONITOR,point);
}
return null;
}
... ...