|
|
package com.yoho.search.comsumer.mq;
|
|
|
package com.yoho.search.mq;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
import com.yoho.search.comsumer.index.IYohoIndexService;
|
|
|
import com.yoho.search.comsumer.service.IProductIndexBuildService;
|
|
|
import com.yoho.search.base.utils.ConvertUtils;
|
|
|
import com.yoho.search.base.utils.EventReportEnum;
|
|
|
import com.yoho.search.dal.service.ProductPriceService;
|
|
|
import com.yoho.search.core.es.utils.IgnoreSomeException;
|
|
|
import com.yoho.error.event.SearchEvent;
|
|
|
import com.yoho.search.base.utils.ISearchConstans;
|
|
|
import com.yoho.search.common.CostStatistics;
|
|
|
import com.yoho.search.dal.model.ProductPoolDetail;
|
|
|
import com.yoho.search.dal.model.ProductPoolDetailSkn;
|
|
|
import com.yoho.search.dal.service.ProductPoolDetailService;
|
|
|
import com.yoho.search.dal.service.ProductService;
|
|
|
import com.yoho.search.index.service.IYohoIndexService;
|
|
|
import com.yoho.search.utils.ConvertUtils;
|
|
|
import com.yoho.search.utils.EventReportEnum;
|
|
|
import com.yoho.search.utils.ISearchConstans;
|
|
|
import com.yoho.search.utils.IgnoreSomeException;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
...
|
...
|
@@ -22,9 +20,7 @@ import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; |
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.UUID;
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* Created by wangnan on 2016/8/17.
|
...
|
...
|
@@ -44,17 +40,8 @@ public class ProductPoolDetailMqListener extends AbstractMqListener implements C |
|
|
private IYohoIndexService indexService;
|
|
|
|
|
|
@Autowired
|
|
|
private ProductPriceService productPriceService;
|
|
|
|
|
|
@Autowired
|
|
|
private IProductIndexBuildService iProductIndexBuildService;
|
|
|
|
|
|
@Autowired
|
|
|
private ProductPoolIndexService productPoolIndexService;
|
|
|
|
|
|
// @Autowired
|
|
|
// private Product15DaySalesNumService product15DaySalesNumService;
|
|
|
|
|
|
@Override
|
|
|
public void onMessage(Message message, Channel channel) throws Exception {
|
|
|
try {
|
...
|
...
|
@@ -88,7 +75,7 @@ public class ProductPoolDetailMqListener extends AbstractMqListener implements C |
|
|
* @param key
|
|
|
*/
|
|
|
public void updateData(final Map data, final String key) {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
CostStatistics costStatistics = new CostStatistics();
|
|
|
ProductPoolDetail productPoolDetail = new ProductPoolDetail();
|
|
|
productPoolDetail = (ProductPoolDetail) ConvertUtils.toJavaBean(productPoolDetail, data);
|
|
|
if (productPoolDetail == null || productPoolDetail.getId() == null) {
|
...
|
...
|
@@ -104,17 +91,17 @@ public class ProductPoolDetailMqListener extends AbstractMqListener implements C |
|
|
} else if ("3".equals(productPoolDetail.getProductType())) {
|
|
|
productPoolDetail.setProductType("1");
|
|
|
}
|
|
|
//更新DB
|
|
|
//1)更新DB
|
|
|
productPoolDetailService.saveOrUpdate(productPoolDetail);
|
|
|
logger.info("[func=updateData][step=saveToBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
|
|
|
//更新ProductIndex
|
|
|
logger.info("[func=updateData][step(1)of(3)saveToBb][key={}][cost={}ms]", key, costStatistics.getCost());
|
|
|
//2)更新ProductIndex
|
|
|
updateProductIndex(productPoolDetail, System.currentTimeMillis(), key);
|
|
|
logger.info("[func=updateData][step=success][key={}][indexName={}][skn={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_INDEX, productPoolDetail.getProductSkn(),
|
|
|
(System.currentTimeMillis() - begin));
|
|
|
//更新ProductPoolIndex
|
|
|
logger.info("[func=updateData][step(2)of(3)update ProductIndex][key={}][indexName={}][skn={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_INDEX, productPoolDetail.getProductSkn(),
|
|
|
costStatistics.getCost());
|
|
|
//3)更新ProductPoolIndex
|
|
|
productPoolIndexService.updateProductPoolIndex(productPoolDetail.getId(), System.currentTimeMillis(), key);
|
|
|
logger.info("[func=updateData][step=success][key={}][indexName={}][skn={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_POOL, productPoolDetail.getProductSkn(),
|
|
|
(System.currentTimeMillis() - begin));
|
|
|
logger.info("[func=updateData][step(3)of(3)ProductPoolIndex][key={}][indexName={}][skn={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_POOL, productPoolDetail.getProductSkn(),
|
|
|
costStatistics.getCost());
|
|
|
}
|
|
|
|
|
|
/**
|
...
|
...
|
@@ -133,18 +120,18 @@ public class ProductPoolDetailMqListener extends AbstractMqListener implements C |
|
|
if (productPoolDetail == null || productPoolDetail.getId() == null) {
|
|
|
return;
|
|
|
}
|
|
|
//从DB中删除
|
|
|
//1)从DB中删除
|
|
|
productPoolDetailService.deleteByPrimaryKey(Integer.valueOf(id));
|
|
|
logger.info("[func=deleteData][step=deleteFromBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
|
|
|
//删除ProductIndex中poolId字段
|
|
|
logger.info("[func=deleteData][step(1)of(3)deleteFromBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
|
|
|
//2)删除ProductIndex中poolId字段
|
|
|
if (productPoolDetail != null && productPoolDetail.getProductSkn() != null) {
|
|
|
updateProductIndex(productPoolDetail, begin, key);
|
|
|
logger.info("[func=deleteData][step=success][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_INDEX, id,
|
|
|
logger.info("[func=deleteData][step(2)of(3) delete ProductIndex][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_INDEX, id,
|
|
|
(System.currentTimeMillis() - begin));
|
|
|
}
|
|
|
//删除ProductPool的记录
|
|
|
indexService.deleteIndexData(ISearchConstans.INDEX_NAME_PRODUCT_POOL, productPoolDetail.getId().toString());
|
|
|
logger.info("[func=deleteData][step=success][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_POOL, id,
|
|
|
//3)删除ProductPool的记录
|
|
|
productPoolIndexService.deleteProductPoolIndex(productPoolDetail.getId(), System.currentTimeMillis(), key);
|
|
|
logger.info("[func=deleteData][step(3)of(3) delete ProductPool][key={}][indexName={}][id={}][cost={}ms]", key, ISearchConstans.INDEX_NAME_PRODUCT_POOL, id,
|
|
|
(System.currentTimeMillis() - begin));
|
|
|
}
|
|
|
|
...
|
...
|
@@ -157,17 +144,31 @@ public class ProductPoolDetailMqListener extends AbstractMqListener implements C |
|
|
*/
|
|
|
private void updateProductIndex(ProductPoolDetail productPoolDetail, long begin, final String key) {
|
|
|
if (productPoolDetail.getProductSkn() != null) {
|
|
|
//查视图productPoolDetailSkns,通过productId更新ProductIndex中的poolId字段数据
|
|
|
ProductPoolDetailSkn productPoolDetailSkns = productPoolDetailService.getProductPoolDetailBySkn(productPoolDetail.getProductSkn());
|
|
|
if (productPoolDetailSkns != null) {
|
|
|
Integer productId = productService.selectProductIdBySkn(productPoolDetail.getProductSkn());
|
|
|
if (productId != null) {
|
|
|
Map<String, Object> indexData = new HashMap<String, Object>();
|
|
|
indexData.put("productId", productId);
|
|
|
indexData.put("poolId", productPoolDetailSkns.getPoolId());
|
|
|
this.updateProductIndexWithDataMap(indexData, productId, key, begin);
|
|
|
//把每个skn对应的多个productpool的poolid拼起来
|
|
|
List<ProductPoolDetail> productPoolDetails = productPoolDetailService.selectByProductSkn(productPoolDetail.getProductSkn());
|
|
|
//先根据poolId去重
|
|
|
Set<Integer> productPoolDetailsSet = new HashSet<>();
|
|
|
List<ProductPoolDetail> productPoolDetailsTemp = new ArrayList<>();
|
|
|
for (ProductPoolDetail p : productPoolDetails) {
|
|
|
if(!productPoolDetailsSet.contains(p.getPoolId())){
|
|
|
productPoolDetailsSet.add(p.getPoolId());
|
|
|
productPoolDetailsTemp.add(p);
|
|
|
}
|
|
|
}
|
|
|
StringBuffer poolId = new StringBuffer();
|
|
|
for (ProductPoolDetail p : productPoolDetailsTemp) {
|
|
|
if (p.getProductType().equals("1")) {
|
|
|
poolId.append(p.getPoolId());
|
|
|
poolId.append(",");
|
|
|
}
|
|
|
}
|
|
|
Integer productId = productService.selectProductIdBySkn(productPoolDetail.getProductSkn());
|
|
|
if (productId != null) {
|
|
|
Map<String, Object> indexData = new HashMap<String, Object>();
|
|
|
indexData.put("productId", productId);
|
|
|
indexData.put("poolId", poolId.toString());
|
|
|
this.updateProductIndexWithDataMap(indexData, productId, key, begin);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
...
|
...
|
|