...
|
...
|
@@ -70,7 +70,8 @@ public class ProducerRabbitMqMessageHelper { |
|
|
private MqMessageEntity getDeleteData(CanalEntry.RowData rowData, String canalDbName, String canalTableName) {
|
|
|
try {
|
|
|
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName, canalTableName);
|
|
|
return new MqMessageEntity(DatasyncConstant.ACTION_DELETE, canalDbName, canalTableName, tableDataVersion, getData(rowData.getBeforeColumnsList(), canalDbName, canalTableName));
|
|
|
return new MqMessageEntity(DatasyncConstant.ACTION_DELETE, canalDbName, canalTableName, tableDataVersion,
|
|
|
getData(rowData.getBeforeColumnsList(), canalDbName, canalTableName));
|
|
|
} catch (Exception e) {
|
|
|
LOGGER.error(e.getMessage(), e);
|
|
|
return null;
|
...
|
...
|
@@ -80,7 +81,8 @@ public class ProducerRabbitMqMessageHelper { |
|
|
private MqMessageEntity getUpdateData(CanalEntry.RowData rowData, String canalDbName, String canalTableName, String action) {
|
|
|
try {
|
|
|
long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName, canalTableName);
|
|
|
return new MqMessageEntity(action, canalDbName, canalTableName, tableDataVersion, getData(rowData.getAfterColumnsList(), canalDbName, canalTableName));
|
|
|
return new MqMessageEntity(action, canalDbName, canalTableName, tableDataVersion,
|
|
|
getData(rowData.getAfterColumnsList(), canalDbName, canalTableName));
|
|
|
} catch (Exception e) {
|
|
|
LOGGER.error(e.getMessage(), e);
|
|
|
return null;
|
...
|
...
|
|