Authored by 胡古飞

update

... ... @@ -4,6 +4,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
... ... @@ -37,83 +38,95 @@ public class PromotionProductMqListener extends AbstractMqListener implements Ch
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
final String key = UUID.randomUUID().toString();
String messageStr = new String(message.getBody(), "UTF-8");
logger.info("[model=PromotionProductMqListener][key={}][message={}]", key, messageStr);
// 如果在重建索引等待
this.waitingRebuildingIndex();
final String key = UUID.randomUUID().toString();
String messageStr = new String(message.getBody(), "UTF-8");
logger.info("[model=PromotionProductMqListener][key={}][message={}]", key, messageStr);
// 如果在重建索引等待
this.waitingRebuildingIndex();
JSONObject json = JSONObject.parseObject(messageStr);
if (ISearchConstants.ACTION_DELETE.equals(json.getString("action"))) {
deleteData(json.getString("data"), key);
} else if (ISearchConstants.ACTION_UPDATE.equals(json.getString("action"))) {
updateData(json.getObject("data", Map.class), key);
} else {
updateData(json.getObject("data", Map.class), key);
}
} catch (Exception e) {
publisher.publishEvent(new SearchEvent(EventReportEnum.PROMOTIONPRODUCTMQLISTENER_ONMESSAGE.getEventName(), EventReportEnum.PROMOTIONPRODUCTMQLISTENER_ONMESSAGE
.getFunctionName(), EventReportEnum.PROMOTIONPRODUCTMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}
JSONObject json = JSONObject.parseObject(messageStr);
if (ISearchConstants.ACTION_DELETE.equals(json.getString("action"))) {
deleteData(json.getString("data"), key);
} else if (ISearchConstants.ACTION_UPDATE.equals(json.getString("action"))) {
updateData(json.getObject("data", Map.class), key, true);
} else {
updateData(json.getObject("data", Map.class), key, false);
}
} catch (Exception e) {
publisher.publishEvent(new SearchEvent(EventReportEnum.PROMOTIONPRODUCTMQLISTENER_ONMESSAGE.getEventName(), EventReportEnum.PROMOTIONPRODUCTMQLISTENER_ONMESSAGE
.getFunctionName(), EventReportEnum.PROMOTIONPRODUCTMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
Thread.sleep(1000);
throw e;
}
}
public void deleteData(final String id, final String key) throws Exception{
public void deleteData(final String id, final String key) throws Exception {
long begin = System.currentTimeMillis();
PromotionProduct promotionProduct = promotionProductService.selectById(Integer.parseInt(id));
if(promotionProduct==null){
return ;
if (promotionProduct == null || promotionProduct.getProductSkn()==null){
return;
}
Integer skn=promotionProduct.getProductSkn();
//1、删除数据库信息
promotionProductService.delete(Integer.parseInt(id));
logger.info("[func=deleteData][step=saveToBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
this.updateProductIndex(skn, System.currentTimeMillis(), key);
logger.info("[func=deleteData][step=updateProductIndex][productSkn={}][cost={}ms]", promotionProduct.getProductSkn(), System.currentTimeMillis() - begin);
//2、更新ES索引
Integer skn = promotionProduct.getProductSkn();
this.updateProductIndex(skn, System.currentTimeMillis(), key);
logger.info("[func=deleteData][step=updateProductIndex][productSkn={}][cost={}ms]",skn, System.currentTimeMillis() - begin);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void updateData(final Map data, final String key) throws Exception {
public void updateData(final Map data, final String key, boolean isUpdate) throws Exception {
long begin = System.currentTimeMillis();
PromotionProduct promotionProduct = (PromotionProduct) ConvertUtils.transMap2Bean(PromotionProduct.class, data);
if (promotionProduct == null || promotionProduct.getPromotionId()== null|| promotionProduct.getProductSkn()== null) {
return;
}
boolean sknIsChange=false;
Integer oldSkn=null;
// 1、获取数据库原数据
if(promotionProduct.getId()!=null){
PromotionProduct promotionProductOld = promotionProductService.selectById(promotionProduct.getId());
if(!(promotionProductOld.getProductSkn().equals(promotionProduct.getProductSkn()))){
//修改了skn,需要改原skn的promotionIds
sknIsChange=true;
oldSkn=promotionProductOld.getProductSkn();
}
}
// 2、保存至数据库
promotionProductService.saveOrUpdate(promotionProduct);
logger.info("[func=updateData][step=saveToBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
if (promotionProduct == null || promotionProduct.getPromotionId() == null || promotionProduct.getProductSkn() == null || promotionProduct.getId() == null) {
return;
}
// 1、获取变更的新的SKN和老的SKN信息
Integer newProductSkn = promotionProduct.getProductSkn();
Integer oldProductSkn = null;
PromotionProduct promotionProductOld = null;
if (isUpdate) {
promotionProductOld = promotionProductService.selectById(promotionProduct.getId());
}
if (promotionProductOld != null) {
oldProductSkn = promotionProductOld.getProductSkn();
}
logger.info("[func=updateData][step=getSknInfo][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
// 3、更新ProductIndex
this.updateProductIndex(promotionProduct.getProductSkn(), System.currentTimeMillis(), key);
logger.info("[func=updateData][step=updateProductIndex][productSkn={}][cost={}ms]", promotionProduct.getProductSkn(), System.currentTimeMillis() - begin);
if(sknIsChange){
this.updateProductIndex(oldSkn, System.currentTimeMillis(), key);
logger.info("[func=updateData][step=updateProductIndex][productSkn={}][cost={}ms]", oldSkn, System.currentTimeMillis() - begin);
}
// 2、保存至数据库
begin = System.currentTimeMillis();
promotionProductService.saveOrUpdate(promotionProduct);
logger.info("[func=updateData][step=saveToBb][key={}][cost={}ms]", key, System.currentTimeMillis() - begin);
// 3、更新ProductIndex
this.updateProductIndex(newProductSkn, System.currentTimeMillis(), key);
logger.info("[func=updateData][step=updateProductIndex][productSkn={}][cost={}ms]",newProductSkn, System.currentTimeMillis() - begin);
// 4、如果老的存在并且被更新掉了,则同时更新老SKN的信息
if (oldProductSkn != null && !oldProductSkn.equals(newProductSkn)) {
this.updateProductIndex(oldProductSkn, System.currentTimeMillis(), key);
logger.info("[func=updateData][step=updateProductIndex][productSkn={}][cost={}ms]",oldProductSkn, System.currentTimeMillis() - begin);
}
}
private void updateProductIndex(Integer productSkn, long currentTimeMillis,String key){
String promotionIds=promotionProductService.getPromotionIds(productSkn);
Integer productId=productService.selectProductIdBySkn(productSkn);
logger.info("[func=updateProductIndex][key={}][productId={}][promotionIds={}]", key,productId, promotionIds);
private void updateProductIndex(Integer productSkn, long currentTimeMillis, String key) {
Integer productId = productService.selectProductIdBySkn(productSkn);
if(productId==null){
return;
}
String promotionIds = promotionProductService.getPromotionIds(productSkn);
if(StringUtils.isBlank(promotionIds)){
promotionIds = "";
}
logger.info("[func=updateProductIndex][key={}][productId={}][promotionIds={}]", key, productId, promotionIds);
Map<String, Object> indexData = new HashMap<String, Object>();
indexData.put("productId", productId);
indexData.put("promotionIds", promotionIds);
indexData.put("productId", productId);
indexData.put("promotionIds", promotionIds);
// 更新商品索引
this.updateProductIndexWithDataMap(indexData, productId, key, System.currentTimeMillis());
this.updateProductIndexWithDataMap(indexData, productId, key, System.currentTimeMillis());
}
}
... ...