Authored by wangnan

商品索引添加商品热度字段,支持全量和定时增量。fix

@@ -20,5 +20,9 @@ public interface ProductHeatValueMapper { @@ -20,5 +20,9 @@ public interface ProductHeatValueMapper {
20 20
21 Integer selectLatestDateId(); 21 Integer selectLatestDateId();
22 22
23 - List<ProductHeatValue> selectHeatValueOfLatestDate(@Param(value="sknList")List<Integer> sknList, @Param(value="dateId")Integer dateId); 23 + Integer count(@Param(value="dateId")Integer dateId);
  24 +
  25 + List<ProductHeatValue> selectHeatValueOfLatestDateBySkn(@Param(value="sknList")List<Integer> sknList, @Param(value="dateId")Integer dateId);
  26 +
  27 + List<ProductHeatValue> selectHeatValueOfLatestDate(@Param(value="dateId")Integer dateId,@Param(value = "offset") Integer offset, @Param(value = "pageSize") Integer pageSize);
24 } 28 }
@@ -79,7 +79,7 @@ @@ -79,7 +79,7 @@
79 last_update_time = #{lastUpdateTime,jdbcType=INTEGER} 79 last_update_time = #{lastUpdateTime,jdbcType=INTEGER}
80 where product_skn = #{productSkn,jdbcType=INTEGER} 80 where product_skn = #{productSkn,jdbcType=INTEGER}
81 </update> 81 </update>
82 - <select id="selectHeatValueOfLatestDate" resultMap="BaseResultMap" timeout="20000"> 82 + <select id="selectHeatValueOfLatestDateBySkn" resultMap="BaseResultMap" timeout="20000">
83 <![CDATA[ 83 <![CDATA[
84 select * from product_heat_value p 84 select * from product_heat_value p
85 where 85 where
@@ -92,7 +92,18 @@ @@ -92,7 +92,18 @@
92 #{item} 92 #{item}
93 </foreach> 93 </foreach>
94 </select> 94 </select>
  95 + <select id="selectHeatValueOfLatestDate" resultMap="BaseResultMap" timeout="20000">
  96 + select * from product_heat_value p
  97 + where
  98 + date_id = #{dateId}
  99 + limit #{offset},#{pageSize}
  100 + </select>
95 <select id="selectLatestDateId" resultType="java.lang.Integer"> 101 <select id="selectLatestDateId" resultType="java.lang.Integer">
96 select max(date_id) from product_heat_value 102 select max(date_id) from product_heat_value
97 </select> 103 </select>
  104 + <select id="count" resultType="java.lang.Integer" timeout="20000">
  105 + SELECT count(*) FROM product_heat_value
  106 + where
  107 + date_id = #{dateId}
  108 + </select>
98 </mapper> 109 </mapper>
  1 +package com.yoho.search.consumer.index.increment.flow;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.yoho.search.base.utils.ISearchConstants;
  5 +import com.yoho.search.consumer.index.common.IYohoIndexService;
  6 +import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
  7 +import com.yoho.search.consumer.service.base.ProductService;
  8 +import com.yoho.search.consumer.suggests.common.RetryBusinessFlow;
  9 +import com.yoho.search.core.es.IElasticsearchClient;
  10 +import com.yoho.search.core.es.model.ESBluk;
  11 +import com.yoho.search.dal.ProductHeatValueMapper;
  12 +import com.yoho.search.dal.model.Product;
  13 +import com.yoho.search.dal.model.ProductHeatValue;
  14 +import org.apache.commons.collections.CollectionUtils;
  15 +import org.elasticsearch.action.bulk.BulkResponse;
  16 +import org.slf4j.Logger;
  17 +import org.slf4j.LoggerFactory;
  18 +import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.stereotype.Component;
  20 +
  21 +import java.util.ArrayList;
  22 +import java.util.List;
  23 +import java.util.Map;
  24 +import java.util.stream.Collectors;
  25 +
  26 +/**
  27 + * Created by wangnan on 2017/4/10.
  28 + */
  29 +@Component
  30 +public class ProductIndexHeatValueUpdateFlow implements RetryBusinessFlow {
  31 +
  32 + private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");
  33 +
  34 + @Autowired
  35 + private RebuildFlagService rebuildFlagService;
  36 + @Autowired
  37 + private ProductHeatValueMapper productHeatValueMapper;
  38 + @Autowired
  39 + private ProductService productService;
  40 + @Autowired
  41 + private IYohoIndexService yohoIndexService;
  42 +
  43 + @Override
  44 + public String flowName() {
  45 + return this.getClass().getSimpleName();
  46 + }
  47 +
  48 + @Override
  49 + public void init() {
  50 + rebuildFlagService.waitingRebuildingIndex();
  51 + rebuildFlagService.updateIsBuildingTrue();
  52 + }
  53 +
  54 + @Override
  55 + public int getTotalCount() {
  56 + return productHeatValueMapper.count(productHeatValueMapper.selectLatestDateId());
  57 + }
  58 +
  59 + @Override
  60 + public boolean doBusiness(int pageNo, int batchSize) {
  61 + int start = (pageNo - 1) * batchSize;
  62 + List<ProductHeatValue> productHeatValueList = productHeatValueMapper.selectHeatValueOfLatestDate(productHeatValueMapper.selectLatestDateId(),start, batchSize);
  63 + if (CollectionUtils.isEmpty(productHeatValueList)) {
  64 + return true;
  65 + }
  66 + List<Integer> sknList = productHeatValueList.stream().map(ProductHeatValue::getProductSkn).collect(Collectors.toList());
  67 + List<Product> productList = productService.selectListSkns(sknList);
  68 + Map<Integer, Integer> skn2IDMap = productList.stream().collect(Collectors.toMap(Product::getErpProductId, Product::getId));
  69 + logger.info("[{} business][pageNo={}][productHeatValueList={}]", flowName(), pageNo, productHeatValueList.size());
  70 + List<JSONObject> dataList = new ArrayList<JSONObject>();
  71 + productHeatValueList.stream().forEach((p)->{
  72 + if (skn2IDMap.get(p.getProductSkn()) != null) {
  73 + JSONObject product = new JSONObject();
  74 + product.put("id", skn2IDMap.get(p.getProductSkn()));
  75 + product.put("heatValue", p.getHeatValue());
  76 + dataList.add(product);
  77 + }
  78 + });
  79 + if (CollectionUtils.isEmpty(dataList)) {
  80 + return true;
  81 + }
  82 + final String indexName = ISearchConstants.INDEX_NAME_PRODUCT_INDEX;
  83 + List<ESBluk> bluks = new ArrayList<>(dataList.size());
  84 + for (JSONObject product : dataList) {
  85 + bluks.add(new ESBluk(product.toJSONString(), product.getInteger("id").toString(), indexName, indexName, false));
  86 + }
  87 + IElasticsearchClient client = yohoIndexService.getIndex(indexName).getIndexClient();
  88 + BulkResponse bulkResponse = client.bulk(bluks);
  89 + if (bulkResponse.hasFailures()) {
  90 + throw new RuntimeException(
  91 + String.format("bulk has failure,[yohoIndexName=[%s]],[pageNo=%s],[failureMessage=%s]", indexName, pageNo, bulkResponse.buildFailureMessage()));
  92 + }
  93 + logger.info("[{} business][pageNo={}][message=bulk succeed]", flowName(), pageNo);
  94 + return true;
  95 + }
  96 +
  97 +
  98 + @Override
  99 + public void finish(boolean doBusinessResult, Exception exception) {
  100 + logger.info("[{} business][doBusinessResult={}][exception={}]", flowName(), doBusinessResult, exception);
  101 + rebuildFlagService.updateIsBuildingFalse();
  102 + }
  103 +}
  1 +package com.yoho.search.consumer.job;
  2 +
  3 +import com.yoho.search.base.utils.ISearchConstants;
  4 +import com.yoho.search.consumer.index.increment.flow.ProductIndexHeatValueUpdateFlow;
  5 +import com.yoho.search.consumer.suggests.common.RetryBusinessFlowExecutor;
  6 +import org.slf4j.Logger;
  7 +import org.slf4j.LoggerFactory;
  8 +import org.springframework.scheduling.annotation.Scheduled;
  9 +import org.springframework.stereotype.Component;
  10 +
  11 +/**
  12 + * Created by wangnan on 2017/4/10.
  13 + */
  14 +@Component
  15 +public class ProductHeatValueJob {
  16 + private static final Logger logger = LoggerFactory.getLogger(ProductHeatValueJob.class);
  17 +
  18 + private ProductIndexHeatValueUpdateFlow productIndexHeatValueUpdateFlow;
  19 +
  20 + /**
  21 + * 每隔半个小时
  22 + */
  23 + @Scheduled(cron = "0 0/30 * * * ?")
  24 + public void doUpdateProductIndexHeatValue(){
  25 + long begin = System.currentTimeMillis();
  26 + RetryBusinessFlowExecutor flowExecutor = new RetryBusinessFlowExecutor(productIndexHeatValueUpdateFlow, ISearchConstants.SEARCH_INDEX_BATCH_MAX_THREAD_SIZE, ISearchConstants.SEARCH_INDEX_BATCH_LIMIT);
  27 + boolean result = flowExecutor.execute();
  28 + logger.info("ProductHeatValueJob.doUpdateProductIndexHeatValue end----[result={}][cost={}]", result, System.currentTimeMillis() - begin);
  29 + }
  30 +
  31 + /**
  32 + * 每天1时
  33 + */
  34 + @Scheduled(cron = "0 0 1 * * ?")
  35 + public void clean(){
  36 +
  37 + }
  38 +
  39 +}
1 package com.yoho.search.consumer.restapi; 1 package com.yoho.search.consumer.restapi;
2 2
  3 +import com.yoho.search.base.utils.ISearchConstants;
  4 +import com.yoho.search.consumer.index.increment.flow.ProductIndexHeatValueUpdateFlow;
  5 +import com.yoho.search.consumer.job.ProductHeatValueJob;
3 import com.yoho.search.consumer.service.logic.tbl.TblBrandRelationLogicService; 6 import com.yoho.search.consumer.service.logic.tbl.TblBrandRelationLogicService;
4 import com.yoho.search.consumer.service.logic.tbl.TblSortRelationLogicService; 7 import com.yoho.search.consumer.service.logic.tbl.TblSortRelationLogicService;
  8 +import com.yoho.search.consumer.suggests.common.RetryBusinessFlowExecutor;
  9 +import org.slf4j.Logger;
  10 +import org.slf4j.LoggerFactory;
5 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.stereotype.Controller; 12 import org.springframework.stereotype.Controller;
7 import org.springframework.web.bind.annotation.RequestMapping; 13 import org.springframework.web.bind.annotation.RequestMapping;
@@ -15,11 +21,37 @@ import java.util.Map; @@ -15,11 +21,37 @@ import java.util.Map;
15 */ 21 */
16 @Controller 22 @Controller
17 public class TblRelationController { 23 public class TblRelationController {
  24 +
  25 + private static final Logger logger = LoggerFactory.getLogger(ProductHeatValueJob.class);
  26 +
  27 +
18 @Autowired 28 @Autowired
19 private TblSortRelationLogicService tblSortRelationLogicService; 29 private TblSortRelationLogicService tblSortRelationLogicService;
20 @Autowired 30 @Autowired
21 private TblBrandRelationLogicService tblBrandRelationLogicService; 31 private TblBrandRelationLogicService tblBrandRelationLogicService;
22 32
  33 + private ProductIndexHeatValueUpdateFlow productIndexHeatValueUpdateFlow;
  34 +
  35 + @RequestMapping(value = "/doUpdateProductIndexHeatValue")
  36 + @ResponseBody
  37 + public Map<String, Object> doUpdateProductIndexHeatValue() {
  38 + Map<String, Object> rt = new HashMap<String, Object>();
  39 + try {
  40 + long begin = System.currentTimeMillis();
  41 + RetryBusinessFlowExecutor flowExecutor = new RetryBusinessFlowExecutor(productIndexHeatValueUpdateFlow, ISearchConstants.SEARCH_INDEX_BATCH_MAX_THREAD_SIZE, ISearchConstants.SEARCH_INDEX_BATCH_LIMIT);
  42 + boolean result = flowExecutor.execute();
  43 + logger.info("ProductHeatValueJob.doUpdateProductIndexHeatValue end----[result={}][cost={}]", result, System.currentTimeMillis() - begin);
  44 + rt.put("code", 200);
  45 + rt.put("message", "success");
  46 + } catch (Exception e) {
  47 + e.printStackTrace();
  48 + rt.put("code", 400);
  49 + rt.put("message", e.getMessage());
  50 + }
  51 + return rt;
  52 + }
  53 +
  54 +
23 @RequestMapping(value = "/tblSortRelation") 55 @RequestMapping(value = "/tblSortRelation")
24 @ResponseBody 56 @ResponseBody
25 public Map<String, Object> loadSortRelationData() { 57 public Map<String, Object> loadSortRelationData() {
@@ -55,4 +55,6 @@ public class ProductService { @@ -55,4 +55,6 @@ public class ProductService {
55 public Product getBySkn(Integer skn){return productMapper.selectBySkn(skn); } 55 public Product getBySkn(Integer skn){return productMapper.selectBySkn(skn); }
56 56
57 public List<Product> selectListByIds(List<Integer> ProductIds){return productMapper.selectListByIds(ProductIds);} 57 public List<Product> selectListByIds(List<Integer> ProductIds){return productMapper.selectListByIds(ProductIds);}
  58 +
  59 + public List<Product> selectListSkns(List<Integer> ProductSkns){return productMapper.selectBySkns(ProductSkns);}
58 } 60 }
@@ -22,7 +22,7 @@ public class HeatValueBuilder implements ViewBuilder { @@ -22,7 +22,7 @@ public class HeatValueBuilder implements ViewBuilder {
22 22
23 @Override 23 @Override
24 public void build(List<ProductIndexBO> productIndexBOs, List<Integer> ids, List<Integer> sknList) { 24 public void build(List<ProductIndexBO> productIndexBOs, List<Integer> ids, List<Integer> sknList) {
25 - List<ProductHeatValue> productHeatValueList = productHeatValueMapper.selectHeatValueOfLatestDate(sknList, productHeatValueMapper.selectLatestDateId()); 25 + List<ProductHeatValue> productHeatValueList = productHeatValueMapper.selectHeatValueOfLatestDateBySkn(sknList, productHeatValueMapper.selectLatestDateId());
26 Map<Integer, BigDecimal> productHeatValueMap = productHeatValueList.parallelStream().collect(Collectors.toMap(ProductHeatValue::getProductSkn, ProductHeatValue::getHeatValue)); 26 Map<Integer, BigDecimal> productHeatValueMap = productHeatValueList.parallelStream().collect(Collectors.toMap(ProductHeatValue::getProductSkn, ProductHeatValue::getHeatValue));
27 productIndexBOs.stream().forEach((p) -> {p.setHeatValue(productHeatValueMap.get(p.getProductSkn()));}); 27 productIndexBOs.stream().forEach((p) -> {p.setHeatValue(productHeatValueMap.get(p.getProductSkn()));});
28 } 28 }