ProducerRabbitMqMessageHelper.java
5.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.yoho.datasync.producer.canal.common;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.yoho.datasync.core.DatasyncConstant;
import com.yoho.datasync.message.CharUtils;
import com.yoho.datasync.message.MessageHelper;
import com.yoho.datasync.message.MqBeansResgister;
import com.yoho.datasync.message.MqMessageEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class ProducerRabbitMqMessageHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerRabbitMqMessageHelper.class);
private volatile Map<String, Set<String>> modelFieldsMap = new ConcurrentHashMap<>();
private static final String MODEL_CLASS_PACKAGE = "com.yoho.datasync.producer.common.model.";
@Resource
private MessageHelper messageHelper;
@Resource
private MqMessageDataVersionManager dataVersionManager;
@Resource
private MqBeansResgister mqBeansResgister;
/**
*
* @param eventType 事件类型
* @param canalDbName binlog里对应的数据库名称
* @param canalTableName binlog里对应的表名称
* @param rowData binlog变更数据
*/
public void sendRabbitMqMessage(CanalEntry.EventType eventType, String canalDbName, String canalTableName,
CanalEntry.RowData rowData) {
MqMessageEntity mqMessage = null;
switch (eventType){
case DELETE:
mqMessage = this.getDeleteData(rowData, canalDbName, canalTableName);
break;
case INSERT:
mqMessage = this.getUpdateData(rowData, canalDbName, canalTableName, DatasyncConstant.ACTION_INSERT);
break;
case UPDATE:
mqMessage = this.getUpdateData(rowData, canalDbName, canalTableName, DatasyncConstant.ACTION_UPDATE);
}
if (mqMessage == null || mqMessage.getData() == null || mqMessage.getData().isEmpty()) {
LOGGER.error("covert mqMessage fail,eventType is {},canalDbName is{},canalTableName is {},rowData is {},",
eventType, canalDbName, canalTableName, rowData == null ? "" : rowData.toString());
return;
}
// 获取真实队列名称
String realQueueName = messageHelper.getRealQueueName(canalDbName,canalTableName, mqMessage);
LOGGER.info("covert mqMessage success, queue is[{}] ,mqMessage is [{}]",realQueueName, JSON.toJSONString(mqMessage));
// 发送消息
mqBeansResgister.getRabbitTemplate().convertAndSend(realQueueName, mqMessage);
}
private MqMessageEntity getDeleteData(CanalEntry.RowData rowData, String canalDbName, String canalTableName) {
try {
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName,canalTableName);
return new MqMessageEntity(DatasyncConstant.ACTION_DELETE, canalDbName,canalTableName, tableDataVersion, getData(rowData.getBeforeColumnsList(), canalDbName, canalTableName));
} catch (Exception e) {
LOGGER.error(e.getMessage(),e);
return null;
}
}
private MqMessageEntity getUpdateData(CanalEntry.RowData rowData, String canalDbName, String canalTableName, String action) {
try {
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName,canalTableName);
return new MqMessageEntity(action, canalDbName, canalTableName, tableDataVersion, getData(rowData.getAfterColumnsList(), canalDbName, canalTableName));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return null;
}
}
private Map<String, Object> getData(List<CanalEntry.Column> columns, String canalDbName, String canalTableName) {
try {
Map<String, Object> data = new HashMap<>();
Set<String> fieldSet = getModelFields(canalDbName, canalTableName);
String fieldName;
for (CanalEntry.Column column : columns) {
// 有些数据库的字段不是有下划线的
if (column.getName().contains("_")) {
fieldName = CharUtils.underlineToCamelhump(column.getName());
} else {
fieldName = column.getName();
}
if (fieldSet.contains(fieldName)) {
data.put(fieldName, column.getValue());
}
}
return data;
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return null;
}
}
/**
* 根据dbname和tablename将model中的字段进行反射
* @param canalDbName
* @param canalTableName
*/
private Set<String> getModelFields(String canalDbName, String canalTableName) {
String tableKey = canalDbName + "." + canalTableName;
return modelFieldsMap.computeIfAbsent(tableKey , Key -> {
Class<?> bean = null;
Set<String> fieldSet = new HashSet<>();
try {
bean = Class.forName(MODEL_CLASS_PACKAGE + CharUtils.getBeanName(CharUtils.underlineToCamelhump(canalTableName)));
Field[] fields = bean.getDeclaredFields();
for (Field field : fields) {
fieldSet.add(field.getName());
}
} catch (ClassNotFoundException e) {
//TODO
LOGGER.error("This Bean name {} cannot found" , MODEL_CLASS_PACKAGE + CharUtils.getBeanName(CharUtils.underlineToCamelhump(canalTableName)));
}
return fieldSet;
});
}
}