Authored by wangnan

add

  1 +package com.yoho.search.consumer.index.increment;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.rabbitmq.client.Channel;
  5 +import com.yoho.error.event.SearchEvent;
  6 +import com.yoho.search.base.utils.ConvertUtils;
  7 +import com.yoho.search.base.utils.EventReportEnum;
  8 +import com.yoho.search.base.utils.ISearchConstants;
  9 +import com.yoho.search.consumer.index.common.IYohoIndexService;
  10 +import com.yoho.search.consumer.service.base.ShopsBrandsService;
  11 +import com.yoho.search.core.es.model.ESBluk;
  12 +import com.yoho.search.core.es.utils.IgnoreSomeException;
  13 +import com.yoho.search.dal.model.ShopsBrands;
  14 +import org.apache.commons.lang.StringUtils;
  15 +import org.slf4j.Logger;
  16 +import org.slf4j.LoggerFactory;
  17 +import org.springframework.amqp.core.Message;
  18 +import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.stereotype.Component;
  21 +import org.springframework.util.CollectionUtils;
  22 +
  23 +import java.util.ArrayList;
  24 +import java.util.List;
  25 +import java.util.Map;
  26 +
  27 +/**
  28 + * Created by wangnan on 2016/12/1.
  29 + */
  30 +@Component
  31 +public class ShopsBrandsMqListener extends AbstractMqListener implements ChannelAwareMessageListener {
  32 + private static final Logger logger = LoggerFactory.getLogger(ShopsBrandsMqListener.class);
  33 + @Autowired
  34 + private IYohoIndexService indexService;
  35 + @Autowired
  36 + private ShopsBrandsService shopsBrandsService;
  37 + @Autowired
  38 + private ShopsLogicService shopsLogicService;
  39 +
  40 + @Override
  41 + public void onMessage(Message message, Channel channel) throws Exception {
  42 + try {
  43 + String messageStr = new String(message.getBody(), "UTF-8");
  44 + logger.info("[model=ShopsBrandsMqListener][message={}]", messageStr);
  45 + // 如果在重建索引等待
  46 + this.waitingRebuildingIndex();
  47 + JSONObject json = JSONObject.parseObject(messageStr);
  48 + String indexName = ISearchConstants.INDEX_NAME_SHOPS;
  49 + String idField = ISearchConstants.getKeyField(indexName);
  50 + if (ISearchConstants.ACTION_DELETE.equals(json.getString("action"))) {
  51 + deleteData(json.getString("data"), indexName, idField);
  52 + } else if (ISearchConstants.ACTION_UPDATE.equals(json.getString("action"))) {
  53 + updateData(json.getObject("data", Map.class), indexName, idField);
  54 + } else {
  55 + updateData(json.getObject("data", Map.class), indexName, idField);
  56 + }
  57 + } catch (Exception e) {
  58 + publisher.publishEvent(new SearchEvent(EventReportEnum.SHOPSBRADNSMQLISTENER_ONMESSAGE.getEventName(),
  59 + EventReportEnum.SHOPSBRADNSMQLISTENER_ONMESSAGE.getFunctionName(),
  60 + EventReportEnum.SHOPSBRADNSMQLISTENER_ONMESSAGE.getMoudleName(), "exception", IgnoreSomeException.filterSomeException(e), null));
  61 + Thread.sleep(1000);
  62 + throw e;
  63 + }
  64 + }
  65 +
  66 + @SuppressWarnings({"rawtypes", "unchecked"})
  67 + public void updateData(final Map data, final String indexName, final String idField) throws Exception {
  68 + long begin = System.currentTimeMillis();
  69 + ShopsBrands shopsBrands = new ShopsBrands();
  70 + shopsBrands = (ShopsBrands) ConvertUtils.toJavaBean(shopsBrands, data);
  71 + if (shopsBrands == null || shopsBrands.getShopsId() == null) {
  72 + return;
  73 + }
  74 + String idValue = shopsBrands.getId().toString();
  75 + shopsBrandsService.saveOrUpdate(shopsBrands);
  76 + logger.info("[func=updateData][step=saveToDb][indexName={}] [id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
  77 + List<Integer> ids = new ArrayList<>();
  78 + ids.add(shopsBrands.getShopsId());
  79 + List<ShopsBO> shopBOs = shopsLogicService.getShopsBOs(ids);
  80 + if (!CollectionUtils.isEmpty(shopBOs)) {
  81 + ShopsBO shopsBO = shopBOs.get(0);
  82 + List<ESBluk> results = new ArrayList<ESBluk>();
  83 + results.add(new ESBluk(JSONObject.toJSONString(this.beanToMap(shopsBO)), shopsBO.getShopsId().toString(), indexName, indexName, false));
  84 + indexService.bulk(results);
  85 + }
  86 + logger.info("[func=updateData][step=success][indexName={}] [id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
  87 + }
  88 +
  89 + public void deleteData(final String id, final String indexName, final String idField) throws Exception {
  90 + if (StringUtils.isBlank(id)) {
  91 + return;
  92 + }
  93 + long begin = System.currentTimeMillis();
  94 + ShopsBrands shopsBrands = shopsBrandsService.getById(Integer.valueOf(id));
  95 + shopsBrandsService.delete(Integer.valueOf(id));
  96 + logger.info("[func=deleteData][step=deleteFromDb][indexName={}] [id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
  97 + List<Integer> ids = new ArrayList<>();
  98 + ids.add(shopsBrands.getShopsId());
  99 + List<ShopsBO> shopBOs = shopsLogicService.getShopsBOs(ids);
  100 + if (!CollectionUtils.isEmpty(shopBOs)) {
  101 + ShopsBO shopsBO = shopBOs.get(0);
  102 + List<ESBluk> results = new ArrayList<ESBluk>();
  103 + results.add(new ESBluk(JSONObject.toJSONString(this.beanToMap(shopsBO)), shopsBO.getShopsId().toString(), indexName, indexName, false));
  104 + indexService.bulk(results);
  105 + }
  106 + logger.info("[func=deleteData][step=success][indexName={}][id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
  107 + }
  108 +
  109 + private Map<String, Object> beanToMap(ShopsBO shopsBO) {
  110 + JSONObject josnoJsonObject = (JSONObject) JSONObject.toJSON(shopsBO);
  111 + return josnoJsonObject;
  112 + }
  113 +}