// MessageHelper.java 4.45 KB
package com.yoho.datasync.producer.common.helper;

import com.yoho.datasync.producer.common.config.TableConfigLoader;
import com.yoho.datasync.producer.common.entity.MqMessageEntity;
import com.yoho.datasync.producer.common.entity.TableConfig;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;


/**
 * Builds the MQ queue names used for data-sync messages.
 *
 * <p>Every name starts with {@code yohodatasync.} followed by the database and table name.
 * The indexed variants additionally carry a {@code .<index>} suffix; when the table's
 * {@link TableConfig} declares a queue size greater than one together with primary keys,
 * the index is derived from the hash of the row's primary-key values, so that the same
 * key always maps to the same queue.
 */
@Component
public class MessageHelper {

    private static final Logger logger = LoggerFactory.getLogger(MessageHelper.class);

    /** Common prefix of every queue name produced by this helper. */
    private static final String QUEUE_PREFIX = "yohodatasync.";

    /** Index used when a table has a single queue or no queue can be derived from the data. */
    private static final int DEFAULT_QUEUE_INDEX = 0;

    @Resource
    private TableConfigLoader tableConfigLoader;

    /**
     * Joins the fixed prefix with the database and table name.
     *
     * @param dbName    database name; skipped (together with its trailing dot) when blank
     * @param tableName table name; skipped when blank
     * @return e.g. {@code yohodatasync.shops.user_account}
     */
    private String buildQueuePrefix(String dbName, String tableName) {
        StringBuilder queuePrefix = new StringBuilder(QUEUE_PREFIX);
        if (StringUtils.isNotBlank(dbName)) {
            queuePrefix.append(dbName).append(".");
        }
        if (StringUtils.isNotBlank(tableName)) {
            queuePrefix.append(tableName);
        }
        return queuePrefix.toString();
    }

    /**
     * Returns the queue name for a table without any index suffix,
     * e.g. {@code yohodatasync.shops.user_account}.
     *
     * @param dbName    database name; skipped when blank
     * @param tableName table name; skipped when blank
     * @return the un-indexed queue name
     */
    public String getRealQueueName(String dbName, String tableName) {
        return this.buildQueuePrefix(dbName, tableName);
    }

    /**
     * Concatenates the values of all configured primary-key columns, in configuration order
     * and without a separator. A column missing from {@code dataMap} contributes the text
     * {@code "null"}.
     *
     * @param tableConfig table configuration supplying the primary-key column names
     * @param dataMap     row data keyed by column name
     * @return the concatenated primary-key values
     */
    private String getPrimaryKeysValue(TableConfig tableConfig, Map<String, Object> dataMap) {
        StringBuilder primaryKeysValue = new StringBuilder();
        for (String primaryKey : tableConfig.getPrimaryKeys()) {
            primaryKeysValue.append(dataMap.get(primaryKey));
        }
        return primaryKeysValue.toString();
    }

    /**
     * Tells whether the table is served by a single queue.
     *
     * @param tableConfig configuration of the table, may be {@code null}
     * @return {@code true} when there is no configuration, the configured queue size is
     *         at most one, or no primary keys are configured
     */
    private boolean isSingleQueue(TableConfig tableConfig) {
        if (tableConfig == null) {
            return true;
        }
        if (tableConfig.getQueueSize() <= 1) {
            return true;
        }
        return tableConfig.getPrimaryKeys() == null;
    }

    /**
     * Appends the queue index to the prefix.
     *
     * @param queuePrefix un-indexed queue name
     * @param index       queue index
     * @return {@code <queuePrefix>.<index>}
     */
    private String genRealQueueName(String queuePrefix, int index) {
        return queuePrefix + "." + index;
    }

    /**
     * Lists every indexed queue name of a table.
     *
     * @param dbName    database name
     * @param tableName table name
     * @return a single name with index 0 for a single-queue table, otherwise one name per
     *         index in {@code [0, queueSize)}
     */
    public List<String> getTableQueueNames(String dbName, String tableName) {
        String queuePrefix = this.buildQueuePrefix(dbName, tableName);
        TableConfig tableConfig = tableConfigLoader.getTableConfigByKey(dbName, tableName);
        if (this.isSingleQueue(tableConfig)) {
            return Arrays.asList(this.genRealQueueName(queuePrefix, DEFAULT_QUEUE_INDEX));
        }
        int queueSize = tableConfig.getQueueSize();
        List<String> results = new ArrayList<>(queueSize);
        for (int i = 0; i < queueSize; i++) {
            results.add(this.genRealQueueName(queuePrefix, i));
        }
        return results;
    }

    /**
     * Resolves the indexed queue name a message must be sent to.
     *
     * <p>Falls back to index 0 when the table is single-queue, the message or its data is
     * {@code null}, the concatenated primary-key value is blank, or any exception occurs
     * while resolving (the exception is logged, never propagated).
     *
     * @param dbName    database name
     * @param tableName table name
     * @param mqMessage message whose data map supplies the primary-key values; may be {@code null}
     * @return {@code <prefix>.<index>} with {@code index} in {@code [0, queueSize)}
     */
    public String getRealQueueName(String dbName, String tableName, MqMessageEntity mqMessage) {
        String queuePrefix = this.buildQueuePrefix(dbName, tableName);
        try {
            TableConfig tableConfig = tableConfigLoader.getTableConfigByKey(dbName, tableName);
            Map<String, Object> dataMap = mqMessage == null ? null : mqMessage.getData();
            if (this.isSingleQueue(tableConfig) || dataMap == null) {
                return this.genRealQueueName(queuePrefix, DEFAULT_QUEUE_INDEX);
            }
            String primaryKeysValue = this.getPrimaryKeysValue(tableConfig, dataMap);
            if (StringUtils.isBlank(primaryKeysValue)) {
                return this.genRealQueueName(queuePrefix, DEFAULT_QUEUE_INDEX);
            }
            // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is
            // still negative, so abs-then-mod could yield a negative index. For every other
            // hash value this produces exactly the same index as abs-then-mod, so existing
            // key-to-queue routing is preserved.
            int queueIndex = Math.abs(primaryKeysValue.hashCode() % tableConfig.getQueueSize());
            return this.genRealQueueName(queuePrefix, queueIndex);
        } catch (Exception e) {
            // Best effort: routing must never fail the caller, so log and use the default queue.
            logger.error("Failed to resolve queue index for {}.{}, falling back to queue {}",
                    dbName, tableName, DEFAULT_QUEUE_INDEX, e);
            return this.genRealQueueName(queuePrefix, DEFAULT_QUEUE_INDEX);
        }
    }

}