SuggestionJob.java
5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package com.yoho.search.consumer.job;
import com.yoho.search.consumer.common.ConsumerConfiger;
import com.yoho.search.consumer.suggests.common.RetryBusinessFlowExecutor;
import com.yoho.search.consumer.suggests.common.SuggestionCache;
import com.yoho.search.consumer.suggests.counter.AbstractSuggestionCounter;
import com.yoho.search.consumer.suggests.discover.AbstractSuggestionDiscoverer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Created by ginozhang on 2017/3/14.
*/
@Component
public class SuggestionJob implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");
private List<RetryBusinessFlowExecutor> discoveryFlowExecutorList = new ArrayList<>();
private List<RetryBusinessFlowExecutor> counterFlowExecutorList = new ArrayList<>();
@Autowired
private SuggestionCache suggestionCache;
@Autowired
private IndexRebuildJob indexRebuildJob;
@Autowired
private ConsumerConfiger configer;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, AbstractSuggestionDiscoverer> beanMap = applicationContext.getBeansOfType(AbstractSuggestionDiscoverer.class);
Assert.isTrue(beanMap != null && !beanMap.isEmpty(), "There is no suggestion discoverer defined.");
discoveryFlowExecutorList = beanMap.values().stream()
.sorted((discoverer1, discoverer2) -> discoverer1.getKeywordType().compare(discoverer2.getKeywordType()))
.map(counter -> new RetryBusinessFlowExecutor(counter, configer.getSuggestionDiscoverBatchMaxThreadSize(),configer.getSuggestionDiscoverBatchLimit()))
.collect(Collectors.toList());
Map<String, AbstractSuggestionCounter> countBeanMap = applicationContext.getBeansOfType(AbstractSuggestionCounter.class);
Assert.isTrue(countBeanMap != null && !countBeanMap.isEmpty(), "There is no suggestion counter defined.");
counterFlowExecutorList = countBeanMap.values().stream()
.map(counter -> new RetryBusinessFlowExecutor(counter,configer.getSuggestionCountBatchMaxThreadSize(),configer.getSuggestionCountBatchLimit()))
.collect(Collectors.toList());
}
@Scheduled(cron = "0 30 3 * * ?")
public void executeAll(){
executeDiscovery();
safeSleep();
executeCounter();
safeSleep();
indexRebuildJob.rebuildSuggestIndex();
}
private void safeSleep() {
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void executeDiscovery() {
try {
long begin = System.currentTimeMillis();
logger.info("SuggestionJob.executeDiscovery start----[begin={}]", begin);
suggestionCache.init();
boolean result = true;
for (RetryBusinessFlowExecutor executor : discoveryFlowExecutorList) {
boolean tempResult = executor.execute();
if (!tempResult) {
logger.warn("SuggestionJob.executeDiscovery has failure----[bean={}]", executor.getFlowName());
}
result = result && tempResult;
}
logger.info("SuggestionJob.executeDiscovery end----[cost={}][result={}]", (System.currentTimeMillis() - begin), result);
} finally {
suggestionCache.cleanup();
}
}
public void executeDiscoveryForSingleFlow(String flowName) {
try {
long begin = System.currentTimeMillis();
logger.info("SuggestionJob.executeDiscoveryForSingleFlow start----[begin={}]", begin);
suggestionCache.init();
Optional<RetryBusinessFlowExecutor> executor = discoveryFlowExecutorList.stream().filter(item -> flowName.equals(item.getFlowName())).findAny();
executor.ifPresent(RetryBusinessFlowExecutor::execute);
logger.info("SuggestionJob.executeDiscoveryForSingleFlow end----[cost={}]", (System.currentTimeMillis() - begin));
} finally {
suggestionCache.cleanup();
}
}
public void executeCounter() {
long begin = System.currentTimeMillis();
logger.info("SuggestionJob.executeCounter start----[begin={}]", begin);
boolean result = true;
for (RetryBusinessFlowExecutor executor : counterFlowExecutorList) {
boolean tempResult = executor.execute();
if (!tempResult) {
logger.warn("SuggestionJob.executeCounter has failure----[bean={}]", executor.getFlowName());
}
result = result && tempResult;
}
logger.info("SuggestionJob.executeCounter end----[cost={}][result={}]", (System.currentTimeMillis() - begin), result);
}
public void executeCounterForSingleFlow(String flowName) {
long begin = System.currentTimeMillis();
logger.info("SuggestionJob.executeCounter start----[begin={}]", begin);
Optional<RetryBusinessFlowExecutor> executor = counterFlowExecutorList.stream().filter(item -> flowName.equals(item.getFlowName())).findAny();
executor.ifPresent(RetryBusinessFlowExecutor::execute);
logger.info("SuggestionJob.executeCounter end----[cost={}]", (System.currentTimeMillis() - begin));
}
}