Authored by hugufei

使用CacheRequestResponseComponent重构召回时的缓存操作

Showing 14 changed files with 440 additions and 419 deletions
... ... @@ -9,8 +9,13 @@ import com.yoho.search.models.SearchApiResult;
import com.yoho.search.recall.scene.component.*;
import com.yoho.search.recall.scene.helper.SortBuilderHelper;
import com.yoho.search.recall.scene.models.*;
import com.yoho.search.recall.scene.models.CacheRecallRequestResponse;
import com.yoho.search.recall.scene.persional.PersionalFactor;
import com.yoho.search.recall.scene.persional.RecallPersionalService;
import com.yoho.search.recall.scene.request.BatchRequestsBuilder;
import com.yoho.search.recall.scene.request.BatchResponseBuilder;
import com.yoho.search.recall.scene.request.RecallParamsBuilder;
import com.yoho.search.recall.scene.request.RecallResultBuilder;
import com.yoho.search.service.base.SearchCommonService;
import com.yoho.search.service.base.index.ProductIndexBaseService;
import com.yoho.search.service.helper.SearchCommonHelper;
... ... @@ -51,7 +56,7 @@ public class SceneRecallService {
@Autowired
private SearchCommonService searchCommonService;
@Autowired
private BacthQueryBySknComponent bacthQueryBySknComponent;
private BacthSknInfoComponent bacthSknInfoComponent;
public SearchApiResult sceneRecall(Map<String, String> paramMap) {
try {
... ... @@ -94,7 +99,7 @@ public class SceneRecallService {
//1、构造请求
List<RecallRequest> batchRequests = batchRequestsBuilder.buildBatchRequests(param, persionalFactor);
//2、批量召回
List<RecallRequestResponse> requestResponses = bacthRecallComponent.batchRecallAndCache(batchRequests);
List<CacheRecallRequestResponse> requestResponses = bacthRecallComponent.batchRecallAndCache(batchRequests);
//3、获取skn列表[去重]
RecallResponseBatch recallResponseBatch = batchResponseBuilder.buildRecallResponseBatch(requestResponses);
//4、构造真实结果[排序,截取skn]
... ... @@ -106,7 +111,7 @@ public class SceneRecallService {
//1、
List<Integer> productSkns = recallResult.getSknList();
if (productSkns != null && !productSkns.isEmpty()) {
return bacthQueryBySknComponent.queryProductListBySkn(productSkns,productSkns.size());
return bacthSknInfoComponent.queryProductListBySkn(productSkns,productSkns.size());
}else{
return this.queryProductByFilterSkn(recallResult.getNotProductSkn(),recallResult.getRealPage(),recallParams.getPageSize());
}
... ...
package com.yoho.search.recall.scene.component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.ProductIndexEsField;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.common.cache.impls.EhCache;
import com.yoho.search.common.cache.impls.SearchRedis;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.models.CacheRequestResponse;
import com.yoho.search.service.base.SearchCommonService;
import com.yoho.search.service.base.index.ProductIndexBaseService;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class BacthQueryBySknComponent {
private static final Logger logger = LoggerFactory.getLogger(BacthQueryBySknComponent.class);
@Autowired
private SearchRedis searchRedis;
@Autowired
private EhCache ehCache;
@Autowired
private SearchCommonService searchCommonService;
@Autowired
private ProductIndexBaseService productIndexBaseService;
/**
* 按skn查询并按顺序返回
*
* @param productSkns
* @return
*/
public List<Map<String, Object>> queryProductListBySkn(List<Integer> productSkns,int size){
//1、构建请求
List<CacheRequestResponse<Integer, Map<String, Object>>> sknInfoRequestResponses = new ArrayList<>();
for (Integer productSkn : productSkns) {
sknInfoRequestResponses.add(new CacheRequestResponse(productSkn,redisKeyBuilderTransfer(),toValueTransfer(),fromValueTransfer()));
}
//2、批量从缓存中获取
sknInfoRequestResponses = this.batchQueryFromCache(sknInfoRequestResponses);
//3、获取未命中缓存的skn
List<Integer> notCacheSkns = new ArrayList<>();
for (CacheRequestResponse<Integer, Map<String, Object>> cacheRequestResponse : sknInfoRequestResponses) {
if(cacheRequestResponse.getValueObject()==null){
notCacheSkns.add(cacheRequestResponse.getKeyObject());
}
}
//4、执行批量查询
Map<Integer,Map<String, Object>> queryResults = this.batchQuery(notCacheSkns);
//5、填充查询结果
for (CacheRequestResponse<Integer, Map<String, Object>> cacheRequestResponse : sknInfoRequestResponses) {
if(cacheRequestResponse.getValueObject()==null){
cacheRequestResponse.setValueObject(queryResults.get(cacheRequestResponse.getKeyObject()));
cacheRequestResponse.setNeedRecache(true);
}
}
//6、将CacheRequestResponse中需要缓存的key加入缓存
this.batchAddToCache(sknInfoRequestResponses,10*60);//缓存10分钟
//7、构造返回结果
Map<Integer,Map<String, Object>> productInfoMap = new HashMap<>();
for (CacheRequestResponse<Integer, Map<String, Object>> sknInfoRequestResponse: sknInfoRequestResponses) {
productInfoMap.put(sknInfoRequestResponse.getKeyObject(),sknInfoRequestResponse.getValueObject());
}
List<Map<String, Object>> finalResults = new ArrayList<>();
for (Integer productSkn : productSkns) {
if(productInfoMap.get(productSkn)!=null){
finalResults.add(productInfoMap.get(productSkn));
}
if(finalResults.size()>=size){
break;
}
}
return finalResults;
}
private <K, V> List<CacheRequestResponse<K, V>> batchQueryFromCache(List<CacheRequestResponse<K, V>> cacheRequestResponses) {
try {
Collection<RedisKeyBuilder> keys = new ArrayList<>();
for (CacheRequestResponse<K, V> requestResponse : cacheRequestResponses) {
RedisKeyBuilder redisKeyBuilder = requestResponse.getRedisKeyBuilderTransfer().transfer(requestResponse.getKeyObject());
keys.add(redisKeyBuilder);
}
List<String> cachedValues = ehCache.mutiGet(keys);
//List<String> cachedValues = searchRedis.searchValueOperations.multiGet(keys);
for (int i = 0; i < cacheRequestResponses.size(); i++) {
CacheRequestResponse<K, V> requestResponse = cacheRequestResponses.get(i);
String cachedValue = cachedValues.get(i);
if (!StringUtils.isBlank(cachedValue)) {
requestResponse.setValueObject(requestResponse.getToValueTransfer().transfer(cachedValue));
requestResponse.setNeedRecache(false);
}
}
return cacheRequestResponses;
}catch (Exception e){
logger.error(e.getMessage(),e);
return cacheRequestResponses;
}
}
private <K, V> void batchAddToCache(List<CacheRequestResponse<K, V>> cacheRequestResponses,int timeOutInSecond){
try {
Map<RedisKeyBuilder, String> toCacheMap = new HashMap<>();
for (CacheRequestResponse<K, V> requestResponse : cacheRequestResponses) {
if(requestResponse.isNeedRecache() && requestResponse.getValueObject()!=null){
RedisKeyBuilder redisKeyBuilder = requestResponse.getRedisKeyBuilderTransfer().transfer(requestResponse.getKeyObject());
V value = requestResponse.getValueObject();
String cacheValue = requestResponse.getFromValueTransfer().transfer(value);
toCacheMap.put(redisKeyBuilder,cacheValue);
}
}
if(!toCacheMap.isEmpty()){
//searchRedis.searchRedisTemplate.mset(toCacheMap,timeOutInSecond);
ehCache.mutiSet(toCacheMap,timeOutInSecond);
}
}catch (Exception e){
logger.error(e.getMessage(),e);
}
}
private Transfer<Integer, RedisKeyBuilder> redisKeyBuilderTransfer() {
return new Transfer<Integer, RedisKeyBuilder>() {
@Override
public RedisKeyBuilder transfer(Integer productSkn) {
return RedisKeyBuilder.newInstance().appendFixed("YOHOSEARCH:").appendFixed("SKN:").appendVar(productSkn);
}
};
}
private Transfer<String, Map<String, Object>> toValueTransfer() {
return new Transfer<String, Map<String, Object>>() {
@Override
public Map<String, Object> transfer(String value) {
Map<String, Object> product = new HashMap<>();
product.putAll(JSONObject.parseObject(value));
return product;
}
};
}
private Transfer<Map<String, Object>, String> fromValueTransfer() {
return new Transfer<Map<String, Object>, String>() {
@Override
public String transfer( Map<String, Object> product) {
return JSON.toJSONString(product);
}
};
}
private Map<Integer,Map<String, Object>> batchQuery(List<Integer> productSkns) {
SearchParam searchParam = new SearchParam();
searchParam.setOffset(0);
searchParam.setSize(productSkns.size());
searchParam.setFiter(QueryBuilders.termsQuery(ProductIndexEsField.productSkn, productSkns));
searchParam.setIncludeFields(productIndexBaseService.getProductIndexIncludeFields());
SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParam);
List<Map<String, Object>> productList = productIndexBaseService.getProductListWithPricePlan(searchResult.getResultList());
Map<Integer,Map<String, Object>> results = new HashMap<>();
for (Map<String, Object> product: productList) {
results.put(MapUtils.getIntValue(product,"product_skn",0),product);
}
System.out.println("do batch query by skn");
return results;
}
}
package com.yoho.search.recall.scene.component;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.ProductIndexEsField;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.models.CacheSknInfoRequestResponse;
import com.yoho.search.service.base.SearchCommonService;
import com.yoho.search.service.base.index.ProductIndexBaseService;
import org.apache.commons.collections.MapUtils;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class BacthSknInfoComponent {
private static final Logger logger = LoggerFactory.getLogger(BacthSknInfoComponent.class);
@Autowired
private SearchCommonService searchCommonService;
@Autowired
private ProductIndexBaseService productIndexBaseService;
@Autowired
private CacheRequestResponseComponent cacheRequestResponseHelper;
private static final boolean useEhCache = true;
/**
* 按skn查询并按顺序返回
*
* @param productSkns
* @return
*/
public List<Map<String, Object>> queryProductListBySkn(List<Integer> productSkns,int size){
//1、批量查询sknxinxi
List<CacheSknInfoRequestResponse> sknInfoCacheRequestRespons = this.batchQuery(productSkns);
//2、构造返回结果
List<Map<String, Object>> finalResults = new ArrayList<>();
for (CacheSknInfoRequestResponse sknInfoCacheRequestResponse : sknInfoCacheRequestRespons) {
if(sknInfoCacheRequestResponse !=null && sknInfoCacheRequestResponse.getResponse()!=null){
finalResults.add(sknInfoCacheRequestResponse.getResponse());
}
if(finalResults.size()>=size){
break;
}
}
return finalResults;
}
private List<CacheSknInfoRequestResponse> batchQuery(List<Integer> productSkns){
//1、构建请求与返回结果
final List<CacheSknInfoRequestResponse> sknInfoCacheRequestRespons = new ArrayList<>();
for (Integer productSkn : productSkns) {
sknInfoCacheRequestRespons.add(new CacheSknInfoRequestResponse(productSkn,5 * 60 ));//缓存时间5分钟
}
//2、批量从缓存中获取
cacheRequestResponseHelper.batchFillResponseFromCache(sknInfoCacheRequestRespons,useEhCache);
//3、获取未命中缓存的skn
List<CacheSknInfoRequestResponse> missCacheRequests = cacheRequestResponseHelper.filterMissCacheRequests(sknInfoCacheRequestRespons);
//4、执行批量查询
Map<RedisKeyBuilder,Map<String, Object>> queryResults = this.batchQueryMissCacheRequests(missCacheRequests);
//5、填充查询结果
cacheRequestResponseHelper.batchFillResponseWithQueryResults(sknInfoCacheRequestRespons,queryResults);
//6、将CacheRequestResponse中需要缓存的key加入缓存
cacheRequestResponseHelper.batchAddResponseToCache(sknInfoCacheRequestRespons,useEhCache);
return sknInfoCacheRequestRespons;
}
private Map<RedisKeyBuilder,Map<String, Object>> batchQueryMissCacheRequests(List<CacheSknInfoRequestResponse> notCachedRequestResponse) {
//1、合法性判断
Map<RedisKeyBuilder,Map<String, Object>> results = new HashMap<>();
if(notCachedRequestResponse==null||notCachedRequestResponse.isEmpty()){
return results;
}
//2、获取skn
List<Integer> productSkns = new ArrayList<>();
for (CacheSknInfoRequestResponse sknInfoCacheRequestResponse : notCachedRequestResponse) {
productSkns.add(sknInfoCacheRequestResponse.getRequest());
}
//3、构建SearchParam
SearchParam searchParam = new SearchParam();
searchParam.setOffset(0);
searchParam.setSize(productSkns.size());
searchParam.setFiter(QueryBuilders.termsQuery(ProductIndexEsField.productSkn, productSkns));
searchParam.setIncludeFields(productIndexBaseService.getProductIndexIncludeFields());
SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParam);
List<Map<String, Object>> productList = productIndexBaseService.getProductListWithPricePlan(searchResult.getResultList());
//4、构建SKN临时结果
Map<Integer,Map<String, Object>> productTempMap = new HashMap<>();
for (Map<String, Object> product: productList) {
productTempMap.put(MapUtils.getIntValue(product,"product_skn",0),product);
}
//5、构造最终结果
for (CacheSknInfoRequestResponse sknInfoCacheRequestResponse :notCachedRequestResponse ) {
results.put(sknInfoCacheRequestResponse.getRequestRedisKeyBuilder(),productTempMap.get(sknInfoCacheRequestResponse.getRequest()));
}
return results;
}
}
... ...
package com.yoho.search.recall.scene.component;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.CollectionUtils;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.common.cache.impls.SearchRedis;
import com.yoho.search.base.utils.ProductIndexEsField;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.helper.RecallResponseHelper;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallRequestResponse;
import com.yoho.search.recall.scene.models.CacheRecallRequestResponse;
import com.yoho.search.recall.scene.models.RecallResponse;
import com.yoho.search.service.base.SearchCommonService;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ... @@ -23,80 +23,49 @@ import java.util.Map;
@Component
public class BatchRecallComponent {
private static final Logger logger = LoggerFactory.getLogger(BatchRecallComponent.class);
@Autowired
private SearchRedis searchRedis;
private CacheRequestResponseComponent cacheRequestResponseHelper;
@Autowired
private SearchCommonService searchCommonService;
private static final boolean useEhCache = false;
/**
* 批量召回入口
*
* @param batchRequests
* @return
*/
public List<RecallRequestResponse> batchRecallAndCache(final List<RecallRequest> batchRequests) {
//1、先从缓存中获取数据,并且构建返回结果对象
final List<RecallRequestResponse> results = this.queryResultFromCache(batchRequests);
public List<CacheRecallRequestResponse> batchRecallAndCache(final List<RecallRequest> batchRequests) {
//1、构造返回结果
final List<CacheRecallRequestResponse> recallRequestResponses = new ArrayList<>();
for (RecallRequest request : batchRequests) {
recallRequestResponses.add(new CacheRecallRequestResponse(request));
}
//2、先从缓存中填充response
cacheRequestResponseHelper.batchFillResponseFromCache(recallRequestResponses,useEhCache);
//2、构造未命中缓存的请求-最多透传x个
final List<RecallRequest> notCachedRequests = this.buildNotCachedRequests(results, 10);
//3、构造未命中缓存的请求-最多透传x个
final List<CacheRecallRequestResponse> missCacheRequests =cacheRequestResponseHelper.filterMissCacheRequests(recallRequestResponses);
//3、如果remainRequests为空,则说明全部命中了缓存,直接返回即可
if (notCachedRequests.isEmpty()) {
return results;
//4、如果remainRequests为空,则说明全部命中了缓存,直接返回即可
if (missCacheRequests.isEmpty()) {
return recallRequestResponses;
}
//4、处理剩余请求
List<RecallRequestResponse> notCachedResults = this.queryNotCachedResult(notCachedRequests);
//5、处理剩余请求
Map<RedisKeyBuilder,RecallResponse> notCacheResults = this.queryNotCachedResult(missCacheRequests);
//5、将查出来对象加入缓存
this.addResultsToCache(notCachedResults);
//6、填充recallRequestResponses
cacheRequestResponseHelper.batchFillResponseWithQueryResults(recallRequestResponses,notCacheResults);
//6、填充results
this.fillResults(results,notCachedResults);
//7、将尚未缓存的对象加入缓存
cacheRequestResponseHelper.batchAddResponseToCache(recallRequestResponses,useEhCache);
return results;
}
/**
* 从缓存中批量获取
*
* @param requests
* @return
*/
private List<RecallRequestResponse> queryResultFromCache(final List<RecallRequest> requests) {
List<RecallRequestResponse> results = new ArrayList<>(requests.size());
List<RedisKeyBuilder> redisKeyBuilders = new ArrayList<>();
for (RecallRequest req : requests) {
redisKeyBuilders.add(req.redisKeyBuilder());
}
List<String> cacheValues = searchRedis.searchValueOperations.multiGet(redisKeyBuilders);
for (int i = 0; i < requests.size(); i++) {
RecallRequest request = requests.get(i);
String cacheValue = cacheValues.get(i);
RecallResponse response = RecallResponseHelper.buildResonse(cacheValue);
results.add(new RecallRequestResponse(request, response));
}
return results;
}
return recallRequestResponses;
/**
* 获取未命中缓存的请求
* @param requestResponseResults
* @return
*/
private List<RecallRequest> buildNotCachedRequests(List<RecallRequestResponse> requestResponseResults,int maxCount){
List<RecallRequest> remainRequests = new ArrayList<>();
for (RecallRequestResponse requestResponse : requestResponseResults) {
if (requestResponse.getResponse() == null) {
remainRequests.add(requestResponse.getRequest());
}
if(remainRequests.size()>=maxCount){
break;
}
}
return remainRequests;
}
/**
... ... @@ -104,78 +73,35 @@ public class BatchRecallComponent {
* @param notCachedRequests
* @return
*/
private List<RecallRequestResponse> queryNotCachedResult(List<RecallRequest> notCachedRequests) {
private Map<RedisKeyBuilder,RecallResponse> queryNotCachedResult(List<CacheRecallRequestResponse> notCachedRequests) {
//1、构造请求参数
List<SearchParam> searchParams = new ArrayList<>();
for (RecallRequest request : notCachedRequests) {
searchParams.add(request.searchParam());
for (CacheRecallRequestResponse requestResponse : notCachedRequests) {
searchParams.add(requestResponse.getRequest().searchParam());
}
//2、执行搜索
List<SearchResult> searchResults = searchCommonService.doMutiSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParams);
//3、构造返回结果
List<RecallRequestResponse> results = new ArrayList<>();
Map<RedisKeyBuilder,RecallResponse> notCachedResults = new HashMap<>();
for (int i = 0; i < notCachedRequests.size(); i++) {
RecallRequest request = notCachedRequests.get(i);
RecallRequest request = notCachedRequests.get(i).getRequest();
SearchResult searchResult = searchResults.get(i);
RecallResponse response = RecallResponseHelper.buildResonse(searchResult);
results.add(new RecallRequestResponse(request,response));
}
return results;
}
/**
* 将请求结果加入缓存
* @param notCachedResults
*/
private void addResultsToCache(List<RecallRequestResponse> notCachedResults){
//1、按缓存时间分组
Map<Integer,List<RecallRequestResponse>> groupMap = CollectionUtils.toListMap(notCachedResults, new Transfer<RecallRequestResponse, Integer>() {
@Override
public Integer transfer(RecallRequestResponse recallRequestResponse) {
return recallRequestResponse.getRequest().cacheTimeInSecond();
}
});
//2、按缓存时间大小直接加入缓存
for (Map.Entry<Integer,List<RecallRequestResponse>> entry: groupMap.entrySet()) {
this.addRequestResponseToCache(entry.getValue(),entry.getKey());
RecallResponse response = this.buildResonse(searchResult);
notCachedResults.put(request.redisKeyBuilder(),response);
}
return notCachedResults;
}
private void addRequestResponseToCache(List<RecallRequestResponse> requestResponseList,int cacheTimeInSecond){
//1、构造缓存结果
Map<RedisKeyBuilder, String> toCacheResults = new HashMap<>();
for (RecallRequestResponse requestResponse :requestResponseList) {
RecallRequest request = requestResponse.getRequest();
RecallResponse response = requestResponse.getResponse();
toCacheResults.put(request.redisKeyBuilder(), RecallResponseHelper.serializerToString(response));
}
//2、加入缓存
searchRedis.searchRedisTemplate.mset(toCacheResults, cacheTimeInSecond);
}
/**
* 填充返回结果
* @param results
* @param remainRequestResults
*/
private void fillResults(List<RecallRequestResponse> results,List<RecallRequestResponse> remainRequestResults){
Map<String,RecallRequestResponse> remainRequestResponseMap = CollectionUtils.toMap(remainRequestResults, new Transfer<RecallRequestResponse,String>(){
@Override
public String transfer(RecallRequestResponse requestResponse) {
return requestResponse.getRequest().redisKeyBuilder().getKey();
}
});
for (RecallRequestResponse result: results) {
if(result.getResponse()!=null){
continue;
}
RecallRequestResponse requestResponse = remainRequestResponseMap.get(result.getRequest().redisKeyBuilder().getKey());
if(requestResponse==null){
continue;
}
result.setResponse(requestResponse.getResponse());
private RecallResponse buildResonse(SearchResult searchResult) {
List<Map<String, Object>> results = searchResult.getResultList();
List<RecallResponse.RecallSkn> recallSkns = new ArrayList<>();
for (Map<String, Object> result : results) {
Integer productSkn = MapUtils.getInteger(result, ProductIndexEsField.productSkn, 0);
Integer brandId = MapUtils.getInteger(result, ProductIndexEsField.brandId, 0);
Integer middleSortId = MapUtils.getInteger(result, ProductIndexEsField.middleSortId, 0);
recallSkns.add(new RecallResponse.RecallSkn(productSkn, brandId, middleSortId));
}
return new RecallResponse(searchResult.getTotal(), recallSkns);
}
}
... ...
package com.yoho.search.recall.scene.component;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.CollectionUtils;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.common.cache.impls.EhCache;
import com.yoho.search.common.cache.impls.SearchRedis;
import com.yoho.search.recall.scene.models.CacheRequestResponse;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class CacheRequestResponseComponent {
private static final Logger logger = LoggerFactory.getLogger(CacheRequestResponseComponent.class);
@Autowired
private SearchRedis searchRedis;
@Autowired
private EhCache ehCache;
/**
* 从缓存中获取response
* @param cacheRequestResponses
* @param useEhCache
* @param <K>
* @param <V>
* @param <T>
*/
public <K, V, T extends CacheRequestResponse<K, V>> void batchFillResponseFromCache(final List<T> cacheRequestResponses, boolean useEhCache) {
try {
Collection<RedisKeyBuilder> keys = new ArrayList<>();
for (T requestResponse : cacheRequestResponses) {
RedisKeyBuilder redisKeyBuilder = requestResponse.getRequestRedisKeyBuilder();
keys.add(redisKeyBuilder);
}
List<String> cachedValues = this.mutiGetFromCache(keys, useEhCache);
for (int i = 0; i < cacheRequestResponses.size(); i++) {
T requestResponse = cacheRequestResponses.get(i);
String cachedValue = cachedValues.get(i);
if (!StringUtils.isBlank(cachedValue)) {
V response = requestResponse.getToResponseTransfer().transfer(cachedValue);
requestResponse.setResponse(response, false);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* 过滤出未命中缓存的结果
* @param cacheRequestResponses
* @param <K>
* @param <V>
* @param <T>
* @return
*/
public <K, V, T extends CacheRequestResponse<K, V>> List<T> filterMissCacheRequests(List<T> cacheRequestResponses){
List<T> notCachedRequests = new ArrayList<>();
for (T requestResponse : cacheRequestResponses) {
if (requestResponse != null && requestResponse.getResponse()==null) {
notCachedRequests.add(requestResponse);
}
}
return notCachedRequests;
}
/**
* 使用查询结果填充response
* @param cacheRequestResponses
* @param queryResults
* @param <K>
* @param <V>
* @param <T>
*/
public <K, V, T extends CacheRequestResponse<K, V>> void batchFillResponseWithQueryResults(List<T> cacheRequestResponses,Map<RedisKeyBuilder,V> queryResults){
for (T sknInfoRequestResponse : cacheRequestResponses) {
if(sknInfoRequestResponse.getResponse()!=null){
continue;
}
RedisKeyBuilder redisKeyBuilder = sknInfoRequestResponse.getRequestRedisKeyBuilder();
V response = queryResults.get(redisKeyBuilder);
if(response!=null){
sknInfoRequestResponse.setResponse(response,true);
}
}
}
/**
* 将未缓存的response添加至缓存
* @param cacheRequestResponses
* @param useEhCache
* @param <K>
* @param <V>
* @param <T>
*/
public <K, V, T extends CacheRequestResponse<K, V>> void batchAddResponseToCache(List<T> cacheRequestResponses, boolean useEhCache) {
//1、按缓存时间分组
Map<Integer, List<T>> groupMap = CollectionUtils.toListMap(cacheRequestResponses, new Transfer<T, Integer>() {
@Override
public Integer transfer(T t) {
return t.getCacheTimeInSecond();
}
});
//2、按缓存时间大小直接加入缓存
for (Map.Entry<Integer, List<T>> entry : groupMap.entrySet()) {
this.batchAddResponseToCache(entry.getValue(), useEhCache, entry.getKey());
}
}
private <K, V, T extends CacheRequestResponse<K, V>> void batchAddResponseToCache(List<T> cacheRequestResponses, boolean useEhCache, int timeOutInSecond) {
try {
Map<RedisKeyBuilder, String> toCacheMap = new HashMap<>();
for (T requestResponse : cacheRequestResponses) {
if (!requestResponse.isNeedRecache()){
continue;
}
RedisKeyBuilder redisKeyBuilder = requestResponse.getRequestRedisKeyBuilder();
V response = requestResponse.getResponse();
if(redisKeyBuilder==null || response==null){
continue;
}
String cacheValue = requestResponse.getFromResponseTransfer().transfer(response);
toCacheMap.put(redisKeyBuilder, cacheValue);
}
if (toCacheMap.isEmpty()) {
return;
}
this.batchAddToCache(toCacheMap, useEhCache, timeOutInSecond);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private List<String> mutiGetFromCache(Collection<RedisKeyBuilder> keys, boolean useEhCache) {
if (useEhCache) {
return ehCache.mutiGet(keys);
} else {
return searchRedis.searchValueOperations.multiGet(keys);
}
}
private void batchAddToCache(Map<RedisKeyBuilder, String> toCacheMap, boolean useEhCache, int timeOutInSecond) {
if (useEhCache) {
ehCache.mutiSet(toCacheMap, timeOutInSecond);
} else {
searchRedis.searchRedisTemplate.mset(toCacheMap, timeOutInSecond);
}
}
}
... ...
package com.yoho.search.recall.scene.helper;
import com.alibaba.fastjson.JSON;
import com.yoho.search.base.utils.ProductIndexEsField;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallResponse;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class RecallResponseHelper {
public static String serializerToString(RecallResponse response) {
return JSON.toJSONString(response);
}
public static RecallResponse buildResonse(String value) {
if(StringUtils.isBlank(value)){
return null;
}
try {
return JSON.parseObject(value, RecallResponse.class);
} catch (Exception e) {
return null;
}
}
public static RecallResponse buildResonse(SearchResult searchResult) {
List<Map<String, Object>> results = searchResult.getResultList();
List<RecallResponse.RecallSkn> recallSkns = new ArrayList<>();
for (Map<String, Object> result : results) {
Integer productSkn = MapUtils.getInteger(result, ProductIndexEsField.productSkn, 0);
Integer brandId = MapUtils.getInteger(result, ProductIndexEsField.brandId, 0);
Integer middleSortId = MapUtils.getInteger(result, ProductIndexEsField.middleSortId, 0);
recallSkns.add(new RecallResponse.RecallSkn(productSkn, brandId, middleSortId));
}
return new RecallResponse(searchResult.getTotal(), recallSkns);
}
}
package com.yoho.search.recall.scene.models;
import com.alibaba.fastjson.JSON;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.Transfer;
public class CacheRecallRequestResponse extends CacheRequestResponse<RecallRequest,RecallResponse> {
static Transfer<RecallRequest,RedisKeyBuilder> requestRedisKeyBuilderTransfer = new Transfer<RecallRequest, RedisKeyBuilder>() {
@Override
public RedisKeyBuilder transfer(RecallRequest recallRequest) {
return recallRequest.redisKeyBuilder();
}
};
static Transfer<String,RecallResponse> toResponseTransfer = new Transfer<String, RecallResponse>() {
@Override
public RecallResponse transfer(String jsonValue) {
return JSON.parseObject(jsonValue, RecallResponse.class);
}
};
static Transfer<RecallResponse,String> fromResponseTransfer = new Transfer<RecallResponse, String>() {
@Override
public String transfer(RecallResponse recallResponse) {
return JSON.toJSONString(recallResponse);
}
};
public CacheRecallRequestResponse(RecallRequest request) {
super(request,requestRedisKeyBuilderTransfer,request.cacheTimeInSecond(), toResponseTransfer,fromResponseTransfer);
}
}
... ...
... ... @@ -4,49 +4,57 @@ import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.Transfer;
public class CacheRequestResponse<K,V> {
private K keyObject;
private Transfer<K,RedisKeyBuilder> redisKeyBuilderTransfer;
private V valueObject;
private Transfer<String,V> toValueTransfer;
private Transfer<V,String> fromValueTransfer;
private boolean needRecache;
private K request;
private RedisKeyBuilder redisKeyBuilder;
private int cacheTimeInSecond;
public CacheRequestResponse(K keyObject,Transfer<K,RedisKeyBuilder> redisKeyBuilderTransfer,Transfer<String,V> toValueTransfer,Transfer<V,String> fromValueTransfer){
this.keyObject = keyObject;
this.redisKeyBuilderTransfer = redisKeyBuilderTransfer;
this.toValueTransfer = toValueTransfer;
this.fromValueTransfer = fromValueTransfer;
private V response;
private Transfer<String,V> toResponseTransfer;
private Transfer<V,String> fromResponseTransfer;
private boolean needRecache = false;
public CacheRequestResponse(K request,Transfer<K,RedisKeyBuilder> requestRedisKeyBuilder,int cacheTimeInSecond,Transfer<String,V> toResponseTransfer,Transfer<V,String> fromResponseTransfer){
this.request = request;
this.redisKeyBuilder = requestRedisKeyBuilder.transfer(request);
this.cacheTimeInSecond = cacheTimeInSecond;
this.toResponseTransfer = toResponseTransfer;
this.fromResponseTransfer = fromResponseTransfer;
}
public K getKeyObject() {
return keyObject;
public K getRequest() {
return request;
}
public V getValueObject() {
return valueObject;
public V getResponse() {
return response;
}
public Transfer<K, RedisKeyBuilder> getRedisKeyBuilderTransfer() {
return redisKeyBuilderTransfer;
public RedisKeyBuilder getRequestRedisKeyBuilder() {
return redisKeyBuilder;
}
public Transfer<String, V> getToValueTransfer() {
return toValueTransfer;
public Transfer<String, V> getToResponseTransfer() {
return toResponseTransfer;
}
public Transfer<V, String> getFromValueTransfer() {
return fromValueTransfer;
public Transfer<V, String> getFromResponseTransfer() {
return fromResponseTransfer;
}
public void setValueObject(V valueObject) {
this.valueObject = valueObject;
public void setResponse(V response,boolean needRecache) {
this.response = response;
this.needRecache = needRecache;
}
public boolean isNeedRecache() {
return needRecache;
}
public void setNeedRecache(boolean needRecache) {
this.needRecache = needRecache;
public int getCacheTimeInSecond() {
return cacheTimeInSecond;
}
}
... ...
package com.yoho.search.recall.scene.models;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.Transfer;
import java.util.HashMap;
import java.util.Map;
public class CacheSknInfoRequestResponse extends CacheRequestResponse<Integer, Map<String, Object>> {
static Transfer<Integer, RedisKeyBuilder> requestRedisKeyBuilder = new Transfer<Integer, RedisKeyBuilder>() {
@Override
public RedisKeyBuilder transfer(Integer productSkn) {
return RedisKeyBuilder.newInstance().appendFixed("YOHOSEARCH:").appendFixed("SKN:").appendVar(productSkn);
}
};
static Transfer<String, Map<String, Object>> toResponseTransfer = new Transfer<String, Map<String, Object>>() {
@Override
public Map<String, Object> transfer(String value) {
Map<String, Object> product = new HashMap<>();
product.putAll(JSONObject.parseObject(value));
return product;
}
};
static Transfer<Map<String, Object>, String> fromResponseTransfer = new Transfer<Map<String, Object>, String>() {
@Override
public String transfer(Map<String, Object> product) {
return JSON.toJSONString(product);
}
};
public CacheSknInfoRequestResponse(Integer productSkn, int cacheTimeInSecond) {
super(productSkn, requestRedisKeyBuilder, cacheTimeInSecond * 60, toResponseTransfer,fromResponseTransfer);
}
}
... ...
package com.yoho.search.recall.scene.models;
public class RecallRequestResponse {
private RecallRequest request;
private RecallResponse response;
public RecallRequestResponse(RecallRequest request, RecallResponse response) {
this.request = request;
this.response = response;
}
public RecallRequest getRequest() {
return request;
}
public void setResponse(RecallResponse response) {
this.response = response;
}
public RecallResponse getResponse() {
return response;
}
}
package com.yoho.search.recall.scene.component;
package com.yoho.search.recall.scene.request;
import com.yoho.search.recall.scene.models.RecallParams;
import com.yoho.search.recall.scene.models.RecallRequest;
... ...
package com.yoho.search.recall.scene.component;
package com.yoho.search.recall.scene.request;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallRequestResponse;
import com.yoho.search.recall.scene.models.CacheRecallRequestResponse;
import com.yoho.search.recall.scene.models.RecallResponse;
import com.yoho.search.recall.scene.models.RecallResponseBatch;
import com.yoho.search.recall.scene.strategy.StrategyNameEnum;
... ... @@ -20,7 +20,7 @@ public class BatchResponseBuilder {
* @param requestResponses
* @return
*/
public RecallResponseBatch buildRecallResponseBatch(List<RecallRequestResponse> requestResponses){
public RecallResponseBatch buildRecallResponseBatch(List<CacheRecallRequestResponse> requestResponses){
//1、从兜底类型中获取总数
long total = this.getTotalCount(requestResponses);
//2、获取召回的skn
... ... @@ -35,9 +35,9 @@ public class BatchResponseBuilder {
* @param requestResponses
* @return
*/
private long getTotalCount(List<RecallRequestResponse> requestResponses) {
private long getTotalCount(List<CacheRecallRequestResponse> requestResponses) {
long total = 0;
for (RecallRequestResponse requestResponse : requestResponses) {
for (CacheRecallRequestResponse requestResponse : requestResponses) {
RecallRequest request = requestResponse.getRequest();
RecallResponse response = requestResponse.getResponse();
if (request ==null || response == null) {
... ... @@ -57,10 +57,10 @@ public class BatchResponseBuilder {
* @param requestResponses
* @return
*/
private List<RecallResponseBatch.SknResult> distinctRecallSkn(List<RecallRequestResponse> requestResponses) {
private List<RecallResponseBatch.SknResult> distinctRecallSkn(List<CacheRecallRequestResponse> requestResponses) {
List<RecallResponseBatch.SknResult> sknResults = new ArrayList<>();
Map<Integer,List<String>> sknRequestMaps = new HashMap<>();
for (RecallRequestResponse requestResponse : requestResponses) {
for (CacheRecallRequestResponse requestResponse : requestResponses) {
RecallRequest request = requestResponse.getRequest();
RecallResponse response = requestResponse.getResponse();
if (request==null || response == null || response.getSkns()==null){
... ...
package com.yoho.search.recall.scene.component;
package com.yoho.search.recall.scene.request;
import com.yoho.search.recall.scene.models.ParamQueryFilter;
import com.yoho.search.recall.scene.models.RecallParams;
... ...
package com.yoho.search.recall.scene.component;
package com.yoho.search.recall.scene.request;
import com.yoho.search.base.utils.CollectionUtils;
import com.yoho.search.recall.scene.models.RecallParams;
... ...