// RecallCommonService.java (7.31 KB)
package com.yoho.search.recall.scene;

import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.CollectionUtils;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.common.cache.impls.SearchRedis;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.constants.RecallConstants;
import com.yoho.search.recall.scene.helper.RecallResponseHelper;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallRequestResponse;
import com.yoho.search.recall.scene.models.RecallResponse;
import com.yoho.search.service.base.SearchCommonService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Batch recall service: answers recall requests from Redis where possible and runs a
 * multi-search against the product index for the requests that missed the cache, writing the
 * freshly fetched responses back to Redis.
 */
@Component
public class RecallCommonService {

    @Autowired
    private SearchRedis searchRedis;
    @Autowired
    private SearchCommonService searchCommonService;

    /**
     * Entry point for batch recall.
     *
     * @param requests the recall requests to answer; {@code null} is treated like an empty list
     * @return one {@link RecallRequestResponse} per request, in request order. The response of
     *         an entry stays {@code null} when it missed the cache and was not answered by the
     *         search (for example because it exceeded the per-call search limit).
     */
    public List<RecallRequestResponse> batchRecallAndCache(final List<RecallRequest> requests) {
        // 0. Nothing to recall: skip the Redis round trip entirely.
        if (requests == null || requests.isEmpty()) {
            return new ArrayList<>();
        }

        // 1. Read from the cache first and build the result objects.
        final List<RecallRequestResponse> results = this.queryResultFromCache(requests);

        // 2. Collect the requests that missed the cache; at most MAX_ES_QUERY_PER_COUNT are passed through.
        final List<RecallRequest> notCachedRequests = this.buildNotCachedRequests(results, RecallConstants.MAX_ES_QUERY_PER_COUNT);

        // 3. If notCachedRequests is empty, everything hit the cache and we can return directly.
        if (notCachedRequests.isEmpty()) {
            return results;
        }

        // 4. Query the requests that missed the cache.
        List<RecallRequestResponse> notCachedResults = this.queryNotCachedResult(notCachedRequests);

        // 5. Put the freshly queried responses into the cache.
        this.addResultsToCache(notCachedResults);

        // 6. Fill the missing responses in results.
        this.fillResults(results, notCachedResults);

        return results;
    }

    /**
     * Reads the cached values of all requests with a single multi-get.
     *
     * @param requests the requests to look up; must not be {@code null}
     * @return one entry per request, in request order, pairing the request with whatever
     *         {@code RecallResponseHelper.buildResonse} produces for its cached value
     */
    private List<RecallRequestResponse> queryResultFromCache(final List<RecallRequest> requests) {
        List<RedisKeyBuilder> redisKeyBuilders = new ArrayList<>(requests.size());
        for (RecallRequest request : requests) {
            redisKeyBuilders.add(request.redisKeyBuilder());
        }
        List<String> cacheValues = searchRedis.searchValueOperations.multiGet(redisKeyBuilders);
        // A null or short reply must degrade to cache misses rather than throw.
        int cachedCount = cacheValues == null ? 0 : cacheValues.size();

        List<RecallRequestResponse> results = new ArrayList<>(requests.size());
        for (int i = 0; i < requests.size(); i++) {
            // NOTE(review): presumably buildResonse(String) returns null for a null value;
            // buildNotCachedRequests relies on a null response to detect a miss - confirm.
            String cacheValue = i < cachedCount ? cacheValues.get(i) : null;
            RecallResponse response = RecallResponseHelper.buildResonse(cacheValue);
            results.add(new RecallRequestResponse(requests.get(i), response));
        }
        return results;
    }

    /**
     * Collects the requests whose response is still {@code null}, i.e. that missed the cache.
     *
     * @param requestResponseResults the request/response pairs built from the cache lookup
     * @param maxCount               upper bound on the number of requests returned; a value
     *                               of zero or less yields an empty list
     * @return the first {@code maxCount} requests without a response, in their original order
     */
    private List<RecallRequest> buildNotCachedRequests(List<RecallRequestResponse> requestResponseResults, int maxCount) {
        List<RecallRequest> notCachedRequests = new ArrayList<>();
        for (RecallRequestResponse requestResponse : requestResponseResults) {
            // Check the cap before adding so that a non-positive maxCount lets nothing through.
            if (notCachedRequests.size() >= maxCount) {
                break;
            }
            if (requestResponse.getResponse() == null) {
                notCachedRequests.add(requestResponse.getRequest());
            }
        }
        return notCachedRequests;
    }

