Authored by jack.xue

fix rabbitmq bug

... ... @@ -78,8 +78,6 @@ public class InfluxComp implements Runnable {
} catch (Exception e) {
DEBUG.error("Failed to write point to influx {}", pointView.toPoint());
e.printStackTrace();
}
}
}
... ...
... ... @@ -62,4 +62,6 @@ public interface InterVar {
String RETENTION_POLICY = "default";
String port = "15672";
String LOCK="lock";
}
... ...
package com.monitor.middleware.rabbitmq.model.http;
import com.monitor.model.page.PageRequest;
import lombok.Data;
/**
* Created by yoho on 2016/6/21.
*/
@Data
public class QueryRequest {
public class QueryRequest extends PageRequest {
String moId;
}
... ...
... ... @@ -4,13 +4,12 @@ import com.model.MObjectInfo;
import com.model.RabbitAlertInfo;
import com.monitor.middleware.rabbitmq.component.InfluxComp;
import com.monitor.middleware.rabbitmq.constant.InterVar;
import com.monitor.middleware.rabbitmq.model.ClusterView;
import com.monitor.middleware.rabbitmq.model.OverView;
import com.monitor.middleware.rabbitmq.model.QueueInfo;
import com.monitor.middleware.rabbitmq.model.QueueView;
import com.monitor.middleware.rabbitmq.model.*;
import com.monitor.middleware.rabbitmq.model.http.FixRequest;
import com.monitor.middleware.rabbitmq.model.http.QueryRequest;
import com.monitor.middleware.rabbitmq.model.http.RabbitModel;
import com.monitor.model.domain.MObjectModel;
import com.monitor.model.page.PageResponse;
import com.monitor.model.response.BaseResponse;
import com.monitor.mysql.mapper.MObjectInfoMapper;
import com.monitor.mysql.mapper.RabbitAlertMapper;
... ... @@ -57,29 +56,85 @@ public class RabbitmqService {
@RequestMapping(value = "/cluster", method = RequestMethod.POST)
public BaseResponse queryCluster(@RequestBody QueryRequest request) {
BaseResponse response = new BaseResponse();
BaseResponse<PageResponse<NodeInfo>> baseResponse = new BaseResponse();
ClusterView oneView = InterVar.clusterViewMaps.get(Integer.parseInt(request.getMoId()));
if (null != oneView) {
response.setData(oneView.getClusterView());
List<NodeInfo> nodesList=oneView.getClusterView();
List<NodeInfo> selectedList = new ArrayList<>();
int start = (request.getCurrentPage() - 1) * request.getPageSize();
int end = (request.getCurrentPage() * request.getPageSize()) - 1;
int realCount = end < nodesList.size() ? request.getPageSize() : nodesList.size() - start;
for (int i = 0; i < realCount; i++) {
selectedList.add(nodesList.get(start + i));
}
PageResponse<NodeInfo> response = new PageResponse<>();
response.setCurrentPage(request.getCurrentPage());
response.setTotal(nodesList.size());
response.setPageSize(request.getPageSize());
response.setTotalPage(nodesList.size() / request.getPageSize() + 1);
response.setRows(selectedList);
baseResponse.setData(response);
}
return response;
return baseResponse;
}
@RequestMapping(value = "/queue", method = RequestMethod.POST)
public BaseResponse queryQueue(@RequestBody QueryRequest request) {
BaseResponse response = new BaseResponse();
BaseResponse<PageResponse<QueueInfo>> baseResponse = new BaseResponse();
QueueView queueView = InterVar.queueViewMaps.get(Integer.parseInt(request.getMoId()));
QueueView oneView = InterVar.queueViewMaps.get(Integer.parseInt(request.getMoId()));
if (null != queueView) {
response.setData(queueView.getQueueView());
if (null != oneView) {
List<QueueInfo> queuesList=oneView.getQueueView();
List<QueueInfo> selectedList = new ArrayList<>();
int start = (request.getCurrentPage() - 1) * request.getPageSize();
int end = (request.getCurrentPage() * request.getPageSize()) - 1;
int realCount = end < queuesList.size() ? request.getPageSize() : queuesList.size() - start;
for (int i = 0; i < realCount; i++) {
selectedList.add(queuesList.get(start + i));
}
PageResponse<QueueInfo> response = new PageResponse<>();
response.setCurrentPage(request.getCurrentPage());
response.setTotal(queuesList.size());
response.setPageSize(request.getPageSize());
response.setTotalPage(queuesList.size() / request.getPageSize() + 1);
response.setRows(selectedList);
baseResponse.setData(response);
}
return response;
return baseResponse;
}
@RequestMapping(value = "/allRabbitMq")
... ... @@ -124,10 +179,13 @@ public class RabbitmqService {
if (null != rabbitAlertMapper.queryAlertInfo(alertInfo)) {
rabbitAlertMapper.updateAlertInfo(alertInfo);
} else {
rabbitAlertMapper.insertAlertInfo(alertInfo);
}
break;
}
}
... ...
... ... @@ -37,32 +37,18 @@ public class RabbitMonitTask {
@Autowired
RabbitAlertMapper rabbitAlertMapper;
@Scheduled(cron = "0 0/3 * * * ? ")
@Scheduled(fixedRate = 3 * 60 * 1000L)
public void doTask() {
//加载所有的alert
List<RabbitAlertInfo> alertInfos = rabbitAlertMapper.getAllAlertsInfo();
synchronized (InterVar.LOCK.intern()) {
for (Map.Entry<Integer, MObjectInfo> entry : InterVar.moMaps.entrySet()) {
for (RabbitAlertInfo rabbitAlertInfo : alertInfos) {
EXECUTOR_SERVICE.submit(new OneJob(new ClusterViewJob(entry.getKey())));
HashMap<String, Integer> alertMaps = InterVar.alertMaps.get(rabbitAlertInfo.getMoId());
if (null == alertMaps) {
alertMaps = new HashMap<String, Integer>();
InterVar.alertMaps.put(rabbitAlertInfo.getMoId(), alertMaps);
EXECUTOR_SERVICE.submit(new OneJob(new OverViewJob(entry.getKey())));
EXECUTOR_SERVICE.submit(new OneJob(new QueueViewJob(entry.getKey(), rabbitAlertMapper)));
}
alertMaps.put(rabbitAlertInfo.getQueueName(), rabbitAlertInfo.getAlertHigh());
}
for (Map.Entry<Integer, MObjectInfo> entry : InterVar.moMaps.entrySet()) {
EXECUTOR_SERVICE.submit(new OneJob(new ClusterViewJob(entry.getKey())));
EXECUTOR_SERVICE.submit(new OneJob(new OverViewJob(entry.getKey())));
EXECUTOR_SERVICE.submit(new OneJob(new QueueViewJob(entry.getKey())));
}
}
}
... ...
... ... @@ -36,7 +36,7 @@ public class RabbitScanTask implements Runnable {
}
//自动发现rabbit监控对象
@Scheduled(cron = "0 0/1 * * * ? ")
@Scheduled(fixedRate = 60*1000L)
public void doTask() {
List<TypeInfo> typeInfosList = typeService.queryAllTypesInfo();
... ... @@ -51,10 +51,12 @@ public class RabbitScanTask implements Runnable {
}
}
synchronized (InterVar.LOCK.intern())
{
for (MObjectInfo info : mObjectInfoList) {
for (MObjectInfo info : mObjectInfoList) {
InterVar.moMaps.put(info.getMoId(), info);
InterVar.moMaps.put(info.getMoId(), info);
}
}
}
... ...
... ... @@ -25,9 +25,6 @@ import java.util.concurrent.Callable;
public class OverViewJob implements Callable {
private final static Logger DEBUG = LoggerFactory.getLogger("OverViewJob");
@Resource(name = "restClient")
RestTemplate restClient;
private int moId;
public OverViewJob(int moId) {
... ...
... ... @@ -2,11 +2,14 @@ package com.monitor.middleware.rabbitmq.task.job;
import com.model.MObjectInfo;
import com.model.RabbitAlertInfo;
import com.monitor.middleware.rabbitmq.component.RestComp;
import com.monitor.middleware.rabbitmq.constant.InterVar;
import com.monitor.middleware.rabbitmq.model.OverView;
import com.monitor.middleware.rabbitmq.model.QueueInfo;
import com.monitor.middleware.rabbitmq.model.QueueView;
import com.monitor.mysql.mapper.RabbitAlertMapper;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -17,6 +20,7 @@ import java.io.IOException;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
/**
... ... @@ -26,14 +30,15 @@ import java.util.concurrent.Callable;
public class QueueViewJob implements Callable {
private final static Logger DEBUG = LoggerFactory.getLogger(QueueViewJob.class);
@Resource(name = "restClient")
RestTemplate restClient;
RabbitAlertMapper rabbitAlertMapper;
private int moId;
public QueueViewJob(int moId) {
public QueueViewJob(int moId, RabbitAlertMapper rabbitAlertMapper) {
this.moId = moId;
this.rabbitAlertMapper = rabbitAlertMapper;
}
public void doTask() {
... ... @@ -58,6 +63,8 @@ public class QueueViewJob implements Callable {
//fire alarm
for (QueueInfo queueInfo : oneView.getQueueView()) {
updateQueueAlert(Integer.toString(moId), queueInfo);
checkAlert(Integer.toString(moId), queueInfo);
}
... ... @@ -99,6 +106,22 @@ public class QueueViewJob implements Callable {
}
}
private void updateQueueAlert(String moId, QueueInfo queueInfo) {
List<RabbitAlertInfo> alertInfos = rabbitAlertMapper.getAllAlertsInfo();
for (RabbitAlertInfo alertInfo : alertInfos) {
if (StringUtils.equals(moId, Integer.toString(alertInfo.getMoId())) && StringUtils.equals(queueInfo.getName(), alertInfo.getQueueName())) {
queueInfo.setAlert_high(alertInfo.getAlertHigh());
break;
}
}
}
@Override
public Object call() throws Exception {
doTask();
... ...