Authored by jack

线程内完成写influxdb任务

... ... @@ -29,5 +29,7 @@ public interface Constants {
Map<String, String> HOSTIP_TO_IDS = new HashMap<>();
Map<String, String> HOSTID_TO_IPS = new HashMap<>();
Map<String, List<ItemResponse.Item>> HOSTID_TO_ITEMS = new HashMap<>();
}
... ...
... ... @@ -63,7 +63,7 @@ public class PointBuilder {
String itemId = StringUtils.EMPTY;
String itemValue = StringUtils.EMPTY;
String itemValue = "0";
try {
for (ItemResponse.Item item : items) {
... ...
... ... @@ -19,6 +19,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import static com.monitor.zabbix.constants.Constants.HOSTID_TO_IPS;
import static com.monitor.zabbix.constants.Constants.HOSTID_TO_ITEMS;
import static com.monitor.zabbix.constants.Constants.HOSTIP_TO_IDS;
... ... @@ -59,18 +60,12 @@ public class ZabbixTask implements Callable {
queryItemByHostId(hostIps);
//根据 hostid itemid 取值
Map<String, List<HistoryResponse.HistoryItem>> historyItemMap = new HashMap<>();
queryHistory(historyItemMap);
buildBatchPoints(historyItemMap);
queryHistory();
} catch (Exception e) {
DEBUG.error("Failed to execute zabbix task...error {}", e);
}
return null;
}
... ... @@ -97,7 +92,7 @@ public class ZabbixTask implements Callable {
return false;
}
private void queryHistory(Map<String, List<HistoryResponse.HistoryItem>> historyItemMap) throws IOException {
private void queryHistory() throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
... ... @@ -111,6 +106,9 @@ public class ZabbixTask implements Callable {
executorService.submit(new Runnable() {
@Override
public void run() {
List<HistoryResponse.HistoryItem> itemsHistory = new ArrayList<HistoryResponse.HistoryItem>();
List<ItemResponse.Item> itemList = entry.getValue();
for (String key : keys) {
... ... @@ -151,15 +149,10 @@ public class ZabbixTask implements Callable {
return;
}
if (!historyItemMap.containsKey(entry.getKey())) {
historyItemMap.put(entry.getKey(), historyResponse.getResult());
} else {
historyItemMap.get(entry.getKey()).addAll(historyResponse.getResult());
}
itemsHistory.addAll(historyResponse.getResult());
}
buildBatchPoints(HOSTID_TO_IPS.get(entry.getKey()), entry.getKey(), itemsHistory);
}
});
}
... ... @@ -259,32 +252,21 @@ public class ZabbixTask implements Callable {
for (HostInterfaceResponse.HostInterface hostInterface : hostInterfaces) {
HOSTIP_TO_IDS.put(hostInterface.getIp(), hostInterface.getHostid());
HOSTID_TO_IPS.put(hostInterface.getHostid(), hostInterface.getIp());
}
}
public void buildBatchPoints(Map<String, List<HistoryResponse.HistoryItem>> historyItems) {
public void buildBatchPoints(String ip, String hostId, List<HistoryResponse.HistoryItem> historyItems) {
DEBUG.info("start to build zabbix batch points...");
BatchPoints pointBp = BatchPoints.database(Constants.INFLUX_DB).retentionPolicy(Constants.INFLUX_POLICY).build();
try {
for (Map.Entry<String, String> entry : HOSTIP_TO_IDS.entrySet()) {
//zabbix中未找到hostid
if (StringUtils.isBlank(entry.getValue())) {
continue;
}
DEBUG.info("Build point by history items {}", historyItems);
Point point = buildPoints(entry.getKey(), HOSTID_TO_ITEMS.get(entry.getValue()), historyItems.get(entry.getValue()));
pointBp.point(point);
BatchPoints pointBp = BatchPoints.database(Constants.INFLUX_DB).retentionPolicy(Constants.INFLUX_POLICY).build();
}
} catch (Exception e) {
Point point = buildPoints(ip, HOSTID_TO_ITEMS.get(hostId), historyItems);
DEBUG.error(" Failed to build zabbix point ,error {}", e);
}
pointBp.point(point);
DEBUG.info("Start to write batch points {} into zabbix - influxdb", pointBp);
... ... @@ -296,8 +278,6 @@ public class ZabbixTask implements Callable {
private Point buildPoints(String ip, List<ItemResponse.Item> items, List<HistoryResponse.HistoryItem> historyItems) {
DEBUG.info("Build point by history items {}" ,historyItems);
Point point = PointBuilder.buildPoint(type, ip, items, historyItems);
return point;
... ...