Authored by hugufei

ufo增量索引优化

... ... @@ -59,21 +59,16 @@ public class UfoProductIndexBuilder extends IIndexBuilder implements Application
/**
* 增量
*/
public UfoProductIndexBO buildUfoProductIndexBOIncrease(Integer id) {
Product product = ufoProductMapper.selectByPrimaryKey(id);
if (product == null) {
return null;
public List<UfoProductIndexBO> buildUfoProductIndexBOIncrease(List<Integer> ids) {
List<Product> productList = ufoProductMapper.selectByIdList(ids);
if (productList == null || productList.isEmpty()) {
return new ArrayList<>();
}
List<Product> productList = Arrays.asList(product);
List<UfoProductIndexBO> ufoProductIndexBOList = buildUfoProductIndexBOList(productList);
if (CollectionUtils.isEmpty(ufoProductIndexBOList)) {
return null;
return new ArrayList<>();
}
UfoProductIndexBO ufoProductIndexBO = ufoProductIndexBOList.get(0);
if (ufoProductIndexBO == null) {
return null;
}
return ufoProductIndexBO;
return ufoProductIndexBOList;
}
/**
... ...
... ... @@ -106,17 +106,16 @@ public class UfoToYohoIndexBuilder extends IIndexBuilder {
/**
* 增量构建
*/
public ProductIndexBO buildProductIndexBOIncrease(Integer ufoProductId) {
Product product = ufoProductMapper.selectByPrimaryKey(ufoProductId);
if (product == null) {
return null;
public List<ProductIndexBO> buildProductIndexBOIncrease(List<Integer> ufoProductIdList) {
List<Product> ufoProductList = ufoProductMapper.selectByIdList(ufoProductIdList);
if (ufoProductList == null) {
return new ArrayList<>();
}
List<Product> productList = new ArrayList<>(Arrays.asList(product));
List<ProductIndexBO> productIndexBOList = buildProductIndexBOList(productList);
List<ProductIndexBO> productIndexBOList = buildProductIndexBOList(ufoProductList);
if (CollectionUtils.isEmpty(productIndexBOList)) {
return null;
return new ArrayList<>();
}
return productIndexBOList.get(0);
return productIndexBOList;
}
private List<ProductIndexBO> buildProductIndexBOList(List<Product> productList) {
... ...
... ... @@ -43,31 +43,27 @@ public class CommonBulkService implements ApplicationEventPublisherAware {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
while (true) {
doBulk();
}
});
}
private void doBulk() {
try {
long begin = System.currentTimeMillis();
try {
long begin = System.currentTimeMillis();
//1、从队列中获取全部数据
List<ESBluk> blukList = new ArrayList<>();
queue.drainTo(blukList);
//1、从队列中获取全部数据
List<ESBluk> blukList = new ArrayList<>();
queue.drainTo(blukList);
//2、批量更新Es
if (CollectionUtils.isNotEmpty(blukList)) {
yohoIndexService.bulk(blukList);
logger.info("doBulk, the blukList size is {} and cost {} ms,", blukList.size(), System.currentTimeMillis() - begin);
Thread.sleep(50);
} else {
Thread.sleep(1000);
//2、批量更新Es
if (CollectionUtils.isNotEmpty(blukList)) {
yohoIndexService.bulk(blukList);
logger.info("doBulk, the blukList size is {} and cost {} ms,", blukList.size(), System.currentTimeMillis() - begin);
Thread.sleep(50);
} else {
Thread.sleep(1000);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
publisher.publishEvent(new SearchEvent("doBulk", "CommonBulkService.doBulk", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
publisher.publishEvent(new SearchEvent("doBulk", "CommonBulkService.doBulk", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
}
});
}
public void add(ESBluk esBluk) {
... ...
... ... @@ -5,12 +5,12 @@ import com.yoho.error.event.SearchEvent;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.MoudleEnum;
import com.yoho.search.consumer.service.utils.CostStatistics;
import com.yoho.search.consumer.common.IYohoIndexService;
import com.yoho.search.consumer.index.fullbuild.tbl.TblProductIndexNewBuilder;
import com.yoho.search.consumer.index.fullbuild.tbl.TblToYohoIndexBuilder;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.logicService.ProductIndexBOToMapService;
import com.yoho.search.consumer.service.utils.CostStatistics;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import org.apache.commons.collections.CollectionUtils;
... ... @@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
*/
@Component
public class GlobalIndexBulkService implements ApplicationEventPublisherAware {
private static final Logger logger = LoggerFactory.getLogger(GlobalIndexBulkService.class);
protected ApplicationEventPublisher publisher;
... ... @@ -42,7 +43,6 @@ public class GlobalIndexBulkService implements ApplicationEventPublisherAware {
private static final String indexNameProductIndex = ISearchConstants.INDEX_NAME_PRODUCT_INDEX;
private static final String indexNameTblProductNew = ISearchConstants.INDEX_NAME_TBLPRODUCT_NEW;
private static final long THREAD_SLEEP_IDLE = 2000;
private static final long THREAD_SLEEP_WORK = 50;
... ... @@ -191,7 +191,7 @@ public class GlobalIndexBulkService implements ApplicationEventPublisherAware {
} catch (Exception e) {
logger.error(e.getMessage(), e);
publisher.publishEvent(new SearchEvent("doBulkDeleteGlobalIndex","doBulkDeleteGlobalIndex.GlobalIndexBulkService", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
publisher.publishEvent(new SearchEvent("doBulkDeleteGlobalIndex", "doBulkDeleteGlobalIndex.GlobalIndexBulkService", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
return THREAD_SLEEP_WORK;
}
}
... ...
... ... @@ -179,4 +179,5 @@ public class PromotionBulkService {
}
return results;
}
}
... ...
package com.yoho.search.consumer.index.increment.bulks;
import com.alibaba.fastjson.JSON;
import com.yoho.error.event.SearchEvent;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.MoudleEnum;
import com.yoho.search.consumer.index.fullbuild.ufo.UfoToYohoIndexBuilder;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.logicService.ProductIndexBOToMapService;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class Ufo2YohoBulkService implements ApplicationEventPublisherAware {
private static final Logger logger = LoggerFactory.getLogger(UfoCommonBulkService.class);
private final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(200);
@Autowired
private CommonBulkService commonBulkService;
@Autowired
private UfoToYohoIndexBuilder ufoToYohoIndexBuilder;
@Autowired
private ProductIndexBOToMapService productIndexBOToMapService;
protected ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
@PostConstruct
void init() {
// 批量更新ES
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
while (true) {
try {
long begin = System.currentTimeMillis();
//1、从队列中获取全部数据
List<Integer> idList = new ArrayList<>();
queue.drainTo(idList);
//2、批量更新Es
if (CollectionUtils.isNotEmpty(idList)) {
doBulkToEs(idList);
logger.info("doBulk, the blukList size is {} and cost {} ms,", idList.size(), System.currentTimeMillis() - begin);
Thread.sleep(100);
} else {
Thread.sleep(2000);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
publisher.publishEvent(new SearchEvent("doBulk", "UfoCommonBulkByIdService.doBulkToEs", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
}
}
});
}
private void doBulkToEs(List<Integer> ufoProductIdList) {
List<ProductIndexBO> productIndexBOList = ufoToYohoIndexBuilder.buildProductIndexBOIncrease(ufoProductIdList);
if (productIndexBOList == null || productIndexBOList.isEmpty()) {
return;
}
//转成有货索引
for (ProductIndexBO productIndexBO : productIndexBOList) {
Map<String, Object> dataMap = productIndexBOToMapService.beanToMap(productIndexBO);
commonBulkService.add(new ESBluk(JSON.toJSONString(dataMap), productIndexBO.getId(), ISearchConstants.INDEX_NAME_PRODUCT_INDEX, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, false));
}
}
public void addUpdateUfoProductId(Integer productId) {
try {
queue.put(productId);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
... ...
package com.yoho.search.consumer.index.increment.bulks;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yoho.error.event.SearchEvent;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.MoudleEnum;
import com.yoho.search.consumer.common.IYohoIndexService;
import com.yoho.search.consumer.index.fullbuild.ufo.UfoProductIndexBuilder;
import com.yoho.search.consumer.service.bo.UfoProductIndexBO;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import org.apache.commons.collections.CollectionUtils;
... ... @@ -26,10 +30,12 @@ public class UfoCommonBulkService implements ApplicationEventPublisherAware {
private static final Logger logger = LoggerFactory.getLogger(UfoCommonBulkService.class);
private final ArrayBlockingQueue<ESBluk> queue = new ArrayBlockingQueue<>(200);
private final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(200);
@Autowired
private IYohoIndexService yohoIndexService;
@Autowired
private UfoProductIndexBuilder ufoProductIndexBuilder;
protected ApplicationEventPublisher publisher;
... ... @@ -44,39 +50,59 @@ public class UfoCommonBulkService implements ApplicationEventPublisherAware {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
while (true) {
doBulk();
try {
long begin = System.currentTimeMillis();
//1、从队列中获取全部数据
List<Integer> idList = new ArrayList<>();
queue.drainTo(idList);
//2、批量更新Es
if (CollectionUtils.isNotEmpty(idList)) {
doBulkToEs(idList);
logger.info("doBulk, the blukList size is {} and cost {} ms,", idList.size(), System.currentTimeMillis() - begin);
Thread.sleep(100);
} else {
Thread.sleep(2000);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
publisher.publishEvent(new SearchEvent("doBulk", "UfoCommonBulkByIdService.doBulkToEs", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
}
}
});
}
private void doBulk() {
try {
long begin = System.currentTimeMillis();
//1、从队列中获取全部数据
List<ESBluk> blukList = new ArrayList<>();
queue.drainTo(blukList);
private void doBulkToEs(List<Integer> idList) {
List<UfoProductIndexBO> ufoProductIndexBOList = ufoProductIndexBuilder.buildUfoProductIndexBOIncrease(idList);
if (ufoProductIndexBOList == null || ufoProductIndexBOList.isEmpty()) {
return;
}
List<ESBluk> esBlukList = new ArrayList<>();
for (UfoProductIndexBO ufoProductIndexBO : ufoProductIndexBOList) {
JSONObject ufoProductIndexBOJSONObject = (JSONObject) JSON.toJSON(ufoProductIndexBO);
ESBluk esBluk = new ESBluk(ufoProductIndexBOJSONObject.toJSONString(), ufoProductIndexBO.getId().toString(), ISearchConstants.INDEX_NAME_UFO_PRODUCT_INDEX, ISearchConstants.INDEX_NAME_UFO_PRODUCT_INDEX, false);
esBlukList.add(esBluk);
}
yohoIndexService.bulk(esBlukList);
}
//2、批量更新Es
if (CollectionUtils.isNotEmpty(blukList)) {
yohoIndexService.bulk(blukList);
logger.info("doBulk, the blukList size is {} and cost {} ms,", blukList.size(), System.currentTimeMillis() - begin);
Thread.sleep(50);
} else {
Thread.sleep(2000);
}
public void addUpdateData(String jsonString, Integer productId) {
try {
// ESBluk esBluk = new ESBluk(jsonString, productId.toString(), ISearchConstants.INDEX_NAME_UFO_PRODUCT_INDEX, ISearchConstants.INDEX_NAME_UFO_PRODUCT_INDEX, false);
// dataIncrease.put(esBluk);
queue.put(productId);
} catch (Exception e) {
logger.error(e.getMessage(), e);
publisher.publishEvent(new SearchEvent("doBulk", "UfoCommonBulkService.doBulk", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
}
}
public void addUpdateData(String jsonString,Integer id) {
public void addUfoUpdateProductId(Integer ufoProductId) {
try {
ESBluk esBluk = new ESBluk(jsonString, id.toString(), ISearchConstants.INDEX_NAME_UFO_PRODUCT_INDEX, ISearchConstants.INDEX_NAME_UFO_PRODUCT_INDEX, false);
queue.put(esBluk);
queue.put(ufoProductId);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
... ...
... ... @@ -34,7 +34,6 @@ public class UfoStoragePriceMqListener extends AbstractMqListener {
ufoIndexUpdateHelper.updateUfoIndexByUfoProductId(storagePrice.getProductId());
ufo2YohoIndexUpdateHelper.updateYohoIndex(storagePrice.getProductId());
}
}
@Override
... ...
package com.yoho.search.consumer.index.mix;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.UfoProductIndexEsField;
import com.yoho.search.consumer.index.fullbuild.ufo.UfoProductIndexBuilder;
import com.yoho.search.consumer.index.fullbuild.ufo.UfoToYohoIndexBuilder;
import com.yoho.search.consumer.index.fullbuild.ufo.YohoToUfoIndexBuilder;
import com.yoho.search.consumer.index.increment.bulks.CommonBulkService;
import com.yoho.search.consumer.index.increment.bulks.UfoCommonBulkService;
import com.yoho.search.consumer.service.bo.ProductGoodsBO;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.bo.UfoProductIndexBO;
import com.yoho.search.consumer.service.daoService.ProductService;
import com.yoho.search.consumer.service.logicService.ProductGoodsLogicService;
import com.yoho.search.consumer.service.logicService.ProductIndexBOToMapService;
import com.yoho.search.consumer.service.logicService.tbl.util.StringUtils;
import com.yoho.search.consumer.service.logicService.ufo.yoho2ufo.Yoho2UfoColorService;
import com.yoho.search.consumer.service.logicService.ufo.yoho2ufo.Yoho2UfoSizeService;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.dal.Yoho2ufoProductMapper;
import com.yoho.search.dal.model.Product;
import com.yoho.search.dal.model.ProductKeywords;
import com.yoho.search.dal.model.ufo_product.ProductColor;
import com.yoho.search.dal.model.ufo_product.Yoho2ufoProduct;
import org.apache.commons.collections.CollectionUtils;
import com.yoho.search.consumer.index.increment.bulks.Ufo2YohoBulkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.joining;
/**
* @author wangnan
* @version 2018/9/14
*/
@Component
public class Ufo2YohoIndexUpdateHelper {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private UfoToYohoIndexBuilder ufoToYohoIndexBuilder;
@Autowired
private ProductIndexBOToMapService productIndexBOToMapService;
@Autowired
private CommonBulkService commonBulkService;
private Ufo2YohoBulkService ufo2YohuoBulkService;
/**
* ufo相关表增量更新yoho索引
... ... @@ -60,12 +21,7 @@ public class Ufo2YohoIndexUpdateHelper {
*/
public void updateYohoIndex(Integer ufoProductId) {
try {
ProductIndexBO productIndexBO = ufoToYohoIndexBuilder.buildProductIndexBOIncrease(ufoProductId);
if (productIndexBO == null) {
return;
}
Map<String, Object> dataMap = productIndexBOToMapService.beanToMap(productIndexBO);
commonBulkService.add(new ESBluk(JSON.toJSONString(dataMap), productIndexBO.getId(), ISearchConstants.INDEX_NAME_PRODUCT_INDEX, ISearchConstants.INDEX_NAME_PRODUCT_INDEX, false));
ufo2YohuoBulkService.addUpdateUfoProductId(ufoProductId);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
... ...
package com.yoho.search.consumer.index.mix;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.UfoProductIndexEsField;
import com.yoho.search.consumer.index.fullbuild.ufo.UfoProductIndexBuilder;
import com.yoho.search.consumer.index.fullbuild.ufo.UfoToYohoIndexBuilder;
import com.yoho.search.consumer.index.fullbuild.ufo.YohoToUfoIndexBuilder;
import com.yoho.search.consumer.index.increment.bulks.CommonBulkService;
import com.yoho.search.consumer.index.increment.bulks.UfoCommonBulkService;
import com.yoho.search.consumer.service.bo.ProductGoodsBO;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.bo.UfoProductIndexBO;
import com.yoho.search.consumer.service.daoService.ProductService;
import com.yoho.search.consumer.service.logicService.ProductGoodsLogicService;
import com.yoho.search.consumer.service.logicService.ProductIndexBOToMapService;
import com.yoho.search.consumer.service.logicService.tbl.util.StringUtils;
import com.yoho.search.consumer.service.logicService.ufo.yoho2ufo.Yoho2UfoColorService;
import com.yoho.search.consumer.service.logicService.ufo.yoho2ufo.Yoho2UfoSizeService;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.dal.Yoho2ufoProductMapper;
import com.yoho.search.dal.model.Product;
import com.yoho.search.dal.model.ProductKeywords;
import com.yoho.search.dal.model.ufo_product.ProductColor;
import com.yoho.search.dal.model.ufo_product.Yoho2ufoProduct;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.joining;
/**
* @author wangnan
* @version 2018/9/14
*/
@Component
public class UfoIndexUpdateHelper {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private UfoProductIndexBuilder ufoProductIndexBuilder;
@Autowired
private UfoCommonBulkService ufoCommonBulkService;
/**
* ufo相关表增量更新ufo索引
*/
public void updateUfoIndexByUfoProductId(Integer productId) {
public void updateUfoIndexByUfoProductId(Integer ufoProductId) {
try {
UfoProductIndexBO ufoProductIndexBO = ufoProductIndexBuilder.buildUfoProductIndexBOIncrease(productId);
if (ufoProductIndexBO == null) {
return;
}
updateUfoIndexByBO(ufoProductIndexBO);
ufoCommonBulkService.addUfoUpdateProductId(ufoProductId);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private void updateUfoIndexByBO(UfoProductIndexBO ufoProductIndexBO) {
try {
if (ufoProductIndexBO == null) {
return;
}
JSONObject ufoProductIndexBOJSONObject = (JSONObject) JSON.toJSON(ufoProductIndexBO);
ufoCommonBulkService.addUpdateData(ufoProductIndexBOJSONObject.toJSONString(), ufoProductIndexBO.getId());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public void updateUfoIndexWithDataMap(Map<String, Object> indexData, Integer id) {
/**
* ufo相关表增量更新ufo索引
*/
public void updateUfoIndexWithDataMap(Map<String, Object> indexData, Integer ufoProductId) {
JSONObject jsonObject = new JSONObject(indexData);
ufoCommonBulkService.addUpdateData(jsonObject.toJSONString(),id);
ufoCommonBulkService.addUpdateData(jsonObject.toJSONString(),ufoProductId);
}
}
... ...