AbstractCacheBean.java
8.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package com.yoho.search.cache.beans;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.search.base.utils.SearchCollectionUtils;
import com.yoho.search.base.utils.Transfer;
import com.yoho.search.cache.impls.EhCache;
import com.yoho.search.cache.impls.SearchRedis;
import com.yoho.search.cache.model.AbstractCacheRequestResponse;
import com.yoho.search.cache.model.ICacheRequest;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
public abstract class AbstractCacheBean<Request extends ICacheRequest, Response, RequestResponse extends AbstractCacheRequestResponse<Request, Response>> {
private static final Logger RECALL_NEW_LOGGER = LoggerFactory.getLogger("RECALL");
@Autowired
private EhCache ehCache;
@Autowired
private SearchRedis searchRedis;
/**
* 批量处理请求,以及添加缓存
*
* @param requestResponse
* @return
*/
public void bacthFillResponseWithCache(final RequestResponse requestResponse,boolean ignoreQueryCache) {
List<RequestResponse> requestResponses = Arrays.asList(requestResponse);
this.bacthFillResponseWithCache(requestResponses,1,ignoreQueryCache);
}
/**
* 批量处理请求,以及添加缓存
*
* @param requestResponses
* @return
*/
public void bacthFillResponseWithCache(final List<RequestResponse> requestResponses,int maxMissRequestCount) {
this.bacthFillResponseWithCache(requestResponses,maxMissRequestCount,false);
}
/**
* 批量处理请求,以及添加缓存
*
* @param requestResponses
* @return
*/
public void bacthFillResponseWithCache(final List<RequestResponse> requestResponses,int maxMissRequestCount,boolean ignoreQueryCache) {
//1、批量从缓存中获取
if(!ignoreQueryCache){
this.batchFillResponseFromCache(requestResponses);
}
//2、获取未命中缓存的请求
List<RequestResponse> missCacheRequests = this.filterMissCacheRequests(requestResponses, maxMissRequestCount);
//3、如果remainRequests为空,则说明全部命中了缓存,直接返回即可
if (missCacheRequests.isEmpty()) {
return;
}
//4、处理请求
Map<Request, Response> requestResponseMap = this.queryMissCacheRequestResults(missCacheRequests);
//5、填充查询结果
this.batchFillResponseWithQueryResults(requestResponses, requestResponseMap);
//6、将CacheRequestResponse中需要缓存的key加入缓存
this.batchAddResponseToCache(requestResponses);
}
/**
* 从缓存中批量填充response
*
* @param requestResponses
*/
private void batchFillResponseFromCache(final List<RequestResponse> requestResponses) {
try {
Collection<RedisKeyBuilder> keys = new ArrayList<>();
for (RequestResponse requestResponse : requestResponses) {
RedisKeyBuilder redisKeyBuilder = requestResponse.getRequest().redisKeyBuilder();
keys.add(redisKeyBuilder);
}
List<String> cachedValues = this.mutiGetFromCache(keys);
for (int i = 0; i < requestResponses.size(); i++) {
RequestResponse requestResponse = requestResponses.get(i);
String cachedValue = cachedValues.get(i);
if (!StringUtils.isBlank(cachedValue)) {
Response response = requestResponse.getToResponseTransfer().transfer(cachedValue);
requestResponse.setResponse(response, false);
}
}
} catch (Exception e) {
RECALL_NEW_LOGGER.error(e.getMessage(), e);
}
}
/**
* 过滤出未命中缓存的结果
*
* @param requestResponses
* @return
*/
private List<RequestResponse> filterMissCacheRequests(List<RequestResponse> requestResponses, int maxMissRequestCount) {
List<RequestResponse> missCacheRequestResponse = new ArrayList<>();
for (RequestResponse requestResponse : requestResponses) {
if (requestResponse != null && requestResponse.getResponse() == null) {
missCacheRequestResponse.add(requestResponse);
}
if (missCacheRequestResponse.size() >= maxMissRequestCount) {
break;
}
}
return missCacheRequestResponse;
}
/**
* 是否使用本地缓存
* @return
*/
protected abstract boolean useEhCache();
/**
* 处理未命中缓存的请求-子类实现
*
* @param missCacheRequests
* @return
*/
protected abstract Map<Request, Response> queryMissCacheRequestResults(List<RequestResponse> missCacheRequests);
/**
* 使用查询结果填充请求
*
* @param requestResponses
* @param requestResponseMap
*/
private void batchFillResponseWithQueryResults(List<RequestResponse> requestResponses, Map<Request, Response> requestResponseMap) {
//1、先转成map
Map<String, Response> requestKeyResponseMap = new HashMap<>();
for (Map.Entry<Request, Response> requestResponseEntry : requestResponseMap.entrySet()) {
Request request = requestResponseEntry.getKey();
Response response = requestResponseEntry.getValue();
requestKeyResponseMap.put(request.redisKeyBuilder().getKey(), response);
}
//2、填充结果
for (RequestResponse requestResponse : requestResponses) {
if (requestResponse.getResponse() != null) {
continue;
}
RedisKeyBuilder redisKeyBuilder = requestResponse.getRequest().redisKeyBuilder();
Response response = requestKeyResponseMap.get(redisKeyBuilder.getKey());
if (response != null) {
requestResponse.setResponse(response, true);
}
}
}
/**
* 将未缓存的response添加至缓存
*
* @param requestResponses
*/
protected void batchAddResponseToCache(List<RequestResponse> requestResponses) {
//1、按缓存时间分组
Map<Integer, List<RequestResponse>> groupMap = SearchCollectionUtils.toListMap(requestResponses, new Transfer<RequestResponse, Integer>() {
@Override
public Integer transfer(RequestResponse requestResponse) {
Request request = requestResponse.getRequest();
if(request==null){
return 0;
}
return request.cacheTimeInMinute();
}
});
//2、按缓存时间大小直接加入缓存
for (Map.Entry<Integer, List<RequestResponse>> entry : groupMap.entrySet()) {
this.batchAddResponseToCache(entry.getValue(), entry.getKey());
}
}
private void batchAddResponseToCache(List<RequestResponse> requestResponses,int expiredTimeInMinute) {
try {
Map<RedisKeyBuilder, String> toCacheMap = new HashMap<>();
for (RequestResponse requestResponse : requestResponses) {
if (!requestResponse.isNeedRecache()) {
continue;
}
Request request = requestResponse.getRequest();
Response response = requestResponse.getResponse();
if (request==null || response == null) {
continue;
}
RedisKeyBuilder redisKeyBuilder = request.redisKeyBuilder();
if(redisKeyBuilder == null){
continue;
}
Transfer<Response, String> fromResponseTransfer = requestResponse.getFromResponseTransfer();
if(fromResponseTransfer==null){
continue;
}
String cacheValue = fromResponseTransfer.transfer(response);
toCacheMap.put(redisKeyBuilder, cacheValue);
}
if (toCacheMap.isEmpty()) {
return;
}
this.batchAddToCache(toCacheMap, expiredTimeInMinute);
} catch (Exception e) {
RECALL_NEW_LOGGER.error(e.getMessage(), e);
}
}
private List<String> mutiGetFromCache(Collection<RedisKeyBuilder> keys) {
if (useEhCache()){
return ehCache.mutiGet(keys);
} else {
return searchRedis.mutiGet(keys);
}
}
private void batchAddToCache(Map<RedisKeyBuilder, String> toCacheMap,int expiredTimeInMinute) {
if (useEhCache()) {
ehCache.mutiSet(toCacheMap, expiredTimeInMinute);
} else {
searchRedis.mutiSet(toCacheMap,expiredTimeInMinute);
}
}
}