ProducerRabbitMqMessageHelper.java 5.73 KB
package com.yoho.datasync.producer.canal.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.yoho.datasync.core.DatasyncConstant;
import com.yoho.datasync.message.CharUtils;
import com.yoho.datasync.message.MessageHelper;
import com.yoho.datasync.message.MqBeansResgister;
import com.yoho.datasync.message.MqMessageEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class ProducerRabbitMqMessageHelper {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerRabbitMqMessageHelper.class);

    private volatile Map<String, Set<String>> modelFieldsMap = new ConcurrentHashMap<>();

    private static final String MODEL_CLASS_PACKAGE = "com.yoho.datasync.producer.common.model.";

    @Resource
    private MessageHelper messageHelper;

    @Resource
    private MqMessageDataVersionManager dataVersionManager;

    @Resource
    private MqBeansResgister mqBeansResgister;

    /**
     *
     * @param eventType 事件类型
     * @param canalDbName binlog里对应的数据库名称
     * @param canalTableName binlog里对应的表名称
     * @param rowData binlog变更数据
     */
    public void sendRabbitMqMessage(CanalEntry.EventType eventType, String canalDbName, String canalTableName,
                                    CanalEntry.RowData rowData) {
        MqMessageEntity mqMessage = null;
        switch (eventType){
            case DELETE:
                mqMessage = this.getDeleteData(rowData, canalDbName, canalTableName);
                break;
            case INSERT:
                mqMessage = this.getUpdateData(rowData, canalDbName, canalTableName, DatasyncConstant.ACTION_INSERT);
                break;
            case UPDATE:
                mqMessage = this.getUpdateData(rowData, canalDbName, canalTableName, DatasyncConstant.ACTION_UPDATE);
        }

        if (mqMessage == null || mqMessage.getData() == null || mqMessage.getData().isEmpty()) {
            LOGGER.error("covert mqMessage fail,eventType is {},canalDbName is{},canalTableName is {},rowData is {},",
                    eventType, canalDbName, canalTableName, rowData == null ? "" : rowData.toString());
            return;
        }
        // 获取真实队列名称
        String realQueueName = messageHelper.getRealQueueName(canalDbName,canalTableName, mqMessage);
        LOGGER.info("covert mqMessage success, queue is[{}] ,mqMessage is [{}]",realQueueName, JSON.toJSONString(mqMessage));
        // 发送消息
        mqBeansResgister.getRabbitTemplate().convertAndSend(realQueueName, mqMessage);
    }


    private MqMessageEntity getDeleteData(CanalEntry.RowData rowData, String canalDbName, String canalTableName) {
        try {
            long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName,canalTableName);
            return new MqMessageEntity(DatasyncConstant.ACTION_DELETE, canalDbName,canalTableName, tableDataVersion, getData(rowData.getBeforeColumnsList(), canalDbName, canalTableName));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(),e);
            return null;
        }
    }

    private MqMessageEntity getUpdateData(CanalEntry.RowData rowData, String canalDbName, String canalTableName, String action) {
        try {
            long tableDataVersion = dataVersionManager.genTableDataVersion(canalDbName,canalTableName);
            return new MqMessageEntity(action, canalDbName, canalTableName, tableDataVersion, getData(rowData.getAfterColumnsList(), canalDbName, canalTableName));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return null;
        }
    }

    private Map<String, Object> getData(List<CanalEntry.Column> columns, String canalDbName, String canalTableName) {
        try {
            Map<String, Object> data = new HashMap<>();
            Set<String> fieldSet = getModelFields(canalDbName, canalTableName);
            String fieldName;
            for (CanalEntry.Column column : columns) {
                // 有些数据库的字段不是有下划线的
                if (column.getName().contains("_")) {
                    fieldName = CharUtils.underlineToCamelhump(column.getName());
                } else {
                    fieldName = column.getName();
                }
                if (fieldSet.contains(fieldName)) {
                    data.put(fieldName, column.getValue());
                }
            }
            return data;
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * 根据dbname和tablename将model中的字段进行反射
     * @param canalDbName
     * @param canalTableName
     */
    private Set<String> getModelFields(String canalDbName, String canalTableName) {
        String tableKey = canalDbName + "." + canalTableName;
        return modelFieldsMap.computeIfAbsent(tableKey , Key -> {
            Class<?> bean = null;
            Set<String> fieldSet = new HashSet<>();
            try {
                bean = Class.forName(MODEL_CLASS_PACKAGE + CharUtils.getBeanName(CharUtils.underlineToCamelhump(canalTableName)));
                Field[] fields = bean.getDeclaredFields();
                for (Field field : fields) {
                    fieldSet.add(field.getName());
                }
            } catch (ClassNotFoundException e) {
                //TODO
                LOGGER.error("This Bean name {} cannot found" , MODEL_CLASS_PACKAGE + CharUtils.getBeanName(CharUtils.underlineToCamelhump(canalTableName)));
            }
            return fieldSet;
        });
    }

}