...
|
...
|
@@ -2,7 +2,7 @@ package com.yoho.datasync.producer.canal; |
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
|
import com.yoho.datasync.core.base.core.DatasyncConstant;
|
|
|
import com.yoho.datasync.core.base.constant.DatasyncConstant;
|
|
|
import com.yoho.datasync.core.base.message.CharUtils;
|
|
|
import com.yoho.datasync.core.base.message.MessageHelper;
|
|
|
import com.yoho.datasync.core.base.message.MqBeansResgister;
|
...
|
...
|
@@ -35,16 +35,15 @@ public class ProducerRabbitMqMessageHelper { |
|
|
private MqBeansResgister mqBeansResgister;
|
|
|
|
|
|
/**
|
|
|
*
|
|
|
* @param eventType 事件类型
|
|
|
* @param canalDbName binlog里对应的数据库名称
|
|
|
* @param eventType 事件类型
|
|
|
* @param canalDbName binlog里对应的数据库名称
|
|
|
* @param canalTableName binlog里对应的表名称
|
|
|
* @param rowData binlog变更数据
|
|
|
* @param rowData binlog变更数据
|
|
|
*/
|
|
|
public void sendRabbitMqMessage(CanalEntry.EventType eventType, String canalDbName, String canalTableName,
|
|
|
CanalEntry.RowData rowData) {
|
|
|
MqMessageEntity mqMessage = null;
|
|
|
switch (eventType){
|
|
|
switch (eventType) {
|
|
|
case DELETE:
|
|
|
mqMessage = this.getDeleteData(rowData, canalDbName, canalTableName);
|
|
|
break;
|
...
|
...
|
@@ -61,8 +60,8 @@ public class ProducerRabbitMqMessageHelper { |
|
|
return;
|
|
|
}
|
|
|
// 获取真实队列名称
|
|
|
String realQueueName = messageHelper.getRealQueueName(canalDbName,canalTableName, mqMessage);
|
|
|
LOGGER.info("covert mqMessage success, queue is[{}] ,mqMessage is [{}]",realQueueName, JSON.toJSONString(mqMessage));
|
|
|
String realQueueName = messageHelper.getRealQueueName(canalDbName, canalTableName, mqMessage);
|
|
|
LOGGER.info("covert mqMessage success, queue is[{}] ,mqMessage is [{}]", realQueueName, JSON.toJSONString(mqMessage));
|
|
|
// 发送消息
|
|
|
mqBeansResgister.getRabbitTemplate().convertAndSend(realQueueName, mqMessage);
|
|
|
}
|
...
|
...
|
@@ -70,17 +69,17 @@ public class ProducerRabbitMqMessageHelper { |
|
|
|
|
|
private MqMessageEntity getDeleteData(CanalEntry.RowData rowData, String canalDbName, String canalTableName) {
|
|
|
try {
|
|
|
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName,canalTableName);
|
|
|
return new MqMessageEntity(DatasyncConstant.ACTION_DELETE, canalDbName,canalTableName, tableDataVersion, getData(rowData.getBeforeColumnsList(), canalDbName, canalTableName));
|
|
|
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName, canalTableName);
|
|
|
return new MqMessageEntity(DatasyncConstant.ACTION_DELETE, canalDbName, canalTableName, tableDataVersion, getData(rowData.getBeforeColumnsList(), canalDbName, canalTableName));
|
|
|
} catch (Exception e) {
|
|
|
LOGGER.error(e.getMessage(),e);
|
|
|
LOGGER.error(e.getMessage(), e);
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private MqMessageEntity getUpdateData(CanalEntry.RowData rowData, String canalDbName, String canalTableName, String action) {
|
|
|
try {
|
|
|
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName,canalTableName);
|
|
|
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName, canalTableName);
|
|
|
return new MqMessageEntity(action, canalDbName, canalTableName, tableDataVersion, getData(rowData.getAfterColumnsList(), canalDbName, canalTableName));
|
|
|
} catch (Exception e) {
|
|
|
LOGGER.error(e.getMessage(), e);
|
...
|
...
|
@@ -96,6 +95,9 @@ public class ProducerRabbitMqMessageHelper { |
|
|
for (CanalEntry.Column column : columns) {
|
|
|
// 有些数据库的字段不是有下划线的
|
|
|
if (column.getName().contains("_")) {
|
|
|
/**
|
|
|
* 把所有字段都转成了驼峰形式
|
|
|
*/
|
|
|
fieldName = CharUtils.underlineToCamelhump(column.getName());
|
|
|
} else {
|
|
|
fieldName = column.getName();
|
...
|
...
|
@@ -113,12 +115,13 @@ public class ProducerRabbitMqMessageHelper { |
|
|
|
|
|
/**
|
|
|
* 根据dbname和tablename将model中的字段进行反射
|
|
|
*
|
|
|
* @param canalDbName
|
|
|
* @param canalTableName
|
|
|
*/
|
|
|
private Set<String> getModelFields(String canalDbName, String canalTableName) {
|
|
|
String tableKey = canalDbName + "." + canalTableName;
|
|
|
return modelFieldsMap.computeIfAbsent(tableKey , Key -> {
|
|
|
return modelFieldsMap.computeIfAbsent(tableKey, Key -> {
|
|
|
Class<?> bean = null;
|
|
|
Set<String> fieldSet = new HashSet<>();
|
|
|
try {
|
...
|
...
|
@@ -130,7 +133,7 @@ public class ProducerRabbitMqMessageHelper { |
|
|
}
|
|
|
} catch (ClassNotFoundException e) {
|
|
|
//TODO
|
|
|
LOGGER.error("This Bean name {} cannot found" , MODEL_CLASS_PACKAGE + CharUtils.getBeanName(CharUtils.underlineToCamelhump(canalTableName)));
|
|
|
LOGGER.error("This Bean name {} cannot found", MODEL_CLASS_PACKAGE + CharUtils.getBeanName(CharUtils.underlineToCamelhump(canalTableName)));
|
|
|
}
|
|
|
return fieldSet;
|
|
|
});
|
...
|
...
|
|