Authored by jack

并发执行zabbix查询请求;

修改测试的zookeeper地址;
... ... @@ -7,4 +7,4 @@ JavaApiExecutorPoolMaxSize=50
nginxsync.agent=192.168.102.15:6060
zookeeper.address=172.16.6.235:2181
\ No newline at end of file
zookeeper.address=192.168.102.76:2181
\ No newline at end of file
... ...
... ... @@ -17,7 +17,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.*;
/**
* Created by yoho on 2016/10/11.
... ... @@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
@Data
public class ZabbixTask implements Callable {
public static final Logger DEBUG = LoggerFactory.getLogger(ZabbixTask.class);
String type;
String[] keys;
... ... @@ -38,6 +39,7 @@ public class ZabbixTask implements Callable {
this.keys = keys;
zabbixHttpComp = SpringContextUtils.getBeanByClass(ZabbixHttpComp.class);
}
@Override
... ... @@ -96,66 +98,125 @@ public class ZabbixTask implements Callable {
}
private void queryHistory(Map<String, List<ItemResponse.Item>> hostIdtoItems, Map<String, List<HistoryResponse.HistoryItem>> historyItemMap) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
for (Map.Entry<String, List<ItemResponse.Item>> entry : hostIdtoItems.entrySet()) {
List<ItemResponse.Item> itemList = entry.getValue();
executorService.submit(new Runnable() {
@Override
public void run() {
List<ItemResponse.Item> itemList = entry.getValue();
for (String key : keys) {
for (String key : keys) {
List<String> itemIdList = new ArrayList<>();
List<String> itemIdList = new ArrayList<>();
for (ItemResponse.Item item : itemList) {
for (ItemResponse.Item item : itemList) {
if (StringUtils.startsWith(item.getKey_(), key)) {
if (StringUtils.startsWith(item.getKey_(), key)) {
itemIdList.add(item.getItemid());
}
}
itemIdList.add(item.getItemid());
}
}
String response;
String response;
if (StringUtils.equals("system.cpu", key)) {
if (StringUtils.equals("system.cpu", key)) {
response = zabbixHttpComp.sendRequest(HistoryRequest.buildRequest(entry.getKey(), itemIdList, 0));
response = zabbixHttpComp.sendRequest(HistoryRequest.buildRequest(entry.getKey(), itemIdList, 0));
} else {
} else {
response = zabbixHttpComp.sendRequest(HistoryRequest.buildRequest(entry.getKey(), itemIdList, 3));
}
response = zabbixHttpComp.sendRequest(HistoryRequest.buildRequest(entry.getKey(), itemIdList, 3));
}
DEBUG.info("Found zabbix item response {} in items {}", response, itemIdList);
DEBUG.info("Found zabbix item response {} in items {}", response, itemIdList);
HistoryResponse historyResponse = Constants.OBJECT_MAPPER.readValue(response, HistoryResponse.class);
HistoryResponse historyResponse = null;
if (!historyItemMap.containsKey(entry.getKey())) {
try {
historyItemMap.put(entry.getKey(), historyResponse.getResult());
historyResponse = Constants.OBJECT_MAPPER.readValue(response, HistoryResponse.class);
} else {
} catch (IOException e) {
historyItemMap.get(entry.getKey()).addAll(historyResponse.getResult());
DEBUG.error("Failed to parse zabbix item response {} in items {}", response, itemIdList);
return;
}
if (!historyItemMap.containsKey(entry.getKey())) {
historyItemMap.put(entry.getKey(), historyResponse.getResult());
} else {
historyItemMap.get(entry.getKey()).addAll(historyResponse.getResult());
}
}
}
}
});
}
executorService.shutdown();
try {
executorService.awaitTermination(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
DEBUG.error("Failed to execute found zabbix history task in 20 seconds....");
}
}
private void queryItemByHostId(List<String> hostIps, Map<String, String> hostIptoIds, Map<String, List<ItemResponse.Item>> hostIdtoItems) throws IOException {
private void queryItemByHostId(List<String> hostIps, Map<String, String> hostIptoIds, Map<String, List<ItemResponse.Item>> hostIdtoItems) {
ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
for (String ip : hostIps) {
String hostId = hostIptoIds.get(ip);
if (null != hostId) {
String response = zabbixHttpComp.sendRequest(ItemRequest.buildRequest(hostId));
executorService.submit(new Runnable() {
@Override
public void run() {
String response = zabbixHttpComp.sendRequest(ItemRequest.buildRequest(hostId));
DEBUG.info("Found zabbix item response {} in hostId {}", response, hostId);
DEBUG.info("Found zabbix item response {} in hostId {}", response, hostId);
ItemResponse itemResponse = Constants.OBJECT_MAPPER.readValue(response, ItemResponse.class);
try {
ItemResponse itemResponse = Constants.OBJECT_MAPPER.readValue(response, ItemResponse.class);
hostIdtoItems.put(hostId, itemResponse.getResult());
hostIdtoItems.put(hostId, itemResponse.getResult());
} catch (Exception e) {
DEBUG.error("Failed to parese zabbix item {} in hostid {}", response, hostId);
}
}
});
}
}
executorService.shutdown();
try {
executorService.awaitTermination(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
DEBUG.error("Failed to execute found zabbix item task in 20 seconds....");
}
}
private void queryHostIdByIp(List<String> hostIps, Map<String, String> hostIptoIds) throws IOException {
... ...