RecallCommonService.java
7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package com.yoho.search.recall.scene;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.CollectionUtils;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.common.cache.impls.SearchRedis;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import com.yoho.search.recall.scene.constants.RecallConstants;
import com.yoho.search.recall.scene.helper.RecallResponseHelper;
import com.yoho.search.recall.scene.models.RecallRequest;
import com.yoho.search.recall.scene.models.RecallRequestResponse;
import com.yoho.search.recall.scene.models.RecallResponse;
import com.yoho.search.service.base.SearchCommonService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Batch recall service: answers a list of recall requests from the Redis cache where
 * possible, sends a capped number of the remaining requests to the product index in
 * one multi-search call, writes those fresh responses to the cache and merges them
 * back into the result list.
 */
@Component
public class RecallCommonService {

    @Autowired
    private SearchRedis searchRedis;
    @Autowired
    private SearchCommonService searchCommonService;

    /**
     * Batch recall entry point.
     *
     * @param requests the recall requests to answer; {@code null} or empty yields an empty list
     * @return one {@link RecallRequestResponse} per request, in request order. The response
     *         part stays {@code null} for a request that neither hit the cache nor was
     *         answered by the search call (for example because it fell outside the
     *         {@code RecallConstants.MAX_ES_QUERY_PER_COUNT} cap).
     */
    public List<RecallRequestResponse> batchRecallAndCache(final List<RecallRequest> requests) {
        // Nothing to do: avoid a Redis round trip (and an NPE on null input).
        if (requests == null || requests.isEmpty()) {
            return new ArrayList<>();
        }
        // 1. Read from the cache first and build the result objects.
        final List<RecallRequestResponse> results = this.queryResultFromCache(requests);
        // 2. Collect the requests that missed the cache - at most MAX_ES_QUERY_PER_COUNT are passed through.
        final List<RecallRequest> notCachedRequests =
                this.buildNotCachedRequests(results, RecallConstants.MAX_ES_QUERY_PER_COUNT);
        // 3. Nothing left to query means every request hit the cache.
        if (notCachedRequests.isEmpty()) {
            return results;
        }
        // 4. Query the remaining requests.
        final List<RecallRequestResponse> notCachedResults = this.queryNotCachedResult(notCachedRequests);
        if (notCachedResults.isEmpty()) {
            // The search produced nothing usable, so there is nothing to cache or merge.
            return results;
        }
        // 5. Put the freshly queried responses into the cache.
        this.addResultsToCache(notCachedResults);
        // 6. Merge them into the overall results.
        this.fillResults(results, notCachedResults);
        return results;
    }

    /**
     * Reads the cached value of every request in one multi-get call.
     *
     * @param requests the requests to look up; must not be {@code null}
     * @return one entry per request, in request order; the response is whatever
     *         {@code RecallResponseHelper.buildResonse} makes of the cached string, or
     *         {@code null} when the multi-get returned no value at that position
     */
    private List<RecallRequestResponse> queryResultFromCache(final List<RecallRequest> requests) {
        final List<RedisKeyBuilder> redisKeyBuilders = new ArrayList<>(requests.size());
        for (RecallRequest req : requests) {
            redisKeyBuilders.add(req.redisKeyBuilder());
        }
        final List<String> cacheValues = searchRedis.searchValueOperations.multiGet(redisKeyBuilders);
        // Values are matched to requests by position. A null or short list is treated as
        // "miss" for the uncovered positions instead of failing the whole recall.
        final int cachedCount = cacheValues == null ? 0 : cacheValues.size();

        final List<RecallRequestResponse> results = new ArrayList<>(requests.size());
        for (int i = 0; i < requests.size(); i++) {
            final RecallRequest request = requests.get(i);
            RecallResponse response = null;
            if (i < cachedCount) {
                final String cacheValue = cacheValues.get(i);
                response = RecallResponseHelper.buildResonse(cacheValue);
            }
            results.add(new RecallRequestResponse(request, response));
        }
        return results;
    }

    /**
     * Collects the requests that missed the cache, i.e. whose response is still {@code null}.
     *
     * @param requestResponseResults the request/response pairs built from the cache lookup
     * @param maxCount               upper bound on the number of requests returned; a value
     *                               of zero or less yields an empty list
     * @return the first {@code maxCount} cache-missing requests, in their original order
     */
    private List<RecallRequest> buildNotCachedRequests(List<RecallRequestResponse> requestResponseResults, int maxCount) {
        final List<RecallRequest> remainRequests = new ArrayList<>();
        if (maxCount <= 0) {
            return remainRequests;
        }
        for (RecallRequestResponse requestResponse : requestResponseResults) {
            if (requestResponse.getResponse() != null) {
                continue;
            }
            remainRequests.add(requestResponse.getRequest());
            // The size only changes on add, so checking the cap here is sufficient.
            if (remainRequests.size() >= maxCount) {
                break;
            }
        }
        return remainRequests;
    }

