Authored by wangnan

优化mqlinster代码结构,减少重复代码

... ... @@ -30,8 +30,8 @@ import java.util.concurrent.Executors;
* Created by wangnan on 2016/12/23.
*/
@Component
public class GlobalndexBulkService implements ApplicationEventPublisherAware {
private static final Logger logger = LoggerFactory.getLogger(GlobalndexBulkService.class);
public class GlobalIndexBulkService implements ApplicationEventPublisherAware {
private static final Logger logger = LoggerFactory.getLogger(GlobalIndexBulkService.class);
protected ApplicationEventPublisher publisher;
... ...
package com.yoho.search.consumer.index.increment.otherIndex;
package com.yoho.search.consumer.index.increment.db;
import com.rabbitmq.client.Channel;
import com.yoho.error.event.SearchEvent;
... ... @@ -17,8 +17,6 @@ import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
... ... @@ -71,11 +69,8 @@ public class ShopsBrandsMqListener extends AbstractMqListener implements Channel
return;
}
long begin = System.currentTimeMillis();
ShopsBrands shopsBrands = shopsBrandsService.getById(Integer.valueOf(id));
shopsBrandsService.delete(Integer.valueOf(id));
logger.info("[func=deleteData][step=deleteFromDb][indexName={}] [id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
List<Integer> ids = new ArrayList<>();
ids.add(shopsBrands.getShopsId());
}
}
... ...
... ... @@ -81,7 +81,7 @@ public class ShopsMqListener extends AbstractMqListener implements ChannelAwareM
List<ESBluk> results = new ArrayList<ESBluk>();
results.add(new ESBluk(JSONObject.toJSONString(this.beanToMap(shops)), shops.getShopsId().toString(), indexName, indexName, false));
indexService.bulk(results);
logger.info("[func=updateData][step=success][indexName={}] [id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
logger.info("[func=updateData][step=updateIndex][indexName={}] [id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
}
@Override
... ... @@ -95,11 +95,11 @@ public class ShopsMqListener extends AbstractMqListener implements ChannelAwareM
List<ESBluk> results = new ArrayList<ESBluk>();
results.add(new ESBluk(null, id, indexName, indexName, true));
indexService.bulk(results);
logger.info("[func=deleteData][step=success][indexName={}][id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
logger.info("[func=deleteData][step=updateIndex][indexName={}][id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
}
private Map<String, Object> beanToMap(Shops shops) {
JSONObject josnoJsonObject = (JSONObject) JSONObject.toJSON(shops);
return josnoJsonObject;
JSONObject jsonJsonObject = (JSONObject) JSONObject.toJSON(shops);
return jsonJsonObject;
}
}
... ...
... ... @@ -71,7 +71,7 @@ public class SizeMqListener extends AbstractMqListener implements ChannelAwareMe
sizeService.saveOrUpdate(size);
logger.info("[func=updateData][step=saveToDb][indexName={}] [id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
indexService.updateIndexData(indexName, idValue, size);
logger.info("[func=updateData][step=success][indexName={}] [id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
logger.info("[func=updateData][step=updateIndex][indexName={}] [id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
}
@Override
... ... @@ -83,6 +83,6 @@ public class SizeMqListener extends AbstractMqListener implements ChannelAwareMe
sizeService.delete(Integer.valueOf(id));
logger.info("[func=deleteData][step=deleteFromDb][indexName={}] [id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
indexService.deleteIndexData(indexName, id);
logger.info("[func=deleteData][step=success][indexName={}][id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
logger.info("[func=deleteData][step=updateIndex][indexName={}][id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
}
}
... ...
... ... @@ -62,7 +62,7 @@ public class StyleMqListener extends AbstractMqListener implements ChannelAwareM
styleService.saveOrUpdate(style);
String idValue = style.getId().toString();
indexService.updateIndexData(indexName, idValue, style);
logger.info("[func=updateData][step=success][indexName={}][id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
logger.info("[func=updateData][indexName={}][id={}][cost={}ms]", indexName, idValue, (System.currentTimeMillis() - begin));
}
@Override
... ... @@ -73,6 +73,6 @@ public class StyleMqListener extends AbstractMqListener implements ChannelAwareM
long begin = System.currentTimeMillis();
styleService.delete(Integer.valueOf(id));
indexService.deleteIndexData(indexName, id);
logger.info("[func=deleteData][step=success][indexName={}][id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
logger.info("[func=deleteData][indexName={}][id={}][cost={}ms]", indexName, id, (System.currentTimeMillis() - begin));
}
}
... ...
... ... @@ -61,8 +61,7 @@ public class SuggestConversionCustomMqListener extends AbstractMqListener implem
}
// 更新数据库
suggestConversionCustomService.saveOrUpdate(suggestConversionCustom);
logger.info("[func=updateData][step=success][id={}][cost={}ms]", suggestConversionCustom.getId(), (System.currentTimeMillis() - begin));
logger.info("[func=updateData][id={}][cost={}ms]", suggestConversionCustom.getId(), (System.currentTimeMillis() - begin));
// 更新索引
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("source", suggestConversionCustom.getSource());
... ... @@ -70,7 +69,7 @@ public class SuggestConversionCustomMqListener extends AbstractMqListener implem
dataMap.put("updateTime", suggestConversionCustom.getUpdateTime());
dataMap.put("status", suggestConversionCustom.getStatus());
indexService.updateIndexData(ISearchConstants.INDEX_NAME_CONVERSION, MD5Util.string2MD5(dataMap.get("source").toString().trim().toLowerCase()), dataMap);
logger.info("[func=updateDataToES][step=success][source={}][cost={}ms]", suggestConversionCustom.getSource(), (System.currentTimeMillis() - begin));
logger.info("[func=updateDataToES][source={}][cost={}ms]", suggestConversionCustom.getSource(), (System.currentTimeMillis() - begin));
}
@Override
... ... @@ -79,14 +78,12 @@ public class SuggestConversionCustomMqListener extends AbstractMqListener implem
return;
}
long begin = System.currentTimeMillis();
SuggestConversionCustom sourceData = suggestConversionCustomService.selectByPrimaryKey(Integer.valueOf(id));
SuggestConversion suggestConversion;
if (sourceData != null) {
// 删除数据
suggestConversionCustomService.delete(Integer.valueOf(id));
logger.info("[func=deleteData][step=success][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
logger.info("[func=deleteData][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
if ((suggestConversion = suggestConversionService.selectBySource(sourceData.getSource())) != null) {
// 还原成自动爬虫生成的数据
Map<String, Object> dataMap = new HashMap<>();
... ... @@ -95,10 +92,10 @@ public class SuggestConversionCustomMqListener extends AbstractMqListener implem
dataMap.put("updateTime", suggestConversion.getUpdateTime());
dataMap.put("status", SuggestionConstants.VALID_STATUS);
indexService.updateIndexData(ISearchConstants.INDEX_NAME_CONVERSION, MD5Util.string2MD5(sourceData.getSource().trim().toLowerCase()), dataMap);
logger.info("[func=updateDataToES][step=success][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
logger.info("[func=updateDataToES][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
} else {
indexService.deleteIndexData(ISearchConstants.INDEX_NAME_CONVERSION, MD5Util.string2MD5(sourceData.getSource().trim().toLowerCase()));
logger.info("[func=deleteDataFromES][step=success][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
logger.info("[func=deleteDataFromES][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
}
}
}
... ...
... ... @@ -67,7 +67,7 @@ public class SuggestWordCustomMqListener extends AbstractMqListener implements C
// 更新数据库
suggestWordCustomService.saveOrUpdate(suggestWordCustom);
logger.info("[func=updateData][step=success][id={}][cost={}ms]", suggestWordCustom.getId(), (System.currentTimeMillis() - begin));
logger.info("[func=updateData][id={}][cost={}ms]", suggestWordCustom.getId(), (System.currentTimeMillis() - begin));
// 更新索引
String standardKeyword = CharUtils.standardized(suggestWordCustom.getKeyword());
... ... @@ -87,7 +87,7 @@ public class SuggestWordCustomMqListener extends AbstractMqListener implements C
}
indexService.updateIndexData(ISearchConstants.INDEX_NAME_SUGGEST, MD5Util.string2MD5(standardKeyword), dataMap);
logger.info("[func=updateDataToES][step=success][keyword={}][cost={}ms]", suggestWordCustom.getKeyword(), (System.currentTimeMillis() - begin));
logger.info("[func=updateDataToES][keyword={}][cost={}ms]", suggestWordCustom.getKeyword(), (System.currentTimeMillis() - begin));
}
@Override
... ... @@ -120,10 +120,10 @@ public class SuggestWordCustomMqListener extends AbstractMqListener implements C
dataMap.put("weight", suggestWordDef.getWeight());
dataMap.put("status", suggestWordDef.getStatus());
indexService.updateIndexData(ISearchConstants.INDEX_NAME_SUGGEST, MD5Util.string2MD5(standardKeyword), dataMap);
logger.info("[func=updateDataToES][step=success][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
logger.info("[func=updateDataToES][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
} else {
indexService.deleteIndexData(ISearchConstants.INDEX_NAME_SUGGEST, MD5Util.string2MD5(CharUtils.standardized(sourceData.getKeyword())));
logger.info("[func=deleteDataFromES][step=success][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
logger.info("[func=deleteDataFromES][id={}][cost={}ms]", id, System.currentTimeMillis() - begin);
}
}
}
... ...
... ... @@ -88,18 +88,20 @@ public class ActivityProductMqListener extends AbstractMqListener implements Cha
return;
}
List<ActivityProduct> activityProducts = activityProductService.getBySkn(productSkn);
if (CollectionUtils.isEmpty(activityProducts)) {
return;
}
JSONArray jsonArray = new JSONArray();
for (ActivityProduct activityProduct : activityProducts) {
jsonArray.add(activityProductService.getActivities(activityProduct));
}
Map<String, Object> indexData = new HashMap<>();
indexData.put("productId", productId);
indexData.put("activities", jsonArray);
if (CollectionUtils.isEmpty(activityProducts)) {
indexData.put("activities", "");
}else{
for (ActivityProduct activityProduct : activityProducts) {
jsonArray.add(activityProductService.getActivities(activityProduct));
}
indexData.put("activities", jsonArray);
}
this.updateProductIndexWithDataMap(indexData, Integer.valueOf(productId), key, begin);
logger.info(" [func=updateProductIndex][key={}][productId={}][cost={}ms]", key, productId,
(System.currentTimeMillis() - begin));
}
}
... ...
... ... @@ -7,7 +7,7 @@ import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.index.increment.bulks.GlobalndexBulkService;
import com.yoho.search.consumer.index.increment.bulks.GlobalIndexBulkService;
import com.yoho.search.consumer.service.base.TblProductService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.search.dal.model.TblProduct;
... ... @@ -35,7 +35,7 @@ public class TblProductMqListener extends AbstractMqListener implements ChannelA
@Autowired
private TblProductService tblProductService;
@Autowired
private GlobalndexBulkService globalndexBulkService;
private GlobalIndexBulkService globalIndexBulkService;
protected ApplicationEventPublisher publisher;
... ... @@ -75,7 +75,7 @@ public class TblProductMqListener extends AbstractMqListener implements ChannelA
return;
}
tblProductService.saveOrUpdate(tblProduct);
globalndexBulkService.updateGlobalIndex(tblProduct.getProductSkn(), System.currentTimeMillis(), key);
globalIndexBulkService.updateGlobalIndex(tblProduct.getProductSkn(), System.currentTimeMillis(), key);
logger.info("[func=TblProductMqListener.updateData][skn={}][cost={}ms]", tblProduct.getProductSkn(), (System.currentTimeMillis() - begin));
}
... ... @@ -87,7 +87,7 @@ public class TblProductMqListener extends AbstractMqListener implements ChannelA
long begin = System.currentTimeMillis();
tblProductService.delete(Integer.valueOf(id));
Integer tblId = Integer.valueOf(id) * (-1);
globalndexBulkService.deleteGlobalIndex(tblId, System.currentTimeMillis(), key);
globalIndexBulkService.deleteGlobalIndex(tblId, System.currentTimeMillis(), key);
logger.info("[func=deleteData][step=success][tableName=tblProduct][id={}][cost={}ms]", id, (System.currentTimeMillis() - begin));
}
... ...
... ... @@ -7,7 +7,7 @@ import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.index.increment.bulks.GlobalndexBulkService;
import com.yoho.search.consumer.index.increment.bulks.GlobalIndexBulkService;
import com.yoho.search.consumer.service.base.TblProductSkcService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.search.dal.model.TblProductSkc;
... ... @@ -34,7 +34,7 @@ public class TblProductSkcMqListener extends AbstractMqListener implements Chann
@Autowired
private TblProductSkcService tblProductSkcService;
@Autowired
private GlobalndexBulkService globalndexBulkService;
private GlobalIndexBulkService globalIndexBulkService;
protected ApplicationEventPublisher publisher;
... ... @@ -69,7 +69,7 @@ public class TblProductSkcMqListener extends AbstractMqListener implements Chann
//update DB
tblProductSkcService.saveOrUpdate(tblProductSkc);
//update productIndex
globalndexBulkService.updateGlobalIndex(tblProductSkc.getProductSkn(), System.currentTimeMillis(), key);
globalIndexBulkService.updateGlobalIndex(tblProductSkc.getProductSkn(), System.currentTimeMillis(), key);
logger.info("[func=updateData][step=success][tableName=tblProductSkc][skn={}][cost={}ms]", tblProductSkc.getProductSkn(), (System.currentTimeMillis() - begin));
}
... ... @@ -83,7 +83,7 @@ public class TblProductSkcMqListener extends AbstractMqListener implements Chann
//delete DB
tblProductSkcService.delete(Integer.valueOf(id));
//update productIndex
globalndexBulkService.updateGlobalIndex(tblProductSkc.getProductSkn(), System.currentTimeMillis(), key);
globalIndexBulkService.updateGlobalIndex(tblProductSkc.getProductSkn(), System.currentTimeMillis(), key);
logger.info("[func=deleteData][step=success][tableName=tblProductSkc][id={}][cost={}ms]", id, (System.currentTimeMillis() - begin));
}
... ...
... ... @@ -7,7 +7,7 @@ import com.yoho.search.base.utils.ConvertUtils;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.index.increment.bulks.GlobalndexBulkService;
import com.yoho.search.consumer.index.increment.bulks.GlobalIndexBulkService;
import com.yoho.search.consumer.service.base.TblProductSkuService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.search.dal.model.TblProductSku;
... ... @@ -37,7 +37,7 @@ public class TblProductSkuMqListener extends AbstractMqListener implements Chann
protected ApplicationEventPublisher publisher;
@Autowired
private GlobalndexBulkService globalndexBulkService;
private GlobalIndexBulkService globalIndexBulkService;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
... ... @@ -70,7 +70,7 @@ public class TblProductSkuMqListener extends AbstractMqListener implements Chann
//update DB
tblProductSkuService.saveOrUpdate(tblProductSku);
//update ProductIndex
globalndexBulkService.updateGlobalIndex(tblProductSku.getProductSkn(), System.currentTimeMillis(), key);
globalIndexBulkService.updateGlobalIndex(tblProductSku.getProductSkn(), System.currentTimeMillis(), key);
logger.info("[func=updateData][step=success][tableName=tblProductSku][id={}][cost={}ms]", tblProductSku.getProductSkn(), (System.currentTimeMillis() - begin));
}
... ... @@ -83,7 +83,7 @@ public class TblProductSkuMqListener extends AbstractMqListener implements Chann
TblProductSku tblProductSku = tblProductSkuService.getById(Integer.valueOf(id));
//delete DB
tblProductSkuService.delete(Integer.valueOf(id));
globalndexBulkService.updateGlobalIndex(tblProductSku.getProductSkn(), System.currentTimeMillis(), key);
globalIndexBulkService.updateGlobalIndex(tblProductSku.getProductSkn(), System.currentTimeMillis(), key);
logger.info("[func=deleteData][step=success][tableName=tblProductSku][id={}][cost={}ms]", id, (System.currentTimeMillis() - begin));
}
... ...
... ... @@ -8,7 +8,7 @@ import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.fullbuild.TblProductNewIndexBuilder;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.index.increment.bulks.GlobalndexBulkService;
import com.yoho.search.consumer.index.increment.bulks.GlobalIndexBulkService;
import com.yoho.search.consumer.service.base.TblProductService;
import com.yoho.search.consumer.service.base.TblSiteService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
... ... @@ -39,7 +39,7 @@ public class TblSiteMqListener extends AbstractMqListener implements ChannelAwar
@Autowired
private TblProductService tblProductService;
@Autowired
private GlobalndexBulkService globalndexBulkService;
private GlobalIndexBulkService globalIndexBulkService;
@Autowired
private TblProductNewIndexBuilder tblProductNewIndexBuilder;
... ... @@ -80,7 +80,7 @@ public class TblSiteMqListener extends AbstractMqListener implements ChannelAwar
// update global
List<Integer> skns = tblProductService.getSknsBySiteId(tblSite.getSiteId());
for (Integer skn : skns) {
globalndexBulkService.updateGlobalIndex(skn, begin, key);
globalIndexBulkService.updateGlobalIndex(skn, begin, key);
}
logger.info("[func=updateData][step=success][tableName=tblSite][id={}][cost={}ms]", tblSite.getSiteId(), (System.currentTimeMillis() - begin));
}
... ... @@ -98,7 +98,7 @@ public class TblSiteMqListener extends AbstractMqListener implements ChannelAwar
if (tblSite != null) {
List<Integer> skns = tblProductService.getSknsBySiteId(tblSite.getSiteId());
for (Integer skn : skns) {
globalndexBulkService.updateGlobalIndex(skn, begin, key);
globalIndexBulkService.updateGlobalIndex(skn, begin, key);
}
}
logger.info("[func=deleteData][step=success][tableName=tblSite][id={}][cost={}ms]", id, (System.currentTimeMillis() - begin));
... ...