|
|
package com.yoho.datasync.producer.common.helper;
|
|
|
|
|
|
import com.yoho.datasync.producer.common.config.TableConfigLoader;
|
|
|
import com.yoho.datasync.producer.common.entity.MqMessageEntity;
|
|
|
import com.yoho.datasync.producer.common.entity.TableConfig;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
|
@Component
|
|
|
public class MessageHelper {
|
|
|
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(MessageHelper.class);
|
|
|
|
|
|
private static final String QUEUE_PERIFX = "yohodatasync.";
|
|
|
|
|
|
@Resource
|
|
|
private TableConfigLoader tableConfigLoader;
|
|
|
|
|
|
private String getQueuePerfix(String dbName, String tableName) {
|
|
|
StringBuilder queuePerfix = new StringBuilder(QUEUE_PERIFX);
|
|
|
if (StringUtils.isNotBlank(dbName)) {
|
|
|
queuePerfix.append(dbName);
|
|
|
queuePerfix.append(".");
|
|
|
}
|
|
|
if (StringUtils.isNotBlank(tableName)) {
|
|
|
queuePerfix.append(tableName);
|
|
|
}
|
|
|
return queuePerfix.toString();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取更新时的真实的队列名
|
|
|
* yohodatasync.shops.user_account
|
|
|
*/
|
|
|
public String getRealQueueName(String dbName, String tableName) {
|
|
|
StringBuilder queuePerfix = new StringBuilder(QUEUE_PERIFX);
|
|
|
if (StringUtils.isNotBlank(dbName)) {
|
|
|
queuePerfix.append(dbName);
|
|
|
queuePerfix.append(".");
|
|
|
}
|
|
|
if (StringUtils.isNotBlank(tableName)) {
|
|
|
queuePerfix.append(tableName);
|
|
|
}
|
|
|
return queuePerfix.toString();
|
|
|
}
|
|
|
|
|
|
private String getPrimaryKeysValue(TableConfig tableConfig, Map<String, Object> dataMap) {
|
|
|
String[] primaryKeys = tableConfig.getPrimaryKeys();
|
|
|
StringBuilder primaryKeysValue = new StringBuilder();
|
|
|
for (String primaryKey : primaryKeys) {
|
|
|
primaryKeysValue.append(dataMap.get(primaryKey));
|
|
|
}
|
|
|
return primaryKeysValue.toString();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当前表对应的队列是否是单队列
|
|
|
*
|
|
|
* @param tableConfig
|
|
|
* @return
|
|
|
*/
|
|
|
private boolean isSingleQueue(TableConfig tableConfig) {
|
|
|
if (tableConfig == null) {
|
|
|
return true;
|
|
|
}
|
|
|
int queueSize = tableConfig.getQueueSize();
|
|
|
if (queueSize <= 1) {
|
|
|
return true;
|
|
|
}
|
|
|
String[] primaryKeys = tableConfig.getPrimaryKeys();
|
|
|
return primaryKeys == null;
|
|
|
}
|
|
|
|
|
|
private String genRealQueueName(String queuePerfix, int index) {
|
|
|
return queuePerfix + "." + index;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 根据表名获取队列名
|
|
|
*
|
|
|
* @param tableName
|
|
|
* @return
|
|
|
*/
|
|
|
public List<String> getTableQueueNames(String dbName, String tableName) {
|
|
|
String queuePerfix = this.getQueuePerfix(dbName, tableName);
|
|
|
TableConfig tableConfig = tableConfigLoader.getTableConfigByKey(dbName, tableName);
|
|
|
if (this.isSingleQueue(tableConfig)) {
|
|
|
return Arrays.asList(this.genRealQueueName(queuePerfix, 0));
|
|
|
}
|
|
|
List<String> results = new ArrayList<>();
|
|
|
for (int i = 0; i < tableConfig.getQueueSize(); i++) {
|
|
|
results.add(this.genRealQueueName(queuePerfix, i));
|
|
|
}
|
|
|
return results;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取更新时的真实的队列名
|
|
|
*
|
|
|
* @param mqMessage
|
|
|
* @param tableName
|
|
|
* @param mqMessage
|
|
|
* @return
|
|
|
*/
|
|
|
public String getRealQueueName(String dbName, String tableName, MqMessageEntity mqMessage) {
|
|
|
String queuePerfix = this.getQueuePerfix(dbName, tableName);
|
|
|
try {
|
|
|
TableConfig tableConfig = tableConfigLoader.getTableConfigByKey(dbName, tableName);
|
|
|
Map<String, Object> dataMap = mqMessage.getData();
|
|
|
if (this.isSingleQueue(tableConfig) || dataMap==null) {
|
|
|
return this.genRealQueueName(queuePerfix, 0);
|
|
|
}
|
|
|
String primaryKeysValue = getPrimaryKeysValue(tableConfig, dataMap);
|
|
|
if (StringUtils.isBlank(primaryKeysValue)) {
|
|
|
return this.genRealQueueName(queuePerfix, 0);
|
|
|
}
|
|
|
int queueIndex = Math.abs(primaryKeysValue.hashCode()) % tableConfig.getQueueSize();
|
|
|
return this.genRealQueueName(queuePerfix, queueIndex);
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
return this.genRealQueueName(queuePerfix, 0);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
} |