Authored by hugufei

拆包

Showing 18 changed files with 379 additions and 37 deletions
... ... @@ -5,7 +5,8 @@ import com.yoho.search.recall.scene.builder.request.CommonRecallRequestBuilder;
import com.yoho.search.recall.scene.builder.request.SortPriceRecallRequestBuilder;
import com.yoho.search.recall.scene.builder.response.RecallResponseBatchBuilder;
import com.yoho.search.recall.scene.builder.response.RecallSknResultBuilder;
import com.yoho.search.recall.scene.component.BatchRecallComponent;
import com.yoho.search.recall.scene.cache.CacheRecallRequestResponse;
import com.yoho.search.recall.scene.beans.BatchRecallComponent;
import com.yoho.search.recall.scene.models.*;
import com.yoho.search.recall.scene.models.RecallResponseBatch;
import com.yoho.search.recall.scene.persional.PersionalFactor;
... ...
... ... @@ -7,7 +7,7 @@ import com.yoho.search.base.utils.ProductIndexEsField;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.models.SearchApiResult;
import com.yoho.search.recall.scene.component.*;
import com.yoho.search.recall.scene.beans.BacthSknInfoComponent;
import com.yoho.search.recall.scene.strategy.helper.SortBuilderHelper;
import com.yoho.search.recall.scene.models.*;
import com.yoho.search.recall.scene.builder.request.RecallParamsBuilder;
... ... @@ -116,11 +116,10 @@ public class SceneRecallService {
//5、设置排序
List<SortBuilder<?>> sortBuilders = new ArrayList<>();
sortBuilders.add(SortBuilderHelper.getHeatValueDescSort());
sortBuilders.add(SortBuilderHelper.getSevendayMoneyDescSort());
sortBuilders.add(SortBuilderHelper.getIdDescSort());
searchParam.setSortBuilders(sortBuilders);
SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX,searchParam);
SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_BASE_INDEX,searchParam);
List<Integer> results = new ArrayList<>();
for (Map<String, Object> product : searchResult.getResultList()) {
... ...
package com.yoho.search.recall.scene.beans;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.ProductIndexEsField;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.cache.CacheRequestResponseComponent;
import com.yoho.search.recall.scene.cache.CacheSknInfoRequestResponse;
import com.yoho.search.recall.scene.models.SknInfoRequest;
import com.yoho.search.service.base.SearchCommonService;
import com.yoho.search.service.base.index.ProductIndexBaseService;
import org.apache.commons.collections.MapUtils;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class BacthSknInfoComponent {
@Autowired
private SearchCommonService searchCommonService;
@Autowired
private ProductIndexBaseService productIndexBaseService;
@Autowired
private CacheRequestResponseComponent cacheRequestResponseComponent;
private static final boolean useEhCache = true;
/**
* 按skn查询并按顺序返回
*
* @param productSkns
* @return
*/
public List<Map<String, Object>> queryProductListBySkn(List<Integer> productSkns,int size){
//1、批量查询SKN信息
List<CacheSknInfoRequestResponse> sknInfoCacheRequestRespons = this.batchQuery(productSkns);
//2、构造返回结果
List<Map<String, Object>> finalResults = new ArrayList<>();
for (CacheSknInfoRequestResponse sknInfoCacheRequestResponse : sknInfoCacheRequestRespons) {
if(sknInfoCacheRequestResponse !=null && sknInfoCacheRequestResponse.getResponse()!=null){
finalResults.add(sknInfoCacheRequestResponse.getResponse());
}
if(finalResults.size()>=size){
break;
}
}
return finalResults;
}
private List<CacheSknInfoRequestResponse> batchQuery(List<Integer> productSkns){
//1、构建请求与返回结果
final List<CacheSknInfoRequestResponse> sknInfoCacheRequestRespons = new ArrayList<>();
for (Integer productSkn : productSkns) {
sknInfoCacheRequestRespons.add(new CacheSknInfoRequestResponse(new SknInfoRequest(productSkn)));
}
//2、批量从缓存中获取
cacheRequestResponseComponent.batchFillResponseFromCache(sknInfoCacheRequestRespons,useEhCache);
//3、获取未命中缓存的请求
List<CacheSknInfoRequestResponse> missCacheRequests = cacheRequestResponseComponent.filterMissCacheRequests(sknInfoCacheRequestRespons);
//4、执行批量查询
Map<String,Map<String, Object>> queryResults = this.batchQueryMissCacheRequests(missCacheRequests);
//5、填充查询结果
cacheRequestResponseComponent.batchFillResponseWithQueryResults(sknInfoCacheRequestRespons,queryResults);
//6、将CacheRequestResponse中需要缓存的key加入缓存
cacheRequestResponseComponent.batchAddResponseToCache(sknInfoCacheRequestRespons,useEhCache);
return sknInfoCacheRequestRespons;
}
private Map<String,Map<String, Object>> batchQueryMissCacheRequests(List<CacheSknInfoRequestResponse> notCachedRequestResponse) {
//1、合法性判断
Map<String,Map<String, Object>> results = new HashMap<>();
if(notCachedRequestResponse==null||notCachedRequestResponse.isEmpty()){
return results;
}
//2、获取skn
List<Integer> productSkns = new ArrayList<>();
for (CacheSknInfoRequestResponse sknInfoCacheRequestResponse : notCachedRequestResponse) {
productSkns.add(sknInfoCacheRequestResponse.getRequest().getProductSkn());
}
//3、构建SearchParam
SearchParam searchParam = new SearchParam();
searchParam.setOffset(0);
searchParam.setSize(productSkns.size());
searchParam.setFiter(QueryBuilders.termsQuery(ProductIndexEsField.productSkn, productSkns));
searchParam.setIncludeFields(productIndexBaseService.getProductIndexIncludeFields());
SearchResult searchResult = searchCommonService.doSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParam);
List<Map<String, Object>> productList = productIndexBaseService.getProductListWithPricePlan(searchResult.getResultList());
//4、构建SKN临时结果
Map<Integer,Map<String, Object>> productTempMap = new HashMap<>();
for (Map<String, Object> product: productList) {
productTempMap.put(MapUtils.getIntValue(product,"product_skn",0),product);
}
//5、构造最终结果
for (CacheSknInfoRequestResponse requestResponse :notCachedRequestResponse ) {
results.put(requestResponse.getRequest().redisKeyBuilder().getKey(),productTempMap.get(requestResponse.getRequest().getProductSkn()));
}
return results;
}
}
... ...
package com.yoho.search.recall.scene.beans;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.ProductIndexEsField;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.cache.CacheRequestResponseComponent;
import com.yoho.search.recall.scene.cache.CacheRecallRequestResponse;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallResponse;
import com.yoho.search.service.base.SearchCommonService;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class BatchRecallComponent {
private static final Logger logger = LoggerFactory.getLogger(BatchRecallComponent.class);
@Autowired
private CacheRequestResponseComponent cacheRequestResponseHelper;
@Autowired
private SearchCommonService searchCommonService;
private static final boolean useEhCache = false;
/**
* 批量召回入口
*
* @param batchRequests
* @return
*/
public List<CacheRecallRequestResponse> batchRecallAndCache(final List<RecallRequest> batchRequests) {
//1、构造返回结果
final List<CacheRecallRequestResponse> recallRequestResponses = new ArrayList<>();
for (RecallRequest request : batchRequests) {
recallRequestResponses.add(new CacheRecallRequestResponse(request));
}
//2、先从缓存中填充response
cacheRequestResponseHelper.batchFillResponseFromCache(recallRequestResponses,useEhCache);
//3、构造未命中缓存的请求-最多透传x个
final List<CacheRecallRequestResponse> missCacheRequests =cacheRequestResponseHelper.filterMissCacheRequests(recallRequestResponses);
//4、如果remainRequests为空,则说明全部命中了缓存,直接返回即可
if (missCacheRequests.isEmpty()) {
return recallRequestResponses;
}
//5、处理剩余请求
Map<String,RecallResponse> notCacheResults = this.queryNotCachedResult(missCacheRequests);
//6、填充recallRequestResponses
cacheRequestResponseHelper.batchFillResponseWithQueryResults(recallRequestResponses,notCacheResults);
//7、将尚未缓存的对象加入缓存
cacheRequestResponseHelper.batchAddResponseToCache(recallRequestResponses,useEhCache);
return recallRequestResponses;
}
/**
* 查询命中缓存的请求
* @param notCachedRequests
* @return
*/
private Map<String,RecallResponse> queryNotCachedResult(List<CacheRecallRequestResponse> notCachedRequests) {
//1、构造请求参数
List<SearchParam> searchParams = new ArrayList<>();
for (CacheRecallRequestResponse requestResponse : notCachedRequests) {
searchParams.add(requestResponse.getRequest().searchParam());
}
//2、执行搜索
List<SearchResult> searchResults = searchCommonService.doMutiSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParams);
//3、构造返回结果
Map<String,RecallResponse> notCachedResults = new HashMap<>();
for (int i = 0; i < notCachedRequests.size(); i++) {
RecallRequest request = notCachedRequests.get(i).getRequest();
SearchResult searchResult = searchResults.get(i);
RecallResponse response = this.buildResonse(searchResult);
notCachedResults.put(request.redisKeyBuilder().getKey(),response);
}
return notCachedResults;
}
private RecallResponse buildResonse(SearchResult searchResult) {
List<Map<String, Object>> results = searchResult.getResultList();
List<RecallResponse.RecallSkn> recallSkns = new ArrayList<>();
for (Map<String, Object> result : results) {
Integer productSkn = MapUtils.getInteger(result, ProductIndexEsField.productSkn, 0);
Integer brandId = MapUtils.getInteger(result, ProductIndexEsField.brandId, 0);
Integer middleSortId = MapUtils.getInteger(result, ProductIndexEsField.middleSortId, 0);
recallSkns.add(new RecallResponse.RecallSkn(productSkn, brandId, middleSortId));
}
return new RecallResponse(searchResult.getTotal(), recallSkns);
}
}
... ...
package com.yoho.search.recall.scene.beans;
public class RecallSknComponent {
}
... ...
package com.yoho.search.recall.scene.builder.response;
import com.yoho.search.recall.scene.models.CacheRecallRequestResponse;
import com.yoho.search.recall.scene.cache.CacheRecallRequestResponse;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallResponse;
import com.yoho.search.recall.scene.models.RecallResponseBatch;
... ...
package com.yoho.search.recall.scene.cache;
import com.alibaba.fastjson.JSON;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.recall.scene.cache.CacheRequestResponse;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallResponse;
public class CacheRecallRequestResponse extends CacheRequestResponse<RecallRequest,RecallResponse> {
public CacheRecallRequestResponse(RecallRequest request) {
super(request);
}
@Override
public Transfer<String, RecallResponse> getToResponseTransfer() {
return toResponseTransfer;
}
@Override
public Transfer<RecallResponse, String> getFromResponseTransfer() {
return fromResponseTransfer;
}
private static Transfer<String,RecallResponse> toResponseTransfer = new Transfer<String, RecallResponse>() {
@Override
public RecallResponse transfer(String jsonValue) {
return JSON.parseObject(jsonValue, RecallResponse.class);
}
};
private static Transfer<RecallResponse,String> fromResponseTransfer = new Transfer<RecallResponse, String>() {
@Override
public String transfer(RecallResponse recallResponse) {
return JSON.toJSONString(recallResponse);
}
};
}
... ...
package com.yoho.search.recall.scene.cache;
import com.alibaba.fastjson.JSON;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.recall.scene.cache.CacheRequestResponse;
import com.yoho.search.recall.scene.models.RecallSknParams;
import com.yoho.search.recall.scene.models.RecallSknResult;
public class CacheRecallSknRequestResponse extends CacheRequestResponse<RecallSknParams,RecallSknResult> {
public CacheRecallSknRequestResponse(RecallSknParams request) {
super(request);
}
private static Transfer<String,RecallSknResult> toResponseTransfer = new Transfer<String, RecallSknResult>() {
@Override
public RecallSknResult transfer(String jsonValue) {
return JSON.parseObject(jsonValue, RecallSknResult.class);
}
};
private static Transfer<RecallSknResult,String> fromResponseTransfer = new Transfer<RecallSknResult, String>() {
@Override
public String transfer(RecallSknResult recallSknResult) {
return JSON.toJSONString(recallSknResult);
}
};
@Override
public Transfer<String, RecallSknResult> getToResponseTransfer() {
return toResponseTransfer;
}
@Override
public Transfer<RecallSknResult, String> getFromResponseTransfer() {
return fromResponseTransfer;
}
}
... ...
package com.yoho.search.recall.scene.cache;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.recall.scene.models.ICacheRequest;
public abstract class CacheRequestResponse<K extends ICacheRequest,V> extends ICacheResponse<V> {
public class CacheRequestResponse<K extends ICacheRequest,V> {
private K request;
private V response;
private Transfer<String,V> toResponseTransfer;
private Transfer<V,String> fromResponseTransfer;
private boolean needRecache = false;
public CacheRequestResponse(K request ,Transfer<String,V> toResponseTransfer,Transfer<V,String> fromResponseTransfer){
public CacheRequestResponse(K request){
this.request = request;
this.toResponseTransfer = toResponseTransfer;
this.fromResponseTransfer = fromResponseTransfer;
}
public K getRequest() {
... ... @@ -24,14 +18,6 @@ public class CacheRequestResponse<K extends ICacheRequest,V> {
return response;
}
public Transfer<String, V> getToResponseTransfer() {
return toResponseTransfer;
}
public Transfer<V, String> getFromResponseTransfer() {
return fromResponseTransfer;
}
public void setResponse(V response,boolean needRecache) {
this.response = response;
this.needRecache = needRecache;
... ... @@ -40,5 +26,4 @@ public class CacheRequestResponse<K extends ICacheRequest,V> {
public boolean isNeedRecache() {
return needRecache;
}
}
... ...
... ... @@ -5,7 +5,6 @@ import com.yoho.search.base.utils.CollectionUtils;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.common.cache.impls.EhCache;
import com.yoho.search.core.redis.components.YohoSearchRedisComponent;
import com.yoho.search.recall.scene.models.ICacheRequest;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ...
package com.yoho.search.recall.scene.cache;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.recall.scene.cache.CacheRequestResponse;
import com.yoho.search.recall.scene.models.SknInfoRequest;
import java.util.HashMap;
import java.util.Map;
public class CacheSknInfoRequestResponse extends CacheRequestResponse<SknInfoRequest, Map<String, Object>> {
public CacheSknInfoRequestResponse(SknInfoRequest sknInfoRequest) {
super(sknInfoRequest);
}
static Transfer<String, Map<String, Object>> toResponseTransfer = new Transfer<String, Map<String, Object>>() {
@Override
public Map<String, Object> transfer(String value) {
Map<String, Object> product = new HashMap<>();
product.putAll(JSONObject.parseObject(value));
return product;
}
};
static Transfer<Map<String, Object>, String> fromResponseTransfer = new Transfer<Map<String, Object>, String>() {
@Override
public String transfer(Map<String, Object> product) {
return JSON.toJSONString(product);
}
};
@Override
public Transfer<String, Map<String, Object>> getToResponseTransfer() {
return toResponseTransfer;
}
@Override
public Transfer<Map<String, Object>, String> getFromResponseTransfer() {
return fromResponseTransfer;
}
}
... ...
package com.yoho.search.recall.scene.cache;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
public interface ICacheRequest {
/**
* 缓存key
* @return
*/
RedisKeyBuilder redisKeyBuilder();
/**
* 缓存时间
* @return
*/
int cacheTimeInSecond();
}
... ...
package com.yoho.search.recall.scene.cache;
import com.yoho.search.base.utils.Transfer;
public abstract class ICacheResponse<V> {
public abstract Transfer<String, V> getToResponseTransfer();
public abstract Transfer<V, String> getFromResponseTransfer();
}
... ...
package com.yoho.search.recall.scene.models;
import com.yoho.search.base.utils.Transfer;
public interface ICacheResponse<V> {
Transfer<String,V> toResponseTransfer();
Transfer<V,String> fromResponseTransfer();
}
... ... @@ -3,6 +3,7 @@ package com.yoho.search.recall.scene.models;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.MD5Util;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.recall.scene.cache.ICacheRequest;
import com.yoho.search.recall.scene.strategy.IStrategy;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
... ...
package com.yoho.search.recall.scene.models;
import com.yoho.search.recall.scene.models.RecallResponse;
import java.util.List;
public class RecallResponseBatch {
... ...
... ... @@ -3,6 +3,7 @@ package com.yoho.search.recall.scene.models;
import com.alibaba.fastjson.JSON;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.MD5Util;
import com.yoho.search.recall.scene.cache.ICacheRequest;
import com.yoho.search.recall.scene.constants.CacheTimeConstants;
import java.util.List;
... ...
package com.yoho.search.recall.scene.models;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.recall.scene.cache.ICacheRequest;
import com.yoho.search.recall.scene.constants.CacheTimeConstants;
public class SknInfoRequest implements ICacheRequest{
public class SknInfoRequest implements ICacheRequest {
private Integer productSkn;
... ...