...
|
...
|
@@ -26,7 +26,7 @@ public abstract class AbstractMqListener<T> implements MessageListener { |
|
|
/**
|
|
|
* 这里是多线程的,同表共享一个Map
|
|
|
*/
|
|
|
private ConcurrentLinkedHashMap<String, Long> keyVersionMap = new ConcurrentLinkedHashMap.Builder<String , Long>()
|
|
|
private ConcurrentLinkedHashMap<String, Long> keyVersionMap = new ConcurrentLinkedHashMap.Builder<String, Long>()
|
|
|
.maximumWeightedCapacity(2000)
|
|
|
.weigher(Weighers.singleton())
|
|
|
.build();
|
...
|
...
|
@@ -57,7 +57,7 @@ public abstract class AbstractMqListener<T> implements MessageListener { |
|
|
private String getDataVersionKey(MqMessageEntity mqMessageEntity) {
|
|
|
String dbName = mqMessageEntity.getDbName();
|
|
|
String tableName = mqMessageEntity.getTableName();
|
|
|
String primaryKeysValue = messageHelper.getPrimaryKeysValue(tableConfigLoader.getTableConfig(dbName,tableName),
|
|
|
String primaryKeysValue = messageHelper.getPrimaryKeysValue(tableConfigLoader.getTableConfig(dbName, tableName),
|
|
|
mqMessageEntity.getData());
|
|
|
return dbName + "." + tableName + "." + primaryKeysValue;
|
|
|
}
|
...
|
...
|
@@ -104,18 +104,18 @@ public abstract class AbstractMqListener<T> implements MessageListener { |
|
|
}
|
|
|
|
|
|
// 对象强转
|
|
|
ParameterizedType t = (ParameterizedType)this.getClass().getGenericSuperclass(); //获得带有泛型的父类
|
|
|
T object = JSONObject.parseObject(JSON.toJSONString(messageJsonObject.getData()),t.getActualTypeArguments()[0]);
|
|
|
ParameterizedType t = (ParameterizedType) this.getClass().getGenericSuperclass(); //获得带有泛型的父类
|
|
|
T object = JSONObject.parseObject(JSON.toJSONString(messageJsonObject.getData()), t.getActualTypeArguments()[0]);
|
|
|
|
|
|
// 参数检测,不满足条件,隔一分钟再检查一次
|
|
|
Object checkResult = this.checkData(object);
|
|
|
if(checkResult==null){
|
|
|
Object checkResult = this.checkData(object, messageJsonObject.getAction());
|
|
|
if (checkResult == null) {
|
|
|
logger.info("checkData fail, wait 60s ......");
|
|
|
TimeUnit.SECONDS.sleep(60);
|
|
|
}
|
|
|
checkResult = this.checkData(object);
|
|
|
if(checkResult==null){
|
|
|
logger.error("checkData fail, direct return ......, data is [{}]",messageJsonObject);
|
|
|
checkResult = this.checkData(object, messageJsonObject.getAction());
|
|
|
if (checkResult == null) {
|
|
|
logger.error("checkData fail, direct return ......, data is [{}]", messageJsonObject);
|
|
|
return;
|
|
|
}
|
|
|
|
...
|
...
|
@@ -126,14 +126,14 @@ public abstract class AbstractMqListener<T> implements MessageListener { |
|
|
while (retryCount <= getMaxRetryTime()) {
|
|
|
try {
|
|
|
if (isDelete) {
|
|
|
this.doDeleteData(object,checkResult);
|
|
|
this.doDeleteData(object, checkResult);
|
|
|
} else {
|
|
|
this.doUpdateData(object,checkResult);
|
|
|
}
|
|
|
this.doUpdateData(object, checkResult);
|
|
|
}
|
|
|
break;
|
|
|
} catch (Exception e) {
|
|
|
Thread.sleep(1000);
|
|
|
logger.error("doConsume happen error, json is {},isDelete is {}, exception is :", messageJsonObject,isDelete, e);
|
|
|
logger.error("doConsume happen error, json is {},isDelete is {}, exception is :", messageJsonObject, isDelete, e);
|
|
|
if (retryCount > getMaxRetryTime()) {
|
|
|
throw e;
|
|
|
}
|
...
|
...
|
@@ -157,31 +157,31 @@ public abstract class AbstractMqListener<T> implements MessageListener { |
|
|
return 5;
|
|
|
}
|
|
|
|
|
|
private void doDeleteData(T object,Object checkResult) throws Exception {
|
|
|
private void doDeleteData(T object, Object checkResult) throws Exception {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
this.deleteData(object,checkResult);
|
|
|
long cost = System.currentTimeMillis() - begin;
|
|
|
logger.info("[func=doDeleteData][cost={}ms][eventName={}]", cost,this.getEventReportEnum().getFunctionName());
|
|
|
this.deleteData(object, checkResult);
|
|
|
long cost = System.currentTimeMillis() - begin;
|
|
|
logger.info("[func=doDeleteData][cost={}ms][eventName={}]", cost, this.getEventReportEnum().getFunctionName());
|
|
|
}
|
|
|
|
|
|
private void doUpdateData(T object,Object checkResult) throws Exception {
|
|
|
private void doUpdateData(T object, Object checkResult) throws Exception {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
this.updateData(object,checkResult);
|
|
|
long cost = System.currentTimeMillis() - begin;
|
|
|
logger.info("[func=doUpdateData][cost={}ms][eventName={}]", cost,this.getEventReportEnum().getFunctionName());
|
|
|
this.updateData(object, checkResult);
|
|
|
long cost = System.currentTimeMillis() - begin;
|
|
|
logger.info("[func=doUpdateData][cost={}ms][eventName={}]", cost, this.getEventReportEnum().getFunctionName());
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 删除数据
|
|
|
*/
|
|
|
protected abstract void deleteData(final T sourceObject,final Object checkResult) throws Exception;
|
|
|
protected abstract void deleteData(final T sourceObject, final Object checkResult) throws Exception;
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 更新数据
|
|
|
*/
|
|
|
protected abstract void updateData(final T sourceObject,final Object checkResult) throws Exception;
|
|
|
protected abstract void updateData(final T sourceObject, final Object checkResult) throws Exception;
|
|
|
|
|
|
/**
|
|
|
* 定义事件类型
|
...
|
...
|
@@ -189,9 +189,11 @@ public abstract class AbstractMqListener<T> implements MessageListener { |
|
|
protected abstract EventEnum getEventReportEnum();
|
|
|
|
|
|
/**
|
|
|
* 数据检测
|
|
|
* @param sourceObject
|
|
|
* @param action 操作
|
|
|
* @return
|
|
|
*/
|
|
|
protected Object checkData(T sourceObject) {
|
|
|
protected Object checkData(T sourceObject, String action) {
|
|
|
return sourceObject;
|
|
|
}
|
|
|
|
...
|
...
|
|