MessageHelper.java
4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.yoho.datasync.producer.common.helper;
import com.yoho.datasync.producer.common.config.TableConfigLoader;
import com.yoho.datasync.producer.common.entity.MqMessageEntity;
import com.yoho.datasync.producer.common.entity.TableConfig;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Builds the message-queue names used when publishing data-sync messages for a table.
 *
 * <p>The base name has the form {@code yohodatasync.<dbName>.<tableName>}; a concrete queue
 * appends {@code .<index>}. When the table configuration asks for more than one queue and
 * declares primary keys, messages are spread over the queues by hashing the primary-key values.
 */
@Component
public class MessageHelper {

    private static final Logger logger = LoggerFactory.getLogger(MessageHelper.class);

    /** Prefix shared by every queue name produced by this helper. */
    private static final String QUEUE_PREFIX = "yohodatasync.";

    @Resource
    private TableConfigLoader tableConfigLoader;

    /**
     * Builds the base queue name {@code yohodatasync.<dbName>.<tableName>}.
     * Blank parts are skipped (a blank {@code dbName} also omits its trailing dot).
     *
     * @param dbName    database name, may be blank
     * @param tableName table name, may be blank
     * @return the base queue name, without a queue index
     */
    private String getQueuePrefix(String dbName, String tableName) {
        StringBuilder queuePrefix = new StringBuilder(QUEUE_PREFIX);
        if (StringUtils.isNotBlank(dbName)) {
            queuePrefix.append(dbName).append(".");
        }
        if (StringUtils.isNotBlank(tableName)) {
            queuePrefix.append(tableName);
        }
        return queuePrefix.toString();
    }

    /**
     * Gets the real queue name used for updates, without a queue index,
     * e.g. {@code yohodatasync.shops.user_account}.
     *
     * @param dbName    database name, may be blank
     * @param tableName table name, may be blank
     * @return the base queue name
     */
    public String getRealQueueName(String dbName, String tableName) {
        return this.getQueuePrefix(dbName, tableName);
    }

    /**
     * Concatenates the values of the configured primary-key columns, in configuration order.
     * A column missing from {@code dataMap} contributes the text {@code "null"}, and no
     * delimiter is inserted between values; both are kept as-is so that the hash used for
     * queue selection does not change.
     *
     * @param tableConfig table configuration supplying the primary-key column names
     * @param dataMap     column name to value mapping of the row
     * @return the concatenated primary-key values
     */
    private String getPrimaryKeysValue(TableConfig tableConfig, Map<String, Object> dataMap) {
        String[] primaryKeys = tableConfig.getPrimaryKeys();
        StringBuilder primaryKeysValue = new StringBuilder();
        for (String primaryKey : primaryKeys) {
            primaryKeysValue.append(dataMap.get(primaryKey));
        }
        return primaryKeysValue.toString();
    }

    /**
     * Tells whether the table maps to a single queue.
     *
     * @param tableConfig table configuration, may be {@code null}
     * @return {@code true} when there is no configuration, the configured queue size is at
     *         most 1, or no primary keys are configured
     */
    private boolean isSingleQueue(TableConfig tableConfig) {
        if (tableConfig == null) {
            return true;
        }
        if (tableConfig.getQueueSize() <= 1) {
            return true;
        }
        return tableConfig.getPrimaryKeys() == null;
    }

    /**
     * Appends the queue index to the base queue name.
     *
     * @param queuePrefix base queue name
     * @param index       queue index
     * @return {@code <queuePrefix>.<index>}
     */
    private String genRealQueueName(String queuePrefix, int index) {
        return queuePrefix + "." + index;
    }

    /**
     * Gets all queue names for a table.
     *
     * @param dbName    database name
     * @param tableName table name
     * @return a single-element list holding queue 0 for a single-queue table, otherwise one
     *         name per configured queue index
     */
    public List<String> getTableQueueNames(String dbName, String tableName) {
        String queuePrefix = this.getQueuePrefix(dbName, tableName);
        TableConfig tableConfig = tableConfigLoader.getTableConfigByKey(dbName, tableName);
        if (this.isSingleQueue(tableConfig)) {
            return Arrays.asList(this.genRealQueueName(queuePrefix, 0));
        }
        int queueSize = tableConfig.getQueueSize();
        List<String> results = new ArrayList<>(queueSize);
        for (int i = 0; i < queueSize; i++) {
            results.add(this.genRealQueueName(queuePrefix, i));
        }
        return results;
    }

    /**
     * Gets the real queue name a message should be sent to for an update.
     *
     * <p>Queue 0 is used when the table is single-queue, the message carries no data, or the
     * concatenated primary-key value is blank. Otherwise the index is derived from the hash of
     * the primary-key values. Any exception raised while resolving the queue (including a
     * {@code null} {@code mqMessage}) is logged and queue 0 is returned.
     *
     * @param dbName    database name
     * @param tableName table name
     * @param mqMessage message whose data map supplies the primary-key values
     * @return the indexed queue name
     */
    public String getRealQueueName(String dbName, String tableName, MqMessageEntity mqMessage) {
        String queuePrefix = this.getQueuePrefix(dbName, tableName);
        try {
            TableConfig tableConfig = tableConfigLoader.getTableConfigByKey(dbName, tableName);
            Map<String, Object> dataMap = mqMessage.getData();
            if (this.isSingleQueue(tableConfig) || dataMap == null) {
                return this.genRealQueueName(queuePrefix, 0);
            }
            String primaryKeysValue = getPrimaryKeysValue(tableConfig, dataMap);
            if (StringUtils.isBlank(primaryKeysValue)) {
                return this.genRealQueueName(queuePrefix, 0);
            }
            // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is still
            // negative, which previously could yield a negative queue index. For every other
            // hash this gives the same index as Math.abs(hash) % queueSize, so routing is stable.
            int queueIndex = Math.abs(primaryKeysValue.hashCode() % tableConfig.getQueueSize());
            return this.genRealQueueName(queuePrefix, queueIndex);
        } catch (Exception e) {
            // Best effort: fall back to queue 0 rather than failing the publish.
            logger.error("Failed to resolve queue name for " + queuePrefix + ": " + e.getMessage(), e);
            return this.genRealQueueName(queuePrefix, 0);
        }
    }
}