    /**
     * Runs the requests that missed the cache against the product index in one multi-search call.
     *
     * @param notCachedRequests the requests that were not answered from the cache
     * @return one request/response pair per request for which the search call returned a
     *         result at the same position; empty when the search call returned {@code null}
     */
    private List<RecallRequestResponse> queryNotCachedResult(List<RecallRequest> notCachedRequests) {
        // 1. Build the search parameters.
        final List<SearchParam> searchParams = new ArrayList<>(notCachedRequests.size());
        for (RecallRequest request : notCachedRequests) {
            searchParams.add(request.searchParam());
        }
        // 2. Execute the search.
        final List<SearchResult> searchResults =
                searchCommonService.doMutiSearch(ISearchConstants.INDEX_NAME_PRODUCT_INDEX, searchParams);
        // 3. Build the results. Results are matched to requests by position; requests without
        //    a matching position are left out so the caller keeps their response as null.
        final List<RecallRequestResponse> results = new ArrayList<>();
        if (searchResults == null) {
            return results;
        }
        final int answeredCount = Math.min(notCachedRequests.size(), searchResults.size());
        for (int i = 0; i < answeredCount; i++) {
            final RecallRequest request = notCachedRequests.get(i);
            final SearchResult searchResult = searchResults.get(i);
            final RecallResponse response = RecallResponseHelper.buildResonse(searchResult);
            results.add(new RecallRequestResponse(request, response));
        }
        return results;
    }

    /**
     * Writes freshly queried responses to the cache, one batch per distinct cache time.
     *
     * @param notCachedResults the request/response pairs obtained from the search call
     */
    private void addResultsToCache(List<RecallRequestResponse> notCachedResults) {
        if (notCachedResults == null || notCachedResults.isEmpty()) {
            return;
        }
        // 1. Group by cache time so each group can be written with a single mset call.
        final Map<Integer, List<RecallRequestResponse>> groupMap =
                CollectionUtils.toListMap(notCachedResults, new Transfer<RecallRequestResponse, Integer>() {
                    @Override
                    public Integer transfer(RecallRequestResponse recallRequestResponse) {
                        return recallRequestResponse.getRequest().cacheTimeInSecond();
                    }
                });
        // 2. Write each group with its own cache time.
        for (Map.Entry<Integer, List<RecallRequestResponse>> entry : groupMap.entrySet()) {
            this.addRequestResponseToCache(entry.getValue(), entry.getKey());
        }
    }

    /**
     * Serializes the given responses and writes them to the cache in one mset call.
     *
     * @param requestResponseList the request/response pairs to store, keyed by each request's redis key
     * @param cacheTimeInSecond   passed to mset as the cache time (presumably the TTL in seconds - confirm against SearchRedis)
     */
    private void addRequestResponseToCache(List<RecallRequestResponse> requestResponseList, int cacheTimeInSecond) {
        // 1. Build the key -> serialized response map.
        final Map<RedisKeyBuilder, String> toCacheResults = new HashMap<>();
        for (RecallRequestResponse requestResponse : requestResponseList) {
            final RecallRequest request = requestResponse.getRequest();
            final RecallResponse response = requestResponse.getResponse();
            toCacheResults.put(request.redisKeyBuilder(), RecallResponseHelper.serializerToString(response));
        }
        // 2. Write to the cache.
        searchRedis.searchRedisTemplate.mset(toCacheResults, cacheTimeInSecond);
    }

    /**
     * Copies freshly queried responses into the entries of {@code results} that have no response yet.
     * Entries are matched by the redis key of their request; entries without a match are left unchanged.
     *
     * @param results              the overall result list, updated in place
     * @param remainRequestResults the request/response pairs obtained from the search call
     */
    private void fillResults(List<RecallRequestResponse> results, List<RecallRequestResponse> remainRequestResults) {
        if (remainRequestResults == null || remainRequestResults.isEmpty()) {
            return;
        }
        final Map<String, RecallRequestResponse> remainRequestResponseMap =
                CollectionUtils.toMap(remainRequestResults, new Transfer<RecallRequestResponse, String>() {
                    @Override
                    public String transfer(RecallRequestResponse requestResponse) {
                        return requestResponse.getRequest().redisKeyBuilder().getKey();
                    }
                });
        for (RecallRequestResponse result : results) {
            // Already answered from the cache.
            if (result.getResponse() != null) {
                continue;
            }
            final String key = result.getRequest().redisKeyBuilder().getKey();
            final RecallRequestResponse requestResponse = remainRequestResponseMap.get(key);
            if (requestResponse != null) {
                result.setResponse(requestResponse.getResponse());
            }
        }
    }
}