|
|
package com.monitor.zabbix.service;
|
|
|
|
|
|
import com.model.HostInfo;
|
|
|
import com.monitor.cmdb.service.impl.HostInfoServiceImpl;
|
|
|
import com.monitor.common.util.SpringContextUtils;
|
|
|
import com.monitor.zabbix.comp.InfluxdbComp;
|
|
|
import com.monitor.zabbix.comp.ZabbixHttpComp;
|
|
|
import com.monitor.zabbix.constants.Constants;
|
|
|
import com.monitor.zabbix.impl.PointBuilder;
|
|
|
import com.monitor.zabbix.model.*;
|
|
|
import lombok.Data;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.influxdb.dto.BatchPoints;
|
|
|
import org.influxdb.dto.Point;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
import static com.monitor.zabbix.constants.Constants.HOSTID_TO_IPS;
|
|
|
import static com.monitor.zabbix.constants.Constants.HOSTID_TO_ITEMS;
|
|
|
import static com.monitor.zabbix.constants.Constants.HOSTIP_TO_IDS;
|
|
|
|
|
|
/**
|
|
|
* Created by yoho on 2016/10/11.
|
|
|
*/
|
|
|
@Data
|
|
|
public class ZabbixTask implements Callable {
|
|
|
public static final Logger DEBUG = LoggerFactory.getLogger(ZabbixTask.class);
|
|
|
|
|
|
String type;
|
|
|
|
|
|
String[] keys;
|
|
|
|
|
|
ZabbixHttpComp zabbixHttpComp;
|
|
|
|
|
|
public ZabbixTask(String type, String[] keys) {
|
|
|
|
|
|
this.type = type;
|
|
|
|
|
|
this.keys = keys;
|
|
|
|
|
|
zabbixHttpComp = SpringContextUtils.getBeanByClass(ZabbixHttpComp.class);
|
|
|
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Object call() {
|
|
|
List<String> hostIps = new ArrayList<>();
|
|
|
|
|
|
if (queryHostByType(hostIps)) {
|
|
|
return null;
|
|
|
}
|
|
|
try {
|
|
|
//根据ip找到hostid
|
|
|
queryHostIdByIp(hostIps);
|
|
|
|
|
|
queryItemByHostId(hostIps);
|
|
|
|
|
|
//根据 hostid itemid 取值
|
|
|
queryHistory();
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
DEBUG.error("Failed to execute zabbix task...error {}", e);
|
|
|
}
|
|
|
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
private boolean queryHostByType(List<String> hostIps) {
|
|
|
HostInfoServiceImpl iHostInfoService = SpringContextUtils.getBeanByClass(com.monitor.cmdb.service.impl.HostInfoServiceImpl.class);
|
|
|
|
|
|
List<HostInfo> hostInfoList = iHostInfoService.getHostInfosByTag(type);
|
|
|
|
|
|
DEBUG.info("Found host {} in type {}", hostInfoList, type);
|
|
|
|
|
|
if (hostInfoList.isEmpty()) {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
|
|
|
for (HostInfo hostInfo : hostInfoList) {
|
|
|
|
|
|
hostIps.add(hostInfo.getHostIp());
|
|
|
|
|
|
if (!HOSTIP_TO_IDS.containsKey(hostInfo.getHostIp())) {
|
|
|
HOSTIP_TO_IDS.put(hostInfo.getHostIp(), null);
|
|
|
}
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
private void queryHistory() throws IOException {
|
|
|
ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
|
|
|
@Override
|
|
|
public Thread newThread(Runnable r) {
|
|
|
Thread thread = new Thread(r);
|
|
|
thread.setDaemon(true);
|
|
|
return thread;
|
|
|
}
|
|
|
});
|
|
|
for (Map.Entry<String, List<ItemResponse.Item>> entry : HOSTID_TO_ITEMS.entrySet()) {
|
|
|
|
|
|
executorService.submit(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
|
|
|
List<HistoryResponse.HistoryItem> itemsHistory = new ArrayList<HistoryResponse.HistoryItem>();
|
|
|
|
|
|
List<ItemResponse.Item> itemList = entry.getValue();
|
|
|
|
|
|
for (String key : keys) {
|
|
|
|
|
|
List<String> itemIdList = new ArrayList<>();
|
|
|
|
|
|
for (ItemResponse.Item item : itemList) {
|
|
|
|
|
|
if (StringUtils.startsWith(item.getKey_(), key)) {
|
|
|
|
|
|
itemIdList.add(item.getItemid());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
String response;
|
|
|
|
|
|
if (StringUtils.equals("system.cpu", key)) {
|
|
|
|
|
|
response = zabbixHttpComp.sendRequest(HistoryRequest.buildRequest(entry.getKey(), itemIdList, 0));
|
|
|
|
|
|
} else {
|
|
|
|
|
|
response = zabbixHttpComp.sendRequest(HistoryRequest.buildRequest(entry.getKey(), itemIdList, 3));
|
|
|
}
|
|
|
|
|
|
DEBUG.info("Found zabbix history response {} in items {}", response, itemIdList);
|
|
|
|
|
|
HistoryResponse historyResponse = null;
|
|
|
|
|
|
try {
|
|
|
|
|
|
historyResponse = Constants.OBJECT_MAPPER.readValue(response, HistoryResponse.class);
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
|
DEBUG.error("Failed to parse zabbix history response {} in items {}", response, itemIdList);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
itemsHistory.addAll(historyResponse.getResult());
|
|
|
}
|
|
|
|
|
|
buildBatchPoints(HOSTID_TO_IPS.get(entry.getKey()), entry.getKey(), itemsHistory);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
executorService.shutdown();
|
|
|
|
|
|
try {
|
|
|
executorService.awaitTermination(40, TimeUnit.SECONDS);
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
DEBUG.error("Failed to execute found zabbix history task in 20 seconds....");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void queryItemByHostId(List<String> hostIps) {
|
|
|
|
|
|
ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
|
|
|
@Override
|
|
|
public Thread newThread(Runnable r) {
|
|
|
Thread thread = new Thread(r);
|
|
|
thread.setDaemon(true);
|
|
|
return thread;
|
|
|
}
|
|
|
});
|
|
|
for (String ip : hostIps) {
|
|
|
|
|
|
String hostId = HOSTIP_TO_IDS.get(ip);
|
|
|
|
|
|
if (null != hostId && null != HOSTID_TO_ITEMS.get(hostId)) {
|
|
|
|
|
|
DEBUG.info("Found hostid {} items in cache...no need to query zabbix server...", hostId);
|
|
|
|
|
|
continue;
|
|
|
}
|
|
|
//如果没有查询过该hostid的items信息,则查询
|
|
|
if (null != hostId) {
|
|
|
|
|
|
executorService.submit(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
String response = zabbixHttpComp.sendRequest(ItemRequest.buildRequest(hostId));
|
|
|
|
|
|
DEBUG.info("Found zabbix item response {} in hostId {}", response, hostId);
|
|
|
|
|
|
try {
|
|
|
ItemResponse itemResponse = Constants.OBJECT_MAPPER.readValue(response, ItemResponse.class);
|
|
|
|
|
|
//缓存items记录到内存中,省去下次查询耗时
|
|
|
HOSTID_TO_ITEMS.put(hostId, itemResponse.getResult());
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
DEBUG.error("Failed to parese zabbix item {} in hostid {}", response, hostId);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
executorService.shutdown();
|
|
|
|
|
|
try {
|
|
|
executorService.awaitTermination(20, TimeUnit.SECONDS);
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
DEBUG.error("Failed to execute found zabbix item task in 20 seconds....");
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
private void queryHostIdByIp(List<String> hostIps) throws IOException {
|
|
|
|
|
|
//ip 找 hostid , 如果已存在,则跳过
|
|
|
List<String> queryHostIdsList = new ArrayList<>();
|
|
|
|
|
|
for (String ip : hostIps) {
|
|
|
|
|
|
if (null == HOSTIP_TO_IDS.get(ip)) {
|
|
|
|
|
|
queryHostIdsList.add(ip);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (queryHostIdsList.isEmpty()) {
|
|
|
|
|
|
DEBUG.info("Not found new host update.... no need to query zabbix...");
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
String response = zabbixHttpComp.sendRequest(HostInterfaceRequest.buildRequest(queryHostIdsList));
|
|
|
|
|
|
DEBUG.info("Found zabbix interface response {} in type {}", response, type);
|
|
|
|
|
|
HostInterfaceResponse hostInterfaceResponse = Constants.OBJECT_MAPPER.readValue(response, HostInterfaceResponse.class);
|
|
|
|
|
|
List<HostInterfaceResponse.HostInterface> hostInterfaces = hostInterfaceResponse.getResult();
|
|
|
|
|
|
for (HostInterfaceResponse.HostInterface hostInterface : hostInterfaces) {
|
|
|
|
|
|
HOSTIP_TO_IDS.put(hostInterface.getIp(), hostInterface.getHostid());
|
|
|
|
|
|
HOSTID_TO_IPS.put(hostInterface.getHostid(), hostInterface.getIp());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void buildBatchPoints(String ip, String hostId, List<HistoryResponse.HistoryItem> historyItems) {
|
|
|
DEBUG.info("start to build zabbix batch points...");
|
|
|
|
|
|
DEBUG.info("Build point by history items {}", historyItems);
|
|
|
|
|
|
BatchPoints pointBp = BatchPoints.database(Constants.INFLUX_DB).retentionPolicy(Constants.INFLUX_POLICY).build();
|
|
|
|
|
|
Point point = buildPoints(ip, HOSTID_TO_ITEMS.get(hostId), historyItems);
|
|
|
|
|
|
pointBp.point(point);
|
|
|
|
|
|
DEBUG.info("Start to write batch points {} into zabbix - influxdb", pointBp);
|
|
|
|
|
|
InfluxdbComp influxDBClient = (InfluxdbComp) SpringContextUtils.getBeanByClass(InfluxdbComp.class);
|
|
|
|
|
|
influxDBClient.doWritePoints(pointBp);
|
|
|
|
|
|
}
|
|
|
|
|
|
private Point buildPoints(String ip, List<ItemResponse.Item> items, List<HistoryResponse.HistoryItem> historyItems) {
|
|
|
|
|
|
Point point = PointBuilder.buildPoint(type, ip, items, historyItems);
|
|
|
|
|
|
return point;
|
|
|
}
|
|
|
|
|
|
} |