Authored by hugufei

fix

@@ -28,7 +28,6 @@ import org.springframework.context.ApplicationEventPublisherAware; @@ -28,7 +28,6 @@ import org.springframework.context.ApplicationEventPublisherAware;
28 28
29 import java.nio.charset.StandardCharsets; 29 import java.nio.charset.StandardCharsets;
30 import java.util.Map; 30 import java.util.Map;
31 -import java.util.UUID;  
32 import java.util.concurrent.ConcurrentHashMap; 31 import java.util.concurrent.ConcurrentHashMap;
33 32
34 public abstract class AbstractMqListener implements ApplicationEventPublisherAware, MessageListener { 33 public abstract class AbstractMqListener implements ApplicationEventPublisherAware, MessageListener {
@@ -83,9 +82,10 @@ public abstract class AbstractMqListener implements ApplicationEventPublisherAwa @@ -83,9 +82,10 @@ public abstract class AbstractMqListener implements ApplicationEventPublisherAwa
83 } 82 }
84 83
85 private String getDataVersionKey(SearchMqMessage searchMqMessage) { 84 private String getDataVersionKey(SearchMqMessage searchMqMessage) {
  85 + String dbName = searchMqMessage.getDbName();
86 String tableName = searchMqMessage.getTableName(); 86 String tableName = searchMqMessage.getTableName();
87 - String primaryKeysValue = searchMessageHelper.getPrimaryKeysValue(tableName, searchMqMessage.getData());  
88 - return new StringBuilder(tableName).append("_").append(primaryKeysValue).toString(); 87 + String primaryKeysValue = searchMessageHelper.getPrimaryKeysValue(searchMqMessage.getDbName(), tableName, searchMqMessage.getData());
  88 + return new StringBuilder(dbName).append("_").append(tableName).append("_").append("_").append(primaryKeysValue).toString();
89 } 89 }
90 90
91 private boolean checkDataVersion(SearchMqMessage searchMqMessage) { 91 private boolean checkDataVersion(SearchMqMessage searchMqMessage) {
@@ -218,19 +218,19 @@ public abstract class AbstractMqListener implements ApplicationEventPublisherAwa @@ -218,19 +218,19 @@ public abstract class AbstractMqListener implements ApplicationEventPublisherAwa
218 } 218 }
219 } 219 }
220 220
221 - private String getSinglePrimaryKeyValue(SearchMqMessage jsonObject) {  
222 - TableConfig tableConfig = tableConfigLoader.getTableConfig(jsonObject.getTableName()); 221 + private String getSinglePrimaryKeyValue(SearchMqMessage message) {
  222 + TableConfig tableConfig = tableConfigLoader.getTableConfig(message.getDbName(), message.getTableName());
223 if (tableConfig == null) { 223 if (tableConfig == null) {
224 - return null; 224 + tableConfig = tableConfigLoader.getTableConfig("", message.getTableName());
225 } 225 }
226 - if (tableConfig.getPrimaryKeys() == null || tableConfig.getPrimaryKeys().length != 1) { 226 + if (tableConfig == null || tableConfig.getPrimaryKeys() == null || tableConfig.getPrimaryKeys().length != 1) {
227 return null; 227 return null;
228 } 228 }
229 - String tableKey = tableConfig.getPrimaryKeys()[0];  
230 - if (StringUtils.isBlank(tableKey)) { 229 + String primaryKey = tableConfig.getPrimaryKeys()[0];
  230 + if (StringUtils.isBlank(primaryKey)) {
231 return null; 231 return null;
232 } 232 }
233 - return MapUtils.getString(jsonObject.getData(), tableKey, null); 233 + return MapUtils.getString(message.getData(), primaryKey, null);
234 } 234 }
235 235
236 protected final long getCost(long begin) { 236 protected final long getCost(long begin) {