Authored by Gino Zhang

增加一个定时任务 定时检查大数据是否推送完新的个性化数据 即时更新到索引中

... ... @@ -2,7 +2,7 @@ package com.yoho.search.consumer.index.common.impl;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.common.IYohoIndexRebuildListener;
import com.yoho.search.consumer.service.logic.tools.ProductVectorFeatureLogicService;
import com.yoho.search.consumer.service.logic.ProductVectorFeatureLogicService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ...
package com.yoho.search.consumer.index.increment.rule;
package com.yoho.search.consumer.index.increment;
import com.alibaba.fastjson.JSONArray;
import com.yoho.search.consumer.index.increment.AbstractMqListener;
import com.yoho.search.consumer.service.base.GoodsService;
import com.yoho.search.consumer.service.base.SizeService;
import com.yoho.search.consumer.service.base.StorageService;
... ...
... ... @@ -8,7 +8,6 @@ import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.common.CostStatistics;
import com.yoho.search.consumer.index.increment.bulks.StorageSkuIndexBulkService;
import com.yoho.search.consumer.index.increment.rule.AbstractStorageRelatedMqListener;
import com.yoho.search.consumer.service.base.GoodsService;
import com.yoho.search.consumer.service.base.ProductService;
import com.yoho.search.consumer.service.bo.StorageSkuBO;
... ...
... ... @@ -23,7 +23,6 @@ import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.common.CostStatistics;
import com.yoho.search.consumer.index.increment.bulks.StorageSkuIndexBulkService;
import com.yoho.search.consumer.index.increment.rule.AbstractStorageRelatedMqListener;
import com.yoho.search.consumer.service.base.ProductService;
import com.yoho.search.consumer.service.base.StorageService;
import com.yoho.search.consumer.service.logic.productIndex.StorageUpdateTimeLogicService;
... ...
package com.yoho.search.consumer.index.increment.flow;
import com.alibaba.fastjson.JSONObject;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
import com.yoho.search.consumer.service.base.ProductIndexService;
import com.yoho.search.consumer.service.base.ProductService;
import com.yoho.search.consumer.service.base.ProductVectorFeatureService;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.logic.ProductVectorFeatureLogicService;
import com.yoho.search.consumer.suggests.common.RetryBusinessFlow;
import com.yoho.search.core.es.IElasticsearchClient;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.dal.model.Product;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.bulk.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Created by ginozhang on 2017/3/17.
 *
 * Paged, retryable flow that reads the personalised feature vector of every product
 * and bulk-writes it into the {@code ISearchConstants.INDEX_NAME_PRODUCT_INDEX} index.
 * The flow is driven through the {@link RetryBusinessFlow} callbacks:
 * {@link #init()}, {@link #getTotalCount()}, {@link #doBusiness(int, int)} and
 * {@link #finish(boolean, Exception)}.
 */
@Component
public class ProductIndexFeatureVectorUpdateFlow implements RetryBusinessFlow {

    private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");

    /**
     * Minimum number of feature rows (exclusive) that must exist for the selected date
     * before the index is touched; at or below this count the data set is treated as
     * possibly incomplete.
     */
    private static final int VALID_COUNT_LIMIT = 1000;

    @Autowired
    private ProductVectorFeatureService productVectorFeatureService;

    @Autowired
    private ProductVectorFeatureLogicService productVectorFeatureLogicService;

    @Autowired
    private ProductService productService;

    @Autowired
    private ProductIndexService productIndexService;

    @Autowired
    private IYohoIndexService yohoIndexService;

    @Autowired
    private RebuildFlagService rebuildFlagService;

    /** Feature-data date selected by {@link #init()}; {@code null} until then or when none is available. */
    private volatile String versionDate = null;

    /**
     * @return the simple class name, used as the flow identifier in log lines
     */
    @Override
    public String flowName() {
        return this.getClass().getSimpleName();
    }

    /**
     * Waits for any running index rebuild, marks a build as in progress, then asks the
     * logic service to recompute the feature-data date and caches it in {@link #versionDate}.
     */
    @Override
    public void init() {
        rebuildFlagService.waitingRebuildingIndex();
        rebuildFlagService.updateIsBuildingTrue();
        productVectorFeatureLogicService.updateGenerateDate();
        this.versionDate = productVectorFeatureLogicService.getGenerateDate();
    }

    /**
     * @return the number of products to page through, or {@code 0} when no date was
     *         selected or the feature data for that date looks incomplete
     */
    @Override
    public int getTotalCount() {
        if (StringUtils.isEmpty(this.versionDate)) {
            return 0;
        }

        // If no more than VALID_COUNT_LIMIT feature rows exist for this date the data may
        // be incomplete, so report nothing to do and leave the index untouched.
        int total = productVectorFeatureService.selectCount(this.versionDate);
        if (total <= VALID_COUNT_LIMIT) {
            return 0;
        }

        return productService.count();
    }

    /**
     * Processes one page of products: looks up their feature vectors by SKN and
     * bulk-writes the vectors to the product index.
     *
     * @param pageNo    1-based page number
     * @param batchSize number of products per page
     * @return always {@code true}; failures are reported by exception
     * @throws RuntimeException when the bulk response reports any failure
     */
    @Override
    public boolean doBusiness(int pageNo, int batchSize) {
        int start = (pageNo - 1) * batchSize;
        List<Product> productList = productService.getPageLists(start, batchSize);
        if (CollectionUtils.isEmpty(productList)) {
            return true;
        }

        List<Integer> sknList = productList.stream().map(Product::getErpProductId).collect(Collectors.toList());
        // Keep the first product id when the same SKN appears more than once in a page.
        // Without a merge function toMap throws IllegalStateException on a duplicate key
        // and the whole page (and with it the flow) would fail.
        Map<Integer, Integer> skn2IDMap = productList.stream()
                .collect(Collectors.toMap(Product::getErpProductId, Product::getId, (first, duplicate) -> first));
        Map<Integer, String> productVectorFeatureMapBaseSkn = productVectorFeatureLogicService.queryProductVectorFeatureMap(sknList);
        logger.info("[{} business][pageNo={}][productVectorFeatureMapBaseSknSize={}]", flowName(), pageNo, productVectorFeatureMapBaseSkn.size());

        List<ProductIndexBO> dataList = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : productVectorFeatureMapBaseSkn.entrySet()) {
            Integer productId = skn2IDMap.get(entry.getKey());
            if (productId == null) {
                // The vector belongs to an SKN that is not part of this page.
                continue;
            }
            ProductIndexBO temp = new ProductIndexBO();
            temp.setId(productId);
            temp.setProductFeatureFactor(entry.getValue());
            dataList.add(temp);
        }

        if (CollectionUtils.isEmpty(dataList)) {
            return true;
        }

        final String indexName = ISearchConstants.INDEX_NAME_PRODUCT_INDEX;
        List<ESBluk> bluks = new ArrayList<>(dataList.size());
        for (ProductIndexBO ob : dataList) {
            bluks.add(new ESBluk(JSONObject.toJSONString(productIndexService.beanToMap(ob)), ob.getId().toString(), indexName, indexName, false));
        }

        IElasticsearchClient client = yohoIndexService.getIndex(indexName).getIndexClient();
        BulkResponse bulkResponse = client.bulk(bluks);
        if (bulkResponse.hasFailures()) {
            throw new RuntimeException(String.format("bulk has failure,[yohoIndexName=[%s]],[pageNo=%s],[failureMessage=%s]", indexName, pageNo,
                    bulkResponse.buildFailureMessage()));
        }

        logger.info("[{} business][pageNo={}][message=bulk succeed]", flowName(), pageNo);
        return true;
    }

    /**
     * Clears the "building" flag and, when the flow ended without an exception,
     * publishes the feature-data date through the logic service.
     *
     * @param doBusinessResult overall result reported by the executor
     * @param exception        the failure that ended the flow, or {@code null}
     */
    @Override
    public void finish(boolean doBusinessResult, Exception exception) {
        logger.info("[{} business][doBusinessResult={}][exception={}]", flowName(), doBusinessResult, exception);
        rebuildFlagService.updateIsBuildingFalse();
        // NOTE(review): publishGenerateDate() appears to be what releases the logic service's
        // "rebuilding" guard taken by updateGenerateDate(); it is skipped on the failure path,
        // so confirm the guard is released elsewhere or later runs may be rejected.
        if (exception == null) {
            productVectorFeatureLogicService.publishGenerateDate();
        }
    }
}
... ...
... ... @@ -2,8 +2,11 @@ package com.yoho.search.consumer.job;
import com.yoho.search.base.utils.DateStyle;
import com.yoho.search.base.utils.DateUtil;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.index.increment.flow.ProductIndexFeatureVectorUpdateFlow;
import com.yoho.search.consumer.service.base.ProductVectorFeatureService;
import com.yoho.search.consumer.service.logic.tools.ProductVectorFeatureLogicService;
import com.yoho.search.consumer.service.logic.ProductVectorFeatureLogicService;
import com.yoho.search.consumer.suggests.common.RetryBusinessFlowExecutor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
... ... @@ -20,9 +23,9 @@ import java.util.List;
* Created by ginozhang on 2017/1/5.
*/
@Component
public class CleanExpiredProductVectorFeatureJob {
public class ProductVectorFeatureJob {
private static final Logger logger = LoggerFactory.getLogger(CleanExpiredProductVectorFeatureJob.class);
private static final Logger logger = LoggerFactory.getLogger("INDEX_REBULDER");
private static final int RESERVED_PRODUCT_FEATURE_DAYS = 2;
... ... @@ -36,11 +39,15 @@ public class CleanExpiredProductVectorFeatureJob {
@Autowired
private ProductVectorFeatureService productVectorFeatureService;
@Autowired
private ProductIndexFeatureVectorUpdateFlow productIndexFeatureVectorUpdateFlow;
@Scheduled(cron = "0 45 1 * * ?")
public void execute() {
public void clean() {
try {
// 每天清理过期的商品特征数据
long begin = System.currentTimeMillis();
logger.info("CleanExpiredProductVectorFeatureJob execute start----[begin={}]", begin);
logger.info("ProductVectorFeatureJob.clean start----[begin={}]", begin);
String generateDateStr = productVectorFeatureLogicService.getGenerateDate();
if (StringUtils.isEmpty(generateDateStr)) {
// 默认取前一天
... ... @@ -59,15 +66,15 @@ public class CleanExpiredProductVectorFeatureJob {
cleanRecordByDate(cleanDateStr);
}
logger.info("CleanExpiredProductVectorFeatureJob execute end----[cost={}][generateDate={}]", System.currentTimeMillis() - begin, generateDateStr);
logger.info("ProductVectorFeatureJob.clean end----[cost={}][generateDate={}]", System.currentTimeMillis() - begin, generateDateStr);
} catch (Exception e) {
logger.error("CleanExpiredProductVectorFeatureJob execute failed!", e);
logger.error("ProductVectorFeatureJob.clean failed!", e);
}
}
private void cleanRecordByDate(String clearDateStr) {
long begin = System.currentTimeMillis();
logger.info("CleanExpiredProductVectorFeatureJob cleanRecordByDate start----[clearDateStr={}][begin={}]", clearDateStr, begin);
logger.info("ProductVectorFeatureJob.cleanRecordByDate start----[clearDateStr={}][begin={}]", clearDateStr, begin);
List<Long> idList = productVectorFeatureService.selectByGenerateDate(clearDateStr, CLEAN_BATCH_COUNT);
while (CollectionUtils.isNotEmpty(idList)) {
... ... @@ -75,7 +82,26 @@ public class CleanExpiredProductVectorFeatureJob {
idList = productVectorFeatureService.selectByGenerateDate(clearDateStr, CLEAN_BATCH_COUNT);
}
logger.info("CleanExpiredProductVectorFeatureJob cleanRecordByDate end----[clearDateStr={}][cost={}]", clearDateStr, System.currentTimeMillis() - begin);
logger.info("ProductVectorFeatureJob.cleanRecordByDate end----[clearDateStr={}][cost={}]", clearDateStr, System.currentTimeMillis() - begin);
}
/**
 * Periodically checks the personalised feature vectors published by the big-data side
 * so that new data is picked up promptly.
 * Runs every 10 minutes between 04:05:00 and 07:55:00 each day.
 *
 * The update flow is only started when both the big-data date and the last generated
 * vector date are known and they differ; otherwise the method just logs and returns.
 */
@Scheduled(cron = "0 5/10 4-7 * * ?")
public void updateProductIndexVector() {
    final long startMillis = System.currentTimeMillis();
    final String bigDataDate = productVectorFeatureLogicService.getBigDataRecomDateStr();
    final String indexedDate = productVectorFeatureLogicService.getGenerateDate();
    logger.info("ProductVectorFeatureJob.updateProductIndexVector start----[bigDataRecomDateStr={}][lastGenerateVectorDate={}][begin={}]", bigDataDate, indexedDate, startMillis);

    // Only proceed when both dates are present and the big-data side has moved on.
    final boolean newDataAvailable = !StringUtils.isEmpty(bigDataDate)
            && !StringUtils.isEmpty(indexedDate)
            && !bigDataDate.equals(indexedDate);

    if (newDataAvailable) {
        RetryBusinessFlowExecutor executor = new RetryBusinessFlowExecutor(
                productIndexFeatureVectorUpdateFlow,
                ISearchConstants.SEARCH_INDEX_BATCH_MAX_THREAD_SIZE,
                ISearchConstants.SEARCH_INDEX_BATCH_LIMIT);
        boolean succeeded = executor.execute();
        logger.info("ProductVectorFeatureJob.updateProductIndexVector end----[result={}][cost={}]", succeeded, System.currentTimeMillis() - startMillis);
        return;
    }

    logger.info("ProductVectorFeatureJob.updateProductIndexVector end----[cost={}]", System.currentTimeMillis() - startMillis);
}
}
... ...
... ... @@ -9,7 +9,7 @@ import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.MD5Util;
import com.yoho.search.consumer.index.common.IYohoIndexService;
import com.yoho.search.consumer.index.rebuild.RebuildFlagService;
import com.yoho.search.consumer.job.CleanExpiredProductVectorFeatureJob;
import com.yoho.search.consumer.job.ProductVectorFeatureJob;
import com.yoho.search.consumer.job.IndexRebuildJob;
import com.yoho.search.consumer.job.SuggestionJob;
import com.yoho.search.consumer.service.base.ProductIndexService;
... ... @@ -69,7 +69,7 @@ public class IndexController implements ApplicationEventPublisherAware {
@Autowired
private SuggestionJob suggestionJob;
@Autowired
private CleanExpiredProductVectorFeatureJob cleanExpiredProductVectorFeatureJob;
private ProductVectorFeatureJob productVectorFeatureJob;
@Autowired
private SuggestWordDefService suggestWordDefService;
@Autowired
... ... @@ -338,7 +338,22 @@ public class IndexController implements ApplicationEventPublisherAware {
@ResponseBody
public Map<String, Object> cleanExpiredProductVectorFeature() {
try {
cleanExpiredProductVectorFeatureJob.execute();
productVectorFeatureJob.clean();
return getResultMap(200, "success");
} catch (Exception e) {
logger.error(e.getMessage(), e);
Map<String, Object> rtnMap = new HashMap<String, Object>();
rtnMap.put("code", 400);
rtnMap.put("msg", e.getMessage());
return rtnMap;
}
}
@RequestMapping(value = "/index/updateProductIndexVector")
@ResponseBody
public Map<String, Object> updateProductIndexVector() {
try {
productVectorFeatureJob.updateProductIndexVector();
return getResultMap(200, "success");
} catch (Exception e) {
logger.error(e.getMessage(), e);
... ...
package com.yoho.search.consumer.service.logic.tools;
package com.yoho.search.consumer.service.logic;
import com.yoho.search.base.utils.DateUtil;
import com.yoho.search.consumer.service.base.ProductVectorFeatureService;
import com.yoho.search.core.personalized.BigDataRedisOper;
import com.yoho.search.dal.model.ProductVectorFeature;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -15,6 +17,7 @@ import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
... ... @@ -34,10 +37,15 @@ public class ProductVectorFeatureLogicService {
@Autowired
private ProductVectorFeatureService productVectorFeatureService;
@Autowired
private BigDataRedisOper<?, ?> bigDataRedisOper;
@Resource(name = "curatorFramework")
private CuratorFramework client;
private String generateDate = null;
private volatile String generateDate = null;
private volatile AtomicBoolean rebuilding = new AtomicBoolean(false);
public Map<Integer, String> queryProductVectorFeatureMap(List<Integer> skns) {
Map<Integer, String> productVectorFeatureMapBaseSkn = new HashMap<>();
... ... @@ -60,11 +68,24 @@ public class ProductVectorFeatureLogicService {
* 重建索引开始之前先计算应该采用哪一天的个性化数据
*/
public synchronized void updateGenerateDate() {
if (!rebuilding.compareAndSet(false, true)) {
// 如果已经有再重建的 就不能再次发起
throw new RuntimeException("product index is rebuilding!");
}
// 1. 取大数据redis的recom_list_dataid 格式为yyyyMMdd
String dataid = getBigDataRecomDateStr();
if (StringUtils.isNotEmpty(dataid) && dataid.trim().length() == 8) {
this.generateDate = dataid;
INDEX_REBUILD_LOG.info("The date for product vector is {}.", dataid);
return;
}
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_MONTH, -1);
calendar.set(Calendar.HOUR_OF_DAY, 0);
// 1. 首先看今天是否生成了前一天的个性化数据
// 2. 首先看今天是否生成了前一天的个性化数据
String previousDayTime = DateUtil.DateToString(calendar.getTime(), DB_DATE_FORMAT);
if (productVectorFeatureService.selectCount(previousDayTime) > 0) {
this.generateDate = previousDayTime;
... ... @@ -72,7 +93,7 @@ public class ProductVectorFeatureLogicService {
return;
}
// 2. 如果没有,则支持继续往前一天检查
// 3. 如果没有,则支持继续往前一天检查
calendar.add(Calendar.DAY_OF_MONTH, -1);
previousDayTime = DateUtil.DateToString(calendar.getTime(), DB_DATE_FORMAT);
if (productVectorFeatureService.selectCount(previousDayTime) > 0) {
... ... @@ -81,9 +102,13 @@ public class ProductVectorFeatureLogicService {
return;
}
// 3. 都没有的话表示不支持个性化搜索
// 4. 都没有的话表示不支持个性化搜索
this.generateDate = null;
//INDEX_REBUILD_LOG.warn("The generate date for product vector is null.");
INDEX_REBUILD_LOG.warn("The generate date for product vector is null.");
}
/**
 * Reads the value stored under the {@code recom_list_dataid} key via the big-data Redis helper.
 *
 * @return the stored value as returned by the helper (presumably a {@code yyyyMMdd} date
 *         string; confirm against the producer)
 */
public String getBigDataRecomDateStr() {
    final String recomDataId = bigDataRedisOper.getValue("recom_list_dataid");
    return recomDataId;
}
/**
... ... @@ -105,11 +130,12 @@ public class ProductVectorFeatureLogicService {
}
} catch (Exception e) {
INDEX_REBUILD_LOG.error("publish personalized feature generated date " + publishedValue + " to zk failed!", e);
} finally {
rebuilding.set(false);
}
}
/**
 * Returns the feature-data date currently held by this service.
 *
 * @return the current {@code generateDate}, which may be {@code null}
 */
public synchronized String getGenerateDate() {
    final String currentGenerateDate = generateDate;
    return currentGenerateDate;
}
}
... ...
package com.yoho.search.consumer.service.logic.productIndex.viewBuilder;
import com.yoho.search.consumer.service.bo.ProductIndexBO;
import com.yoho.search.consumer.service.logic.tools.ProductVectorFeatureLogicService;
import com.yoho.search.consumer.service.logic.ProductVectorFeatureLogicService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ...