Authored by hugufei

列表中的页面元素聚合使用异步的方式

1 package com.yoho.search.common.cache; 1 package com.yoho.search.common.cache;
2 2
3 -import java.util.Map;  
4 -import java.util.concurrent.ConcurrentHashMap;  
5 -  
6 -import org.springframework.beans.factory.annotation.Autowired;  
7 -import org.springframework.stereotype.Service;  
8 -  
9 import com.yoho.search.common.cache.aop.SearchCacheAble; 3 import com.yoho.search.common.cache.aop.SearchCacheAble;
10 import com.yoho.search.common.cache.impls.CacheInterface; 4 import com.yoho.search.common.cache.impls.CacheInterface;
11 import com.yoho.search.common.cache.impls.EhCache; 5 import com.yoho.search.common.cache.impls.EhCache;
12 import com.yoho.search.common.cache.impls.SearchRedis; 6 import com.yoho.search.common.cache.impls.SearchRedis;
13 import com.yoho.search.common.cache.impls.YohoRedis; 7 import com.yoho.search.common.cache.impls.YohoRedis;
14 import com.yoho.search.common.cache.model.SearchCache; 8 import com.yoho.search.common.cache.model.SearchCache;
  9 +import org.springframework.beans.factory.annotation.Autowired;
  10 +import org.springframework.stereotype.Service;
  11 +
  12 +import java.util.Map;
  13 +import java.util.concurrent.ConcurrentHashMap;
15 14
16 @Service 15 @Service
17 public class SearchCacheFactory { 16 public class SearchCacheFactory {
@@ -56,17 +55,6 @@ public class SearchCacheFactory { @@ -56,17 +55,6 @@ public class SearchCacheFactory {
56 } 55 }
57 56
58 /** 57 /**
59 - * 获取聚合相关的缓存  
60 - *  
61 - * @return  
62 - */  
63 - public SearchCache getPagePersionalFactorCache() {  
64 - CacheType cacheType = CacheType.EHCACHE;  
65 - int cacheInMinute = 60;  
66 - return this.getOrCreateSearchCache("PAGE_PERSIONAL_FACTOR", cacheType, cacheInMinute);  
67 - }  
68 -  
69 - /**  
70 * 获取默认的搜索缓存 58 * 获取默认的搜索缓存
71 * 59 *
72 * @return 60 * @return
  1 +package com.yoho.search.recall.scene.beans.persional;
  2 +
  3 +import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
  4 +import com.yoho.search.common.cache.impls.EhCache;
  5 +import com.yoho.search.common.cache.model.CacheObject;
  6 +import com.yoho.search.recall.scene.models.common.ParamQueryFilter;
  7 +import org.slf4j.Logger;
  8 +import org.slf4j.LoggerFactory;
  9 +import org.springframework.beans.factory.annotation.Autowired;
  10 +
  11 +import javax.annotation.PostConstruct;
  12 +import java.util.concurrent.ConcurrentHashMap;
  13 +import java.util.concurrent.ExecutorService;
  14 +import java.util.concurrent.Executors;
  15 +
  16 +public abstract class AbstractPageComponent {
  17 +
  18 + private static final Logger logger = LoggerFactory.getLogger(AbstractPageComponent.class);
  19 +
  20 + @Autowired
  21 + private EhCache ehCache;
  22 +
  23 + private ConcurrentHashMap<String, Integer> mapLock;
  24 + private ExecutorService executorService;
  25 +
  26 + @PostConstruct
  27 + void init() {
  28 + mapLock = new ConcurrentHashMap(20);//使用一个Map来限流
  29 + executorService = Executors.newFixedThreadPool(5);
  30 + }
  31 +
  32 + public Object queryWithCache(ParamQueryFilter paramQueryFilter) {
  33 + //1、生成RedisKeyBuilder
  34 + RedisKeyBuilder redisKeyBuilder = this.genRedisKeyBuilder(paramQueryFilter);
  35 + if (redisKeyBuilder == null) {
  36 + return null;
  37 + }
  38 + //2、缓存命中,则直接返回
  39 + CacheObject cacheObject = ehCache.get(redisKeyBuilder);
  40 + if (cacheObject != null) {
  41 + return cacheObject;
  42 + }
  43 + //3、限流判断以及重复请求预防
  44 + if (mapLock.size() > 20 || mapLock.putIfAbsent(redisKeyBuilder.getKey(), 1) != null) {
  45 + return null;
  46 + }
  47 + //4、异步执行查询并加入缓存
  48 + executorService.submit(() -> {
  49 + try {
  50 + Object queryResult = doRealQuery(paramQueryFilter);
  51 + CacheObject toCacheResult = new CacheObject(queryResult);
  52 + ehCache.addOrUpdate(redisKeyBuilder, toCacheResult, this.cacheTimeInSecond() / 60);
  53 + } catch (Exception e) {
  54 + logger.error(e.getMessage(), e);
  55 + } finally {
  56 + mapLock.remove(redisKeyBuilder.getKey());
  57 + }
  58 + });
  59 + return null;
  60 + }
  61 +
  62 + protected abstract RedisKeyBuilder genRedisKeyBuilder(ParamQueryFilter paramQueryFilter);
  63 +
  64 + protected abstract int cacheTimeInSecond();
  65 +
  66 + protected abstract Object doRealQuery(ParamQueryFilter paramQueryFilter);
  67 +
  68 +}
@@ -3,15 +3,12 @@ package com.yoho.search.recall.scene.beans.persional; @@ -3,15 +3,12 @@ package com.yoho.search.recall.scene.beans.persional;
3 import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder; 3 import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
4 import com.yoho.search.base.utils.ISearchConstants; 4 import com.yoho.search.base.utils.ISearchConstants;
5 import com.yoho.search.base.utils.ProductIndexEsField; 5 import com.yoho.search.base.utils.ProductIndexEsField;
6 -import com.yoho.search.common.cache.SearchCacheFactory;  
7 -import com.yoho.search.common.cache.model.SearchCache;  
8 import com.yoho.search.core.es.model.SearchParam; 6 import com.yoho.search.core.es.model.SearchParam;
9 import com.yoho.search.core.es.model.SearchResult; 7 import com.yoho.search.core.es.model.SearchResult;
  8 +import com.yoho.search.recall.scene.constants.CacheTimeConstants;
10 import com.yoho.search.recall.scene.models.common.ParamQueryFilter; 9 import com.yoho.search.recall.scene.models.common.ParamQueryFilter;
11 import com.yoho.search.recall.scene.models.personal.PagePersonalFactor; 10 import com.yoho.search.recall.scene.models.personal.PagePersonalFactor;
12 import com.yoho.search.recall.scene.models.personal.PageBrandSorts; 11 import com.yoho.search.recall.scene.models.personal.PageBrandSorts;
13 -import com.yoho.search.recall.scene.models.personal.PageSortPriceAreas;  
14 -import com.yoho.search.service.base.SearchCacheService;  
15 import com.yoho.search.service.base.SearchCommonService; 12 import com.yoho.search.service.base.SearchCommonService;
16 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; 13 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
17 import org.elasticsearch.search.aggregations.Aggregation; 14 import org.elasticsearch.search.aggregations.Aggregation;
@@ -21,33 +18,36 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilde @@ -21,33 +18,36 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilde
21 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.beans.factory.annotation.Autowired;
22 import org.springframework.stereotype.Component; 19 import org.springframework.stereotype.Component;
23 20
24 -import javax.annotation.PostConstruct;  
25 import java.util.*; 21 import java.util.*;
26 22
27 @Component 23 @Component
28 -public class PagePersionalFactorComponent { 24 +public class PagePersionalFactorComponent extends AbstractPageComponent {
29 25
30 @Autowired 26 @Autowired
31 private SearchCommonService searchCommonService; 27 private SearchCommonService searchCommonService;
32 - @Autowired  
33 - private SearchCacheService searchCacheService;  
34 - @Autowired  
35 - private SearchCacheFactory searchCacheFactory;  
36 -  
37 - private SearchCache searchCache;  
38 -  
39 - @PostConstruct  
40 - void init(){  
41 - searchCache = searchCacheFactory.getPagePersionalFactorCache();  
42 - }  
43 28
44 /** 29 /**
45 - * 获取链接中的个性化因子-品牌+品类  
46 - * 30 + * 查询个性化因子
47 * @param paramQueryFilter 31 * @param paramQueryFilter
48 * @return 32 * @return
49 */ 33 */
50 public PagePersonalFactor queryPagePersionalFactor(ParamQueryFilter paramQueryFilter) { 34 public PagePersonalFactor queryPagePersionalFactor(ParamQueryFilter paramQueryFilter) {
  35 + Object value = super.queryWithCache(paramQueryFilter);
  36 + return value==null?null:(PagePersonalFactor)value;
  37 + }
  38 +
  39 + @Override
  40 + protected RedisKeyBuilder genRedisKeyBuilder(ParamQueryFilter paramQueryFilter) {
  41 + return RedisKeyBuilder.newInstance().appendFixed("YOHOSEARCH:").appendFixed("PAGE_FACTOR:").appendVar(paramQueryFilter.getParamMd5Key());
  42 + }
  43 +
  44 + @Override
  45 + protected int cacheTimeInSecond() {
  46 + return CacheTimeConstants.PAGE_PERSIONAL_FACTOR;
  47 + }
  48 +
  49 + @Override
  50 + protected Object doRealQuery(ParamQueryFilter paramQueryFilter) {
51 //1、构造参数 51 //1、构造参数
52 SearchParam searchParam = new SearchParam(); 52 SearchParam searchParam = new SearchParam();
53 searchParam.setQuery(paramQueryFilter.getParamQuery()); 53 searchParam.setQuery(paramQueryFilter.getParamQuery());
@@ -59,28 +59,17 @@ public class PagePersionalFactorComponent { @@ -59,28 +59,17 @@ public class PagePersionalFactorComponent {
59 aggregationBuilders.add(brandSortAggBuilder());//品类-品牌聚合 59 aggregationBuilders.add(brandSortAggBuilder());//品类-品牌聚合
60 searchParam.setAggregationBuilders(aggregationBuilders); 60 searchParam.setAggregationBuilders(aggregationBuilders);
61 61
62 - //3、缓存中获取  
63 - RedisKeyBuilder redisKeyBuilder = searchCacheService.genSearchParamString(ISearchConstants.INDEX_NAME_PRODUCT_INDEX,searchParam);  
64 - PagePersonalFactor pagePagePersonalFactor = searchCacheService.getSerializableObjectFromCache(searchCache,redisKeyBuilder,PagePersonalFactor.class,true);  
65 - if(pagePagePersonalFactor !=null) {  
66 - return pagePagePersonalFactor;  
67 - }  
68 -  
69 - //4、执行查询 62 + //3、执行查询
70 SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParam); 63 SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParam);
71 64
72 - //5、构造结果 65 + //4、构造结果
73 Map<String, Aggregation> aggregationMap = searchResult.getAggMaps(); 66 Map<String, Aggregation> aggregationMap = searchResult.getAggMaps();
74 List<PageBrandSorts> sortBrands = this.getBrandSortsFromAggregationMap(aggregationMap); 67 List<PageBrandSorts> sortBrands = this.getBrandSortsFromAggregationMap(aggregationMap);
75 -  
76 - pagePagePersonalFactor = new PagePersonalFactor(sortBrands);  
77 - //6、加入缓存  
78 - searchCacheService.addSerializableObjectToCache(searchCache,redisKeyBuilder, pagePagePersonalFactor,true);  
79 - return pagePagePersonalFactor; 68 + return new PagePersonalFactor(sortBrands);
80 } 69 }
81 70
82 /** 71 /**
83 - * 品类-品牌聚合 72 + * 品类+品牌聚合
84 * 73 *
85 * @return 74 * @return
86 */ 75 */
1 package com.yoho.search.recall.scene.beans.persional; 1 package com.yoho.search.recall.scene.beans.persional;
2 2
3 -import com.google.common.hash.BloomFilter;  
4 -import com.google.common.hash.Funnels;  
5 import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder; 3 import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
6 -import com.yoho.search.common.cache.impls.EhCache;  
7 -import com.yoho.search.common.cache.model.CacheObject; 4 +import com.yoho.search.base.utils.ISearchConstants;
  5 +import com.yoho.search.base.utils.ProductIndexEsField;
  6 +import com.yoho.search.core.es.model.SearchParam;
  7 +import com.yoho.search.core.es.model.SearchResult;
  8 +import com.yoho.search.recall.scene.constants.CacheTimeConstants;
8 import com.yoho.search.recall.scene.models.common.ParamQueryFilter; 9 import com.yoho.search.recall.scene.models.common.ParamQueryFilter;
9 import com.yoho.search.recall.scene.models.personal.PageSknBitSet; 10 import com.yoho.search.recall.scene.models.personal.PageSknBitSet;
10 -import org.apache.lucene.util.RamUsageEstimator; 11 +import com.yoho.search.service.base.SearchCommonService;
  12 +import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
  13 +import org.elasticsearch.search.aggregations.Aggregation;
  14 +import org.elasticsearch.search.aggregations.AggregationBuilders;
  15 +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
  16 +import org.elasticsearch.search.aggregations.bucket.terms.Terms;
11 import org.springframework.beans.factory.annotation.Autowired; 17 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.stereotype.Component; 18 import org.springframework.stereotype.Component;
13 19
14 -import java.nio.charset.Charset; 20 +import java.util.ArrayList;
  21 +import java.util.Iterator;
  22 +import java.util.List;
  23 +import java.util.Map;
15 24
16 @Component 25 @Component
17 -public class PageSknBitSetComponent { 26 +public class PageSknBitSetComponent extends AbstractPageComponent {
18 27
19 @Autowired 28 @Autowired
20 - private EhCache ehCache; 29 + private SearchCommonService searchCommonService;
21 30
22 /** 31 /**
23 - * 获取页面上的skn列表 32 + * 获取页面上的skn的bitset
24 * 33 *
25 * @param paramQueryFilter 34 * @param paramQueryFilter
26 * @return 35 * @return
27 */ 36 */
28 public PageSknBitSet queryPageSknBitSet(ParamQueryFilter paramQueryFilter) { 37 public PageSknBitSet queryPageSknBitSet(ParamQueryFilter paramQueryFilter) {
29 - RedisKeyBuilder redisKeyBuilder = RedisKeyBuilder.newInstance();  
30 - redisKeyBuilder.appendFixed("YOHOSEARCH:").appendFixed("PAGESKN").appendVar(paramQueryFilter.getParamMd5Key());  
31 - CacheObject cacheObject = ehCache.get(redisKeyBuilder);  
32 - if(cacheObject!=null){  
33 - return (PageSknBitSet)cacheObject.toObject();  
34 - }  
35 - PageSknBitSet pageSknBitSet = new PageSknBitSet();  
36 - cacheObject = new CacheObject(pageSknBitSet);  
37 - ehCache.addOrUpdate(redisKeyBuilder,cacheObject,30); 38 + Object value = super.queryWithCache(paramQueryFilter);
  39 + return value==null?null:(PageSknBitSet)value;
  40 + }
  41 +
  42 + @Override
  43 + protected RedisKeyBuilder genRedisKeyBuilder(ParamQueryFilter paramQueryFilter) {
  44 + return RedisKeyBuilder.newInstance().appendFixed("YOHOSEARCH:PAGE_SKN_BITSET:").appendVar(paramQueryFilter.getParamMd5Key());
  45 + }
  46 +
  47 + @Override
  48 + protected int cacheTimeInSecond() {
  49 + return CacheTimeConstants.PAGE_SKN_BITSET;
  50 + }
  51 +
  52 + @Override
  53 + protected Object doRealQuery(ParamQueryFilter paramQueryFilter) {
  54 + //1、构造请求参数
  55 + SearchParam searchParam = new SearchParam();
  56 + searchParam.setQuery(paramQueryFilter.getParamQuery());
  57 + searchParam.setFiter(paramQueryFilter.getParamFilter());
  58 + searchParam.setSize(0);
  59 +
  60 + //2、构造聚合参数
  61 + List<AbstractAggregationBuilder<?>> aggregationBuilders = new ArrayList<>();
  62 + aggregationBuilders.add(AggregationBuilders.terms("productIdAgg").field(ProductIndexEsField.productId).size(10000).order(Terms.Order.term(false)));//品类-品牌聚合
  63 + searchParam.setAggregationBuilders(aggregationBuilders);
  64 +
  65 + //3、执行查询
  66 + SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParam);
  67 +
  68 + //4、构造结果
  69 + Map<String, Aggregation> aggregationMap = searchResult.getAggMaps();
  70 + PageSknBitSet pageSknBitSet = this.getPageSknBitSetFromAggregationMap(aggregationMap,"productIdAgg");
38 return pageSknBitSet; 71 return pageSknBitSet;
39 } 72 }
40 73
41 - public static void main(String[] args) {  
42 - Charset charset = Charset.forName("utf-8");  
43 - BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(charset),2<<21);//指定bloomFilter的容量  
44 - for(int i =0;i<100000;i++){  
45 - bloomFilter.put(i+""); 74 + private PageSknBitSet getPageSknBitSetFromAggregationMap(Map<String, Aggregation> aggregationMap,String firstAggName){
  75 + if(!aggregationMap.containsKey(firstAggName)){
  76 + return null;
  77 + }
  78 + List<Integer> productIdList = new ArrayList<Integer>();
  79 + MultiBucketsAggregation firstAggregation = (MultiBucketsAggregation) aggregationMap.get(firstAggName);
  80 + Iterator<? extends MultiBucketsAggregation.Bucket> firstAggregationIterator = firstAggregation.getBuckets().iterator();
  81 + while (firstAggregationIterator.hasNext()) {
  82 + MultiBucketsAggregation.Bucket bucket = firstAggregationIterator.next();
  83 + Integer value = Integer.valueOf(bucket.getKeyAsString());
  84 + productIdList.add(value);
46 } 85 }
47 - System.out.println(bloomFilter.mightContain("1000000")); 86 + PageSknBitSet pageSknBitSet = new PageSknBitSet();
  87 + for (Integer productId: productIdList) {
  88 + pageSknBitSet.add(productId);
  89 + }
  90 + return pageSknBitSet;
48 } 91 }
49 92
  93 +
50 } 94 }
@@ -2,21 +2,21 @@ package com.yoho.search.recall.scene.beans.strategy; @@ -2,21 +2,21 @@ package com.yoho.search.recall.scene.beans.strategy;
2 2
3 public enum StrategyEnum { 3 public enum StrategyEnum {
4 4
5 - FIRST_SKN(110),  
6 - DIRECT_TRAIN(109), 5 + FIRST_SKN(110),//配置的skn
  6 + DIRECT_TRAIN(109),//直通车
7 7
8 - RECOMMEND_SKN(99), 8 + RECOMMEND_SKN(99),//推荐的skn,线上还没有
9 9
10 - SORT_BRAND_HEAT_VALUE(31),  
11 - SORT_BRAND_REDUCE_PRICE(32),  
12 - SORT_BRAND_PROMOTION(33),  
13 - SORT_BRAND_NEW(34), 10 + SORT_BRAND_HEAT_VALUE(31),//品牌+品类的人气值
  11 + SORT_BRAND_REDUCE_PRICE(32),//品牌+品类的最新降价
  12 + SORT_BRAND_PROMOTION(33),//品牌+品类的新开促销
  13 + SORT_BRAND_NEW(34),//品牌+品类的新品
14 14
15 - ADD_FLOW(12),  
16 - NEW_SHOP(11), 15 + ADD_FLOW(12),//流量补偿
  16 + NEW_SHOP(11),//新开店铺
17 17
18 - COMMON(1),  
19 - DEFAULT(0); 18 + COMMON(1),//整个页面的人气兜底
  19 + DEFAULT(0);//其他,无视即可
20 20
21 private Integer priority; 21 private Integer priority;
22 22
@@ -6,20 +6,25 @@ package com.yoho.search.recall.scene.constants; @@ -6,20 +6,25 @@ package com.yoho.search.recall.scene.constants;
6 */ 6 */
7 public class CacheTimeConstants { 7 public class CacheTimeConstants {
8 8
  9 + //通用召回的缓存 - 10分钟
  10 + public static final int COMMON_RECALL_STRATEGY_CACHE_TIME = 10 * 60;
9 11
10 - //通用召回的缓存  
11 - public static final int COMMON_RECALL_STRATEGY_CACHE_TIME = 10 * 60;//兜底的缓存  
12 -  
13 - //品类+品牌的缓存 12 + //品类+品牌的缓存 - 60分钟
14 public static final int SORT_BRAND_RECALL_STRATEGY_CACHE_TIME = 60 * 60; 13 public static final int SORT_BRAND_RECALL_STRATEGY_CACHE_TIME = 60 * 60;
15 14
16 - //SKN的缓存  
17 - public static final int SKN_INFO = 15 * 60;  
18 -  
19 - //SKN向量的缓存 15 + //SKN向量的缓存-一个小时
20 public static final int SKN_VECTOR = 60 * 60; 16 public static final int SKN_VECTOR = 60 * 60;
21 17
22 - //用户召回结果的缓存  
23 - public static final int USER_RECALL_SKN_LIST = 5 * 60; 18 + //SKN信息的缓存 - 10分钟
  19 + public static final int SKN_INFO = 10 * 60;
  20 +
  21 + //用户召回结果的缓存-三分钟
  22 + public static final int USER_RECALL_SKN_LIST = 3 * 60;
  23 +
  24 + //页面skn的bitset缓存
  25 + public static final int PAGE_SKN_BITSET = 60 * 60;
  26 +
  27 + //页面个性化因子的缓存
  28 + public static final int PAGE_PERSIONAL_FACTOR = 60 * 60;
24 29
25 } 30 }
@@ -5,21 +5,23 @@ import java.util.BitSet; @@ -5,21 +5,23 @@ import java.util.BitSet;
5 5
6 public class PageSknBitSet implements Serializable{ 6 public class PageSknBitSet implements Serializable{
7 7
  8 + public static final int maxValue = 2000000;
  9 +
8 private static final long serialVersionUID = 7185024266096124078L; 10 private static final long serialVersionUID = 7185024266096124078L;
9 private BitSet sknBitSet; 11 private BitSet sknBitSet;
10 12
11 - public BitSet getSknBitSet() {  
12 - return sknBitSet; 13 + public PageSknBitSet(){
  14 + this.sknBitSet = new BitSet(maxValue);
13 } 15 }
14 16
15 - public void setSknBitSet(BitSet sknBitSet) {  
16 - this.sknBitSet = sknBitSet; 17 + public void add(int bitSetIndex) {
  18 + if(bitSetIndex>maxValue){
  19 + return;
  20 + }
  21 + this.sknBitSet.set(bitSetIndex);
17 } 22 }
18 23
19 public boolean exist(int bitSetIndex) { 24 public boolean exist(int bitSetIndex) {
20 - if(this.sknBitSet==null){  
21 - return false;  
22 - }  
23 return sknBitSet.get(bitSetIndex); 25 return sknBitSet.get(bitSetIndex);
24 } 26 }
25 } 27 }