Authored by jack.xue

add rabbitmq monitor

Showing 23 changed files with 934 additions and 7 deletions
@@ -12,5 +12,28 @@ @@ -12,5 +12,28 @@
12 <artifactId>monitor-service-middleware</artifactId> 12 <artifactId>monitor-service-middleware</artifactId>
13 <version>1.0-SNAPSHOT</version> 13 <version>1.0-SNAPSHOT</version>
14 14
  15 + <dependencies>
  16 + <!-- internal -->
  17 + <dependency>
  18 + <groupId>monitor-service</groupId>
  19 + <artifactId>monitor-service-common</artifactId>
  20 + </dependency>
  21 +
  22 + <dependency>
  23 + <groupId>monitor-service</groupId>
  24 + <artifactId>monitor-service-cmdb</artifactId>
  25 + </dependency>
  26 +
  27 + <!-- 3rd -->
  28 + <dependency>
  29 + <groupId>org.projectlombok</groupId>
  30 + <artifactId>lombok</artifactId>
  31 + </dependency>
  32 +
  33 + <dependency>
  34 + <groupId>org.influxdb</groupId>
  35 + <artifactId>influxdb-java</artifactId>
  36 + </dependency>
  37 + </dependencies>
15 38
16 </project> 39 </project>
1 -package com.monitor.middleware.rabbitmq;  
2 -  
3 -/**  
4 - * Created by zhengyouwei on 2016/6/20.  
5 - */  
6 -public class Test {  
7 -}  
  1 +package com.monitor.middleware.rabbitmq.component;
  2 +
  3 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  4 +import com.monitor.middleware.rabbitmq.model.PointView;
  5 +import org.influxdb.InfluxDB;
  6 +import org.influxdb.InfluxDBFactory;
  7 +import org.influxdb.dto.BatchPoints;
  8 +import org.influxdb.dto.Point;
  9 +import org.influxdb.dto.Query;
  10 +import org.influxdb.dto.QueryResult;
  11 +import org.slf4j.Logger;
  12 +import org.slf4j.LoggerFactory;
  13 +import org.springframework.beans.factory.annotation.Autowired;
  14 +import org.springframework.beans.factory.annotation.Value;
  15 +import org.springframework.stereotype.Component;
  16 +
  17 +import javax.annotation.PostConstruct;
  18 +import java.util.List;
  19 +
  20 +/**
  21 + * Created by yoho on 2016/6/21.
  22 + */
  23 +@Component
  24 +public class InfluxComp {
  25 +
  26 + public static final Logger DEBUG = LoggerFactory.getLogger(InfluxComp.class);
  27 +
  28 + @Value("influxUrl")
  29 + private String influxUrl;
  30 +
  31 + @Value("influxUser")
  32 + private String influxUser;
  33 +
  34 + @Value("influxPwd")
  35 + private String influxPwd;
  36 +
  37 + private InfluxDB influxDBClient;
  38 +
  39 + public InfluxComp() {
  40 + influxDBClient = InfluxDBFactory.connect(this.influxUrl, this.influxUser, this.influxPwd);
  41 +
  42 + }
  43 +
  44 + @PostConstruct
  45 + public void doService() {
  46 +
  47 + while (true) {
  48 +
  49 + if (!InterVar.POINT_QUEUE.isEmpty()) {
  50 +
  51 + PointView pointView = InterVar.POINT_QUEUE.poll();
  52 +
  53 + influxDBClient.createDatabase(InterVar.DBNAME);
  54 +
  55 + influxDBClient.write(pointView.toPoint());
  56 +
  57 + }
  58 +
  59 + try {
  60 + Thread.sleep(100L);
  61 +
  62 + } catch (InterruptedException e) {
  63 +
  64 + DEBUG.error("Interrupt watch point_queue , error {}", e);
  65 + }
  66 + }
  67 + }
  68 +
  69 +
  70 + public QueryResult doQuery(String command) {
  71 +
  72 + Query query=new Query(command,InterVar.DBNAME);
  73 +
  74 + return influxDBClient.query(query);
  75 + }
  76 +}
  1 +package com.monitor.middleware.rabbitmq.constant;
  2 +
  3 +import com.fasterxml.jackson.databind.DeserializationFeature;
  4 +import com.fasterxml.jackson.databind.ObjectMapper;
  5 +import com.model.MObjectInfo;
  6 +import com.monitor.middleware.rabbitmq.model.PointView;
  7 +import org.influxdb.dto.BatchPoints;
  8 +
  9 +import javax.print.DocFlavor;
  10 +import java.util.concurrent.*;
  11 +
  12 +/**
  13 + * Created by yoho on 2016/6/21.
  14 + */
  15 +public interface InterVar {
  16 +
  17 + ObjectMapper JSONMAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  18 +
  19 + ConcurrentHashMap<Integer, MObjectInfo> moMaps = new ConcurrentHashMap<>();
  20 +
  21 + ConcurrentHashMap<Integer, Integer> nodesCountMaps = new ConcurrentHashMap<>();
  22 +
  23 + ConcurrentHashMap<Integer, Integer> queueCountMaps = new ConcurrentHashMap<>();
  24 +
  25 + ConcurrentLinkedQueue<PointView> POINT_QUEUE = new ConcurrentLinkedQueue<>();
  26 +
  27 + ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(50);
  28 +
  29 + String OVERVIEW_URL = "/api/overview";
  30 +
  31 + String NODES_URL = "/api/nodes";
  32 +
  33 + String QUEUE_URL = "/api/queues";
  34 +
  35 + String MOMATCHPREFIX = "*rabbitmq*";
  36 +
  37 + String URLFORMAT = "http://{}:{}";
  38 +
  39 + String DBNAME = "rabbitmq_info";
  40 +
  41 + String OVERVIEW_MEASURE = "overview_info";
  42 +
  43 + String CLUSTERVIEW_MEASURE = "cluster_info";
  44 +
  45 + String QUEUEVIEW_MEASURE = "queue_info";
  46 +
  47 + String DBTAG = "moId";
  48 +
  49 + int MONIT_TIMEOUT = 10;
  50 +
  51 + String OVERVIEW_SQL="SELECT * FROM " + InterVar.OVERVIEW_MEASURE + " WHERE moId = \'{}\' ORDER BY time DESC LIMIT 1";
  52 +
  53 + String CLUSTERVIEW_SQL="SELECT * FROM " + InterVar.CLUSTERVIEW_MEASURE + "WHERE moId = \'{}\' ORDER BY time DESC LIMIT ";
  54 +
  55 + String QUEUEVIEW_SQL="SELECT * FROM " + InterVar.QUEUEVIEW_MEASURE + "WHERE moId = \'{}\' ORDER BY time DESC LIMIT ";
  56 +
  57 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  4 +import lombok.Data;
  5 +import org.influxdb.dto.BatchPoints;
  6 +import org.influxdb.dto.Point;
  7 +
  8 +import java.util.ArrayList;
  9 +import java.util.List;
  10 +
  11 +/**
  12 + * Created by yoho on 2016/6/21.
  13 + */
  14 +@Data
  15 +public class ClusterView implements PointView {
  16 +
  17 + private int moId;
  18 +
  19 + List<NodeInfo> clusterView;
  20 +
  21 + @Override
  22 + public BatchPoints toPoint() {
  23 + BatchPoints pointBp = BatchPoints.database(InterVar.DBNAME).retentionPolicy("3d").build();
  24 +
  25 + for (NodeInfo nodeInfo : clusterView) {
  26 +
  27 + Point point = Point.measurement(InterVar.CLUSTERVIEW_MEASURE)
  28 + .tag(InterVar.DBTAG, Integer.toString(this.getMoId()))
  29 + .addField("name", nodeInfo.getName())
  30 + .addField("state", nodeInfo.isState() ? "running" : "idle")
  31 + .addField("fd_used", nodeInfo.getFd_used())
  32 + .addField("fd_total", nodeInfo.getFd_total())
  33 + .addField("mem_used", nodeInfo.getMem_used())
  34 + .addField("mem_limit", nodeInfo.getMem_limit())
  35 + .addField("mem_alarm", nodeInfo.isMem_alarm())
  36 + .addField("disk_free", nodeInfo.getDisk_free())
  37 + .addField("disk_free_limit", nodeInfo.getDisk_free_limit())
  38 + .addField("disk_free_alarm", nodeInfo.isDisk_free_alarm())
  39 + .addField("proc_used", nodeInfo.getProc_used())
  40 + .addField("proc_total", nodeInfo.getProc_total())
  41 + .addField("sockets_used", nodeInfo.getSockets_used())
  42 + .addField("sockets_total", nodeInfo.getSockets_total())
  43 + .addField("os_pid", nodeInfo.getOs_pid())
  44 + .addField("io_read_bytes", nodeInfo.getIo_read_bytes())
  45 + .addField("io_write_bytes", nodeInfo.getIo_write_bytes())
  46 + .addField("io_read_bytes_rate", nodeInfo.getIo_read_bytes_rate().getRate())
  47 + .addField("io_write_bytes_rate", nodeInfo.getIo_write_bytes_rate().getRate())
  48 + .addField("ip", nodeInfo.getClusters().get(0).getIp())
  49 + .addField("port", nodeInfo.getClusters().get(0).getPort())
  50 + .build();
  51 +
  52 + pointBp.point(point);
  53 +
  54 + }
  55 + InterVar.nodesCountMaps.put(moId, clusterView.size());
  56 + return pointBp;
  57 + }
  58 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * Created by yoho on 2016/6/21.
  7 + */
  8 +@Data
  9 +public class GlobalCount {
  10 + int connections;
  11 +
  12 + int channels;
  13 +
  14 + int exchanges;
  15 +
  16 + int queues;
  17 +
  18 + int consumers;
  19 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import com.fasterxml.jackson.annotation.JsonProperty;
  4 +import lombok.Data;
  5 +
  6 +/**
  7 + * Created by yoho on 2016/6/21.
  8 + */
  9 +@Data
  10 +public class Host {
  11 + @JsonProperty("peer_addr")
  12 + String ip;
  13 +
  14 + @JsonProperty("peer_port")
  15 + int port;
  16 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import com.fasterxml.jackson.annotation.JsonProperty;
  4 +import lombok.Data;
  5 +
  6 +/**
  7 + * Created by yoho on 2016/6/21.
  8 + */
  9 +@Data
  10 +public class Message {
  11 +
  12 + @JsonProperty("messages_ready")
  13 + int readyMsg;
  14 +
  15 + @JsonProperty("messages_unacknowledged")
  16 + int unackMsg;
  17 +
  18 + @JsonProperty("messages")
  19 + int totalMsg;
  20 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * Created by yoho on 2016/6/21.
  7 + */
  8 +@Data
  9 +public class MoInfo {
  10 +
  11 + private int moId;
  12 +
  13 + private String moName;
  14 +
  15 + private int moTypes;
  16 +
  17 + private int mo_ip;
  18 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import com.fasterxml.jackson.annotation.JsonProperty;
  4 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  5 +import lombok.Data;
  6 +
  7 +import java.io.IOException;
  8 +import java.util.ArrayList;
  9 +import java.util.Arrays;
  10 +import java.util.List;
  11 +
  12 +import static com.monitor.middleware.rabbitmq.constant.InterVar.JSONMAPPER;
  13 +
  14 +/**
  15 + * Created by yoho on 2016/6/21.
  16 + */
  17 +@Data
  18 +public class NodeInfo {
  19 + String name;
  20 +
  21 + @JsonProperty("running")
  22 + boolean state;
  23 +
  24 + int fd_used;
  25 +
  26 + int fd_total;
  27 +
  28 + long mem_used;
  29 +
  30 + long mem_limit;
  31 +
  32 + boolean mem_alarm;
  33 +
  34 + long disk_free;
  35 +
  36 + long disk_free_limit;
  37 +
  38 + boolean disk_free_alarm;
  39 +
  40 + int proc_used;
  41 +
  42 + int proc_total;
  43 +
  44 + int sockets_used;
  45 +
  46 + int sockets_total;
  47 +
  48 + int os_pid;
  49 +
  50 + long io_read_bytes;
  51 +
  52 + long io_write_bytes;
  53 +
  54 + //need set un-auto
  55 + @JsonProperty("io_read_bytes_details")
  56 + IoReadRate io_read_bytes_rate;
  57 +
  58 + @JsonProperty("io_write_bytes_details")
  59 + IoWriteRate io_write_bytes_rate;
  60 +
  61 + @JsonProperty("cluster_links")
  62 + List<Host> clusters;
  63 +
  64 + @Data
  65 + class IoReadRate{
  66 + double rate;
  67 + }
  68 +
  69 + @Data
  70 + class IoWriteRate{
  71 + double rate;
  72 + }
  73 +
  74 +}
  75 +
  76 +
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  4 +import lombok.Data;
  5 +import org.influxdb.dto.BatchPoints;
  6 +import org.influxdb.dto.Point;
  7 +
  8 +import java.util.ArrayList;
  9 +import java.util.List;
  10 +
  11 +/**
  12 + * Created by yoho on 2016/6/21.
  13 + */
  14 +@Data
  15 +public class OverView implements PointView {
  16 +
  17 + int moId;
  18 +
  19 + GlobalCount object_totals;
  20 +
  21 + Message queue_totals;
  22 +
  23 + @Override
  24 + public BatchPoints toPoint() {
  25 +
  26 + BatchPoints pointBp = BatchPoints.database(InterVar.DBNAME).retentionPolicy("3d").build();
  27 +
  28 + Point point = Point.measurement(InterVar.OVERVIEW_MEASURE)
  29 + .tag(InterVar.DBTAG, Integer.toString(this.getMoId()))
  30 + .addField("connections", this.getObject_totals().getConnections())
  31 + .addField("channels", this.getObject_totals().getChannels())
  32 + .addField("exchanges", this.getObject_totals().getExchanges())
  33 + .addField("queues", this.getObject_totals().getQueues())
  34 + .addField("consumers", this.getObject_totals().getConsumers())
  35 + .addField("messages_ready", this.getQueue_totals().getReadyMsg())
  36 + .addField("messages_unacknowledged", this.getQueue_totals().getUnackMsg())
  37 + .addField("messages", this.getQueue_totals().getTotalMsg())
  38 + .build();
  39 +
  40 + pointBp.point(point);
  41 +
  42 + return pointBp;
  43 + }
  44 +}
  45 +
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import org.influxdb.dto.BatchPoints;
  4 +
  5 +/**
  6 + * Created by yoho on 2016/6/21.
  7 + */
  8 +public interface PointView {
  9 +
  10 + BatchPoints toPoint();
  11 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * Created by yoho on 2016/6/21.
  7 + */
  8 +@Data
  9 +public class QueryRequest {
  10 +
  11 + String moId;
  12 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * Created by yoho on 2016/6/21.
  7 + */
  8 +@Data
  9 +public class QueueInfo {
  10 +
  11 + String name;
  12 +
  13 + String vhost;
  14 +
  15 + String node;
  16 +
  17 + int messages;
  18 +
  19 + int messages_ready;
  20 +
  21 + int messages_unacknowledged;
  22 +
  23 + int consumers;
  24 +
  25 + String state;
  26 +
  27 + String idle_since;
  28 +
  29 + boolean isWatch = false;
  30 +}
  1 +package com.monitor.middleware.rabbitmq.model;
  2 +
  3 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  4 +import lombok.Data;
  5 +import org.influxdb.dto.BatchPoints;
  6 +import org.influxdb.dto.Point;
  7 +
  8 +
  9 +import java.util.ArrayList;
  10 +import java.util.List;
  11 +
  12 +/**
  13 + * Created by yoho on 2016/6/21.
  14 + */
  15 +@Data
  16 +public class QueueView implements PointView {
  17 + int moId;
  18 +
  19 + List<QueueInfo> queueView;
  20 +
  21 + @Override
  22 + public BatchPoints toPoint() {
  23 +
  24 + BatchPoints pointBp = BatchPoints.database(InterVar.DBNAME).retentionPolicy("3d").build();
  25 +
  26 + for (QueueInfo queueInfo : queueView) {
  27 +
  28 + Point point = Point.measurement(InterVar.QUEUEVIEW_MEASURE)
  29 + .tag(InterVar.DBTAG, Integer.toString(this.getMoId()))
  30 + .addField("name", queueInfo.getName())
  31 + .addField("vhost", queueInfo.getVhost())
  32 + .addField("node", queueInfo.getNode())
  33 + .addField("messages", queueInfo.getMessages())
  34 + .addField("messages_ready", queueInfo.getMessages_ready())
  35 + .addField("messages_unacknowledged", queueInfo.getMessages_unacknowledged())
  36 + .addField("messages_ready", queueInfo.getMessages_ready())
  37 + .addField("state", queueInfo.getState())
  38 + .addField("idle_since", queueInfo.getIdle_since())
  39 + .build();
  40 +
  41 + pointBp.point(point);
  42 + }
  43 +
  44 + InterVar.queueCountMaps.put(moId, queueView.size());
  45 +
  46 + return pointBp;
  47 + }
  48 +}
  1 +package com.monitor.middleware.rabbitmq.service;
  2 +
  3 +import com.monitor.middleware.rabbitmq.component.InfluxComp;
  4 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  5 +import com.monitor.middleware.rabbitmq.model.QueryRequest;
  6 +import com.monitor.model.response.BaseResponse;
  7 +import org.influxdb.InfluxDB;
  8 +import org.influxdb.dto.Query;
  9 +import org.influxdb.dto.QueryResult;
  10 +import org.slf4j.Logger;
  11 +import org.slf4j.LoggerFactory;
  12 +import org.springframework.beans.factory.annotation.Autowired;
  13 +import org.springframework.web.bind.annotation.RequestBody;
  14 +import org.springframework.web.bind.annotation.RequestMapping;
  15 +import org.springframework.web.bind.annotation.RestController;
  16 +
  17 +/**
  18 + * Created by yoho on 2016/6/21.
  19 + */
  20 +@RestController(value = "/middleware/rabbitmq")
  21 +public class RabbitmqService {
  22 + public static final Logger DEBUG = LoggerFactory.getLogger(RabbitmqService.class);
  23 +
  24 + @Autowired
  25 + InfluxComp influxComp;
  26 +
  27 + @RequestMapping(value = "/overview")
  28 + public BaseResponse queryOverview(@RequestBody QueryRequest request) {
  29 +
  30 + BaseResponse response = new BaseResponse();
  31 + QueryResult result = null;
  32 + try {
  33 + result = this.influxComp.doQuery(String.format(InterVar.OVERVIEW_SQL, request.getMoId()));
  34 + } catch (Exception e) {
  35 + DEBUG.error("Failed to query overview about mo {}", request.getMoId());
  36 + response.setData(e);
  37 + }
  38 + response.setData(result);
  39 +
  40 + return response;
  41 + }
  42 +
  43 +
  44 + @RequestMapping(value = "/cluster")
  45 + public BaseResponse queryCluster(@RequestBody QueryRequest request) {
  46 +
  47 + BaseResponse response = new BaseResponse();
  48 + QueryResult result = null;
  49 + try {
  50 + result = this.influxComp.doQuery(String.format(InterVar.CLUSTERVIEW_SQL, request.getMoId()) + InterVar.nodesCountMaps.get(request.getMoId()));
  51 + } catch (Exception e) {
  52 + DEBUG.error("Failed to query overview about mo {}", request.getMoId());
  53 + response.setData(e);
  54 + }
  55 + response.setData(result);
  56 +
  57 + return response;
  58 + }
  59 +
  60 +
  61 + @RequestMapping(value = "/queue")
  62 + public BaseResponse queryQueue(@RequestBody QueryRequest request) {
  63 +
  64 + BaseResponse response = new BaseResponse();
  65 + QueryResult result = null;
  66 + try {
  67 + result = this.influxComp.doQuery(String.format(InterVar.QUEUEVIEW_SQL, request.getMoId()) + InterVar.queueCountMaps.get(request.getMoId()));
  68 +
  69 +
  70 + } catch (Exception e) {
  71 + DEBUG.error("Failed to query overview about mo {}", request.getMoId());
  72 + response.setData(e);
  73 + }
  74 + response.setData(result);
  75 +
  76 + return response;
  77 + }
  78 +
  79 +}
  1 +package com.monitor.middleware.rabbitmq.task;
  2 +
  3 +import com.model.MObjectInfo;
  4 +import com.model.TypeInfo;
  5 +import com.monitor.cmdb.service.IMObjectInfoService;
  6 +import com.monitor.cmdb.service.ITypeInfoService;
  7 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  8 +import org.springframework.beans.factory.annotation.Autowired;
  9 +import org.springframework.scheduling.annotation.Scheduled;
  10 +
  11 +import java.util.List;
  12 +
  13 +/**
  14 + * Created by yoho on 2016/6/21.
  15 + */
  16 +public class MoScanTask implements Runnable{
  17 +
  18 + @Autowired
  19 + IMObjectInfoService moService;
  20 +
  21 + @Autowired
  22 + ITypeInfoService typeService;
  23 +
  24 + //自动发现rabbit监控对象
  25 + @Scheduled(cron = "")
  26 + public void doTask() {
  27 + List<MObjectInfo> mObjectInfoList = moService.queryMObjectsInfo();
  28 +
  29 + for (MObjectInfo info : mObjectInfoList) {
  30 +
  31 + TypeInfo typeInfo = typeService.queryTypeInfo(info.getMoTypeId());
  32 +
  33 + if (1 == typeInfo.getTypeIsLeaf() && typeInfo.getTypeName().matches(InterVar.MOMATCHPREFIX)) {
  34 +
  35 + InterVar.moMaps.put(info.getMoId(), info);
  36 +
  37 + }
  38 + }
  39 + }
  40 +
  41 + @Override
  42 + public void run() {
  43 + doTask();
  44 + }
  45 +}
  1 +package com.monitor.middleware.rabbitmq.task;
  2 +
  3 +import com.model.MObjectInfo;
  4 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  5 +import com.monitor.middleware.rabbitmq.task.job.ClusterViewJob;
  6 +import com.monitor.middleware.rabbitmq.task.job.OneJob;
  7 +import com.monitor.middleware.rabbitmq.task.job.OverViewJob;
  8 +import com.monitor.middleware.rabbitmq.task.job.QueueViewJob;
  9 +import org.springframework.scheduling.annotation.Scheduled;
  10 +
  11 +import java.util.Map;
  12 +
  13 +import static com.monitor.middleware.rabbitmq.constant.InterVar.EXECUTOR_SERVICE;
  14 +
  15 +/**
  16 + * Created by yoho on 2016/6/21.
  17 + */
  18 +
  19 +/**
  20 + * 加载服务监控任务
  21 + */
  22 +public class MonitTask {
  23 +
  24 + @Scheduled(cron = "")
  25 + public void doTask() {
  26 + for (Map.Entry<Integer, MObjectInfo> entry : InterVar.moMaps.entrySet()) {
  27 + EXECUTOR_SERVICE.submit(new OneJob(new ClusterViewJob(entry.getKey())));
  28 + EXECUTOR_SERVICE.submit(new OneJob(new OverViewJob(entry.getKey())));
  29 + EXECUTOR_SERVICE.submit(new OneJob(new QueueViewJob(entry.getKey())));
  30 + }
  31 + }
  32 +}
  1 +package com.monitor.middleware.rabbitmq.task.job;
  2 +
  3 +
  4 +import com.model.MObjectInfo;
  5 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  6 +import com.monitor.middleware.rabbitmq.model.ClusterView;
  7 +import com.monitor.middleware.rabbitmq.model.NodeInfo;
  8 +import org.slf4j.Logger;
  9 +import org.slf4j.LoggerFactory;
  10 +import org.springframework.beans.factory.annotation.Autowired;
  11 +import org.springframework.web.client.RestTemplate;
  12 +
  13 +import java.io.IOException;
  14 +import java.util.Arrays;
  15 +import java.util.concurrent.Callable;
  16 +
  17 +/**
  18 + * Created by yoho on 2016/6/21.
  19 + */
  20 +
  21 +public class ClusterViewJob implements Callable {
  22 + private final static Logger DEBUG = LoggerFactory.getLogger(ClusterViewJob.class);
  23 +
  24 + @Autowired
  25 + RestTemplate restClient;
  26 +
  27 + private int moId;
  28 +
  29 + public ClusterViewJob(int moId) {
  30 +
  31 + this.moId = moId;
  32 + }
  33 +
  34 + public void doTask() {
  35 + MObjectInfo moInfo = InterVar.moMaps.get(moId);
  36 +
  37 + String url = String.format(InterVar.URLFORMAT, moInfo.getMoHostIp(), moInfo.getMoTags()) + InterVar.NODES_URL;
  38 +
  39 + if (null == moInfo) {
  40 +
  41 + return;
  42 + }
  43 +
  44 + String respJson = restClient.getForObject(url, String.class);
  45 +
  46 + ClusterView oneView = new ClusterView();
  47 +
  48 + try {
  49 + oneView.setMoId(moId);
  50 +
  51 + oneView.setClusterView(Arrays.asList(InterVar.JSONMAPPER.readValue(respJson, NodeInfo[].class)));
  52 +
  53 +
  54 + } catch (IOException e) {
  55 +
  56 + DEBUG.error("Failed to parse cluster_view info: {} , error {}", respJson, e);
  57 +
  58 + return;
  59 + }
  60 +
  61 + if (null != oneView) {
  62 +
  63 + InterVar.POINT_QUEUE.offer(oneView);
  64 + }
  65 + }
  66 +
  67 + @Override
  68 + public Object call() throws Exception {
  69 + doTask();
  70 + return null;
  71 + }
  72 +}
  1 +package com.monitor.middleware.rabbitmq.task.job;
  2 +
  3 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  4 +import java.util.concurrent.*;
  5 +
  6 +
  7 +
  8 +/**
  9 + * Created by yoho on 2016/6/21.
  10 + */
  11 +public class OneJob implements Runnable {
  12 +
  13 + private Callable oneJob;
  14 +
  15 + public OneJob(Callable oneJob) {
  16 + this.oneJob = oneJob;
  17 + }
  18 +
  19 +
  20 + @Override
  21 + public void run() {
  22 + Future future = InterVar.EXECUTOR_SERVICE.submit(this.oneJob);
  23 +
  24 + try {
  25 + future.get(InterVar.MONIT_TIMEOUT, TimeUnit.SECONDS);
  26 + } catch (InterruptedException e) {
  27 + e.printStackTrace();
  28 + } catch (ExecutionException e) {
  29 + e.printStackTrace();
  30 + } catch (TimeoutException e) {
  31 + e.printStackTrace();
  32 + }
  33 + }
  34 +}
  1 +package com.monitor.middleware.rabbitmq.task.job;
  2 +
  3 +
  4 +import com.model.MObjectInfo;
  5 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  6 +import com.monitor.middleware.rabbitmq.model.OverView;
  7 +import org.slf4j.Logger;
  8 +import org.slf4j.LoggerFactory;
  9 +import org.springframework.beans.factory.annotation.Autowired;
  10 +import org.springframework.web.client.RestTemplate;
  11 +
  12 +import java.io.IOException;
  13 +import java.util.concurrent.Callable;
  14 +
  15 +/**
  16 + * Created by yoho on 2016/6/21.
  17 + */
  18 +
  19 +public class OverViewJob implements Callable{
  20 + private final static Logger DEBUG = LoggerFactory.getLogger("OverViewJob");
  21 +
  22 + @Autowired
  23 + RestTemplate restClient;
  24 +
  25 + private int moId;
  26 +
  27 + public OverViewJob(int moId) {
  28 +
  29 + this.moId = moId;
  30 + }
  31 +
  32 + public void doTask() {
  33 + MObjectInfo moInfo = InterVar.moMaps.get(moId);
  34 +
  35 + String url = String.format(InterVar.URLFORMAT, moInfo.getMoHostIp(), moInfo.getMoTags()) + InterVar.OVERVIEW_URL;
  36 +
  37 + if (null == moInfo) {
  38 +
  39 + return;
  40 + }
  41 +
  42 + String respJson = restClient.getForObject(url, String.class);
  43 +
  44 + OverView oneView = null;
  45 +
  46 + try {
  47 + oneView = InterVar.JSONMAPPER.readValue(respJson, OverView.class);
  48 +
  49 + oneView.setMoId(moInfo.getMoId());
  50 +
  51 + } catch (IOException e) {
  52 +
  53 + DEBUG.error("Failed to parse overview info: {} , error {}", respJson, e);
  54 +
  55 + return;
  56 + }
  57 +
  58 + if (null != oneView) {
  59 + InterVar.POINT_QUEUE.offer(oneView);
  60 + }
  61 + }
  62 +
  63 + @Override
  64 + public Object call() throws Exception {
  65 + doTask();
  66 + return null;
  67 + }
  68 +}
  1 +package com.monitor.middleware.rabbitmq.task.job;
  2 +
  3 +
  4 +import com.model.MObjectInfo;
  5 +import com.monitor.middleware.rabbitmq.constant.InterVar;
  6 +import com.monitor.middleware.rabbitmq.model.QueueInfo;
  7 +import com.monitor.middleware.rabbitmq.model.QueueView;
  8 +import org.slf4j.Logger;
  9 +import org.slf4j.LoggerFactory;
  10 +import org.springframework.beans.factory.annotation.Autowired;
  11 +import org.springframework.web.client.RestTemplate;
  12 +
  13 +import java.io.IOException;
  14 +import java.util.Arrays;
  15 +import java.util.concurrent.Callable;
  16 +
  17 +/**
  18 + * Created by yoho on 2016/6/21.
  19 + */
  20 +
  21 +public class QueueViewJob implements Callable {
  22 + private final static Logger DEBUG = LoggerFactory.getLogger(QueueViewJob.class);
  23 +
  24 + @Autowired
  25 + RestTemplate restClient;
  26 +
  27 + private int moId;
  28 +
  29 + public QueueViewJob(int moId) {
  30 +
  31 + this.moId = moId;
  32 + }
  33 +
  34 + public void doTask() {
  35 + MObjectInfo moInfo = InterVar.moMaps.get(moId);
  36 +
  37 + String url = String.format(InterVar.URLFORMAT, moInfo.getMoHostIp(), moInfo.getMoTags()) + InterVar.QUEUE_URL;
  38 +
  39 + if (null == moInfo) {
  40 +
  41 + return;
  42 + }
  43 +
  44 + String respJson = restClient.getForObject(url, String.class);
  45 +
  46 + QueueView oneView = new QueueView();
  47 +
  48 + try {
  49 + oneView.setQueueView(Arrays.asList(InterVar.JSONMAPPER.readValue(respJson, QueueInfo[].class)));
  50 +
  51 + oneView.setMoId(moInfo.getMoId());
  52 +
  53 + } catch (IOException e) {
  54 +
  55 + DEBUG.error("Failed to parse queue_view info: {} , error {}", respJson, e);
  56 +
  57 + return;
  58 + }
  59 +
  60 + if (null != oneView) {
  61 +
  62 + InterVar.POINT_QUEUE.offer(oneView);
  63 + }
  64 + }
  65 +
  66 +
  67 + @Override
  68 + public Object call() throws Exception {
  69 + doTask();
  70 + return null;
  71 + }
  72 +}
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<beans xmlns="http://www.springframework.org/schema/beans"
  3 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4 + xmlns:task="http://www.springframework.org/schema/task"
  5 + xsi:schemaLocation="http://www.springframework.org/schema/beans
  6 + http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
  7 + http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
  8 +
  9 + <task:scheduler id="taskScheduler" pool-size="5"></task:scheduler>
  10 +
  11 + <bean id="httpClientFactory" class="org.springframework.http.client.SimpleClientHttpRequestFactory">
  12 + <property name="connectTimeout" value="2000"/>
  13 + <property name="readTimeout" value="3000"/>
  14 + </bean>
  15 +
  16 + <!--RestTemplate-->
  17 + <bean id="restClient" class="org.springframework.web.client.RestTemplate" scope="prototype">
  18 + <constructor-arg ref="httpClientFactory"/>
  19 + </bean>
  20 +
  21 +
  22 +
  23 +</beans>