    /**
     * Runs one multi-search for the requests that missed the cache.
     *
     * @param notCachedRequests the requests to send to the product index
     * @return the request/response pairs for every request that got a search result, in
     *         request order; requests without a matching result are left out
     */
    private List<RecallRequestResponse> queryNotCachedResult(List<RecallRequest> notCachedRequests) {
        // 1. Build the search parameters.
        List<SearchParam> searchParams = new ArrayList<>(notCachedRequests.size());
        for (RecallRequest request : notCachedRequests) {
            searchParams.add(request.searchParam());
        }
        // 2. Execute the search.
        List<SearchResult> searchResults = searchCommonService.doMutiSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParams);
        // 3. Build the results, pairing requests and search results by position.
        List<RecallRequestResponse> results = new ArrayList<>(notCachedRequests.size());
        if (searchResults == null) {
            return results;
        }
        // Never read past the end of a short reply; unanswered requests simply stay unfilled.
        int answeredCount = Math.min(notCachedRequests.size(), searchResults.size());
        for (int i = 0; i < answeredCount; i++) {
            RecallResponse response = RecallResponseHelper.buildResonse(searchResults.get(i));
            results.add(new RecallRequestResponse(notCachedRequests.get(i), response));
        }
        return results;
    }


    /**
     * Writes the freshly queried responses to the cache, one batch per cache lifetime.
     *
     * @param notCachedResults the request/response pairs returned by the search
     */
    private void addResultsToCache(List<RecallRequestResponse> notCachedResults) {
        if (notCachedResults == null || notCachedResults.isEmpty()) {
            return;
        }
        // 1. Group by cache time.
        Map<Integer, List<RecallRequestResponse>> groupMap = CollectionUtils.toListMap(notCachedResults, new Transfer<RecallRequestResponse, Integer>() {
            @Override
            public Integer transfer(RecallRequestResponse recallRequestResponse) {
                return recallRequestResponse.getRequest().cacheTimeInSecond();
            }
        });
        // 2. Write each group to the cache with its own cache time.
        for (Map.Entry<Integer, List<RecallRequestResponse>> entry : groupMap.entrySet()) {
            this.addRequestResponseToCache(entry.getValue(), entry.getKey());
        }
    }

    /**
     * Serializes the given responses and stores them under their request keys.
     *
     * @param requestResponseList the pairs to cache; entries without a response are skipped
     * @param cacheTimeInSecond   the cache time passed to {@code mset} for every entry
     */
    private void addRequestResponseToCache(List<RecallRequestResponse> requestResponseList, int cacheTimeInSecond) {
        // 1. Build the values to cache.
        Map<RedisKeyBuilder, String> toCacheResults = new HashMap<>();
        for (RecallRequestResponse requestResponse : requestResponseList) {
            RecallResponse response = requestResponse.getResponse();
            // A missing response carries nothing worth caching, so do not serialize it.
            if (response == null) {
                continue;
            }
            toCacheResults.put(requestResponse.getRequest().redisKeyBuilder(), RecallResponseHelper.serializerToString(response));
        }
        if (toCacheResults.isEmpty()) {
            return;
        }
        // 2. Write to the cache.
        searchRedis.searchRedisTemplate.mset(toCacheResults, cacheTimeInSecond);
    }

    /**
     * Copies the searched responses into the entries of {@code results} that are still empty,
     * matching entries by the key of their request's redis key builder.
     *
     * @param results              the full result list; entries with a response are left as is
     * @param remainRequestResults the request/response pairs obtained from the search
     */
    private void fillResults(List<RecallRequestResponse> results, List<RecallRequestResponse> remainRequestResults) {
        if (remainRequestResults == null || remainRequestResults.isEmpty()) {
            return;
        }
        Map<String, RecallRequestResponse> remainRequestResponseMap = CollectionUtils.toMap(remainRequestResults, new Transfer<RecallRequestResponse, String>() {
            @Override
            public String transfer(RecallRequestResponse requestResponse) {
                return requestResponse.getRequest().redisKeyBuilder().getKey();
            }
        });
        for (RecallRequestResponse result : results) {
            if (result.getResponse() != null) {
                continue;
            }
            RecallRequestResponse requestResponse = remainRequestResponseMap.get(result.getRequest().redisKeyBuilder().getKey());
            if (requestResponse == null) {
                continue;
            }
            result.setResponse(requestResponse.getResponse());
        }
    }

}