Authored by wangnan

优化mqlinster代码结构,减少重复代码

package com.yoho.search.consumer.index.increment.productIndex;
package com.yoho.search.consumer.index.increment.db;
import com.rabbitmq.client.Channel;
import com.yoho.error.event.SearchEvent;
... ...
package com.yoho.search.consumer.index.increment.productIndex;
package com.yoho.search.consumer.index.increment.db;
import com.rabbitmq.client.Channel;
import com.yoho.error.event.SearchEvent;
import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.index.increment.productIndex.ProductAttributePropertyValuesMqListener;
import com.yoho.search.consumer.service.base.ProductAttributeService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.search.dal.model.ProductAttribute;
... ...
package com.yoho.search.consumer.index.increment.productIndex;
package com.yoho.search.consumer.index.increment.db;
import com.rabbitmq.client.Channel;
import com.yoho.error.event.SearchEvent;
... ...
... ... @@ -67,7 +67,7 @@ public class ProductExtMqListener extends AbstractMqListener implements ChannelA
Map<String, Object> esData = new HashMap<String, Object>();
esData.put("sknDefaultImg", productExtWithBLOBs.getSknDefaultImg() == null ? "" : productExtWithBLOBs.getSknDefaultImg());
indexService.updateIndexData(indexName, product.getId().toString(), esData);
logger.info("[func=updateData][step=success][indexName={}][skn={}][cost={}ms]", indexName, productExtWithBLOBs.getProductSkn(), (System.currentTimeMillis() - begin));
logger.info("[func=updateData][indexName={}][skn={}][cost={}ms]", indexName, productExtWithBLOBs.getProductSkn(), (System.currentTimeMillis() - begin));
}
@Override
... ... @@ -88,7 +88,7 @@ public class ProductExtMqListener extends AbstractMqListener implements ChannelA
Map<String, Object> data = new HashMap<String, Object>();
data.put("sknDefaultImg", "");
indexService.updateIndexData(indexName, product.getId().toString(), data);
logger.info("[func=deleteData][step=success][indexName={}][skn={}][cost={}ms]", indexName, skn, (System.currentTimeMillis() - begin));
logger.info("[func=deleteData][indexName={}][skn={}][cost={}ms]", indexName, skn, (System.currentTimeMillis() - begin));
}
@Override
... ...
... ... @@ -55,7 +55,7 @@ public class ProductKeywordsMqListener extends AbstractMqListener implements Cha
return;
}
productKeywordsService.saveOrUpdate(productKeywords);
logger.info("[func=updateData][step=saveToBb][key={}][cost={}ms]", key, this.getCost(begin));
logger.info("[func=updateData][key={}][cost={}ms]", key, this.getCost(begin));
updateProductIndex(productId, productKeywords.getProductKeyword(), System.currentTimeMillis(), key);
}
... ... @@ -70,8 +70,8 @@ public class ProductKeywordsMqListener extends AbstractMqListener implements Cha
return;
}
productKeywordsService.delete(productId);
logger.info("[func=deleteData][step=success][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, id, this.getCost(begin));
updateProductIndex(productId, null, System.currentTimeMillis(), key);
logger.info("[func=deleteData][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, id, this.getCost(begin));
updateProductIndex(productId, "", System.currentTimeMillis(), key);
}
@Override
... ...
... ... @@ -73,10 +73,10 @@ public class ProductPoolDetailMqListener extends AbstractMqListener implements C
}
// 1)更新DB
productPoolDetailService.saveOrUpdate(productPoolDetail);
logger.info("[func=updateData][step(1)of(2)saveToBb][key={}][cost={}ms]", key, costStatistics.getCost());
logger.info("[func=updateData][step(1/2)saveToBb][key={}][cost={}ms]", key, costStatistics.getCost());
// 2)更新ProductIndex
updateProductIndex(productPoolDetail, System.currentTimeMillis(), key);
logger.info("[func=updateData][step(2)of(2)update ProductIndexBO][key={}][indexName={}][skn={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX,
logger.info("[func=updateData][step(2/2)update ProductIndex][key={}][indexName={}][skn={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX,
productPoolDetail.getProductSkn(), costStatistics.getCost());
}
... ... @@ -92,11 +92,11 @@ public class ProductPoolDetailMqListener extends AbstractMqListener implements C
}
// 1)从DB中删除
productPoolDetailService.deleteByPrimaryKey(Integer.valueOf(id));
logger.info("[func=deleteData][step(1)of(2)deleteFromBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
logger.info("[func=deleteData][step(1/2)deleteFromBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
// 2)删除ProductIndex中poolId字段
if (productPoolDetail != null && productPoolDetail.getProductSkn() != null) {
updateProductIndex(productPoolDetail, begin, key);
logger.info("[func=deleteData][step(2)of(2) delete ProductIndexBO][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, id,
logger.info("[func=deleteData][step(2/2)update ProductIndex][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, id,
(System.currentTimeMillis() - begin));
}
}
... ...
... ... @@ -32,7 +32,7 @@ public class ProductStandardRelationMqListener extends AbstractMqListener implem
try {
final String key = UUID.randomUUID().toString();
String messageStr = new String(message.getBody(), "UTF-8");
logger.info("[model=ProductStandardRelationMqListener][key={}][message={}]", key, messageStr);
logger.info("[key={}][message={}]", key, messageStr);
// 如果在重建索引等待
this.waitingRebuildingIndex();
JSONObject json = JSONObject.parseObject(messageStr);
... ... @@ -52,7 +52,6 @@ public class ProductStandardRelationMqListener extends AbstractMqListener implem
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void updateData(final Map data, final String key) {
long begin = System.currentTimeMillis();
ProductStandardRelation productStandardRelation = new ProductStandardRelation();
... ... @@ -62,11 +61,10 @@ public class ProductStandardRelationMqListener extends AbstractMqListener implem
return;
}
productStandardRelationService.saveOrUpdate(productStandardRelation);
logger.info("[func=updateData][step=saveToBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
logger.info("[func=updateData][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
this.updateProductIndex(productId, System.currentTimeMillis(), key);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void deleteData(final Map data, final String key) {
long begin = System.currentTimeMillis();
ProductStandardRelation psr = new ProductStandardRelation();
... ... @@ -77,11 +75,10 @@ public class ProductStandardRelationMqListener extends AbstractMqListener implem
int productId = psr.getProductId();
Integer standardId = psr.getStandardId();
productStandardRelationService.delete(productId, standardId);
logger.info("[func=deleteData][step=deleteFromBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
logger.info("[func=deleteData][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
this.updateProductIndex(productId, System.currentTimeMillis(), key);
}
@SuppressWarnings("rawtypes")
private void updateProductIndex(Integer productId, long begin, final String key) {
List<Map> productStandards = productStandardRelationService.getStandardListByProductId(productId);
Set<String> standardIdSet = new HashSet<String>();
... ...
... ... @@ -52,7 +52,7 @@ public class ProductStyleRelationMqListener extends AbstractMqListener implement
return;
}
productStyleRelationService.saveOrUpdate(productStyleRelation);
logger.info("[func=updateData][step=saveToBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
logger.info("[func=updateData][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
updateProductIndex(productId, System.currentTimeMillis(), key);
}
... ... @@ -76,7 +76,7 @@ public class ProductStyleRelationMqListener extends AbstractMqListener implement
return;
}
productStyleRelationService.delete(relationId);
logger.info("[func=deleteData][step=deleteFromDb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
logger.info("[func=deleteData][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
updateProductIndex(productId, begin, key);
}
... ... @@ -95,6 +95,6 @@ public class ProductStyleRelationMqListener extends AbstractMqListener implement
indexData.put("productId", productId);
indexData.put("styleIds", StringUtils.join(styleIdSet, ","));
this.updateProductIndexWithDataMap(indexData, productId, key, begin);
logger.info("[func=updateProductIndex][step=updateProductIndex][key={}][productId={}][cost={}ms]", key, productId,System.currentTimeMillis() - begin);
logger.info("[func=updateProductIndex][key={}][productId={}][cost={}ms]", key, productId,System.currentTimeMillis() - begin);
}
}
... ...