|
|
package com.yoho.search.consumer.job;
|
|
|
|
|
|
import com.yoho.search.consumer.suggests.common.RetryBusinessFlowExecutor;
|
|
|
import com.yoho.search.consumer.suggests.common.SuggestionCache;
|
|
|
import com.yoho.search.consumer.suggests.common.SuggestionConstants;
|
|
|
import com.yoho.search.consumer.suggests.discover.AbstractSuggestionDiscoverer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.BeansException;
|
...
|
...
|
@@ -13,63 +17,65 @@ import org.springframework.context.ApplicationContextAware; |
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
import com.yoho.search.consumer.suggests.common.RetryBusinessFlowExecutor;
|
|
|
import com.yoho.search.consumer.suggests.common.SuggestionCache;
|
|
|
import com.yoho.search.consumer.suggests.common.SuggestionConstants;
|
|
|
import com.yoho.search.consumer.suggests.discover.AbstractSuggestionDiscoverer;
|
|
|
|
|
|
@Component
|
|
|
public class SuggestionDiscoveryJob implements ApplicationContextAware {
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");
|
|
|
|
|
|
private List<RetryBusinessFlowExecutor> flowExecutorList = new ArrayList<>();
|
|
|
private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");
|
|
|
|
|
|
@Autowired
|
|
|
private SuggestionCache suggestionCache;
|
|
|
private List<RetryBusinessFlowExecutor> flowExecutorList = new ArrayList<>();
|
|
|
|
|
|
@Override
|
|
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
Map<String, AbstractSuggestionDiscoverer> beanMap = applicationContext.getBeansOfType(AbstractSuggestionDiscoverer.class);
|
|
|
if (beanMap == null || beanMap.isEmpty()) {
|
|
|
logger.warn("There is no suggestion discoverer defined.");
|
|
|
return;
|
|
|
}
|
|
|
@Autowired
|
|
|
private SuggestionCache suggestionCache;
|
|
|
|
|
|
flowExecutorList = beanMap.values().stream()
|
|
|
.sorted(Comparator.comparingInt(discoverer -> discoverer.getKeywordType().getSortValue()))
|
|
|
.map(counter -> new RetryBusinessFlowExecutor(counter, SuggestionConstants.SUGGESTION_DISCOVER_BATCH_MAX_THREAD_SIZE, SuggestionConstants.SUGGESTION_DISCOVER_BATCH_LIMIT))
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
@Override
|
|
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
Map<String, AbstractSuggestionDiscoverer> beanMap = applicationContext.getBeansOfType(AbstractSuggestionDiscoverer.class);
|
|
|
if (beanMap == null || beanMap.isEmpty()) {
|
|
|
logger.warn("There is no suggestion discoverer defined.");
|
|
|
return;
|
|
|
}
|
|
|
flowExecutorList = beanMap.values().stream()
|
|
|
.sorted((discoverer1, discoverer2) -> discoverer1.getKeywordType().compare(discoverer2.getKeywordType()))
|
|
|
.map(counter -> new RetryBusinessFlowExecutor(counter, SuggestionConstants.SUGGESTION_DISCOVER_BATCH_MAX_THREAD_SIZE,
|
|
|
SuggestionConstants.SUGGESTION_DISCOVER_BATCH_LIMIT))
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
@Scheduled(cron = "0 30 1 * * ?")
|
|
|
public void execute() {
|
|
|
try {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
logger.info("SuggestionDiscoveryJob execute start----[begin={}]", begin);
|
|
|
suggestionCache.init();
|
|
|
boolean result = true;
|
|
|
for (RetryBusinessFlowExecutor executor : flowExecutorList) {
|
|
|
boolean tempResult = executor.execute();
|
|
|
if (!tempResult) {
|
|
|
logger.warn("SuggestionDiscoveryJob execute has failure----[bean={}]", executor.getFlowName());
|
|
|
}
|
|
|
result = result && tempResult;
|
|
|
}
|
|
|
logger.info("SuggestionDiscoveryJob execute end----[cost={}][result={}]", (System.currentTimeMillis() - begin), result);
|
|
|
} finally {
|
|
|
suggestionCache.cleanup();
|
|
|
}
|
|
|
}
|
|
|
@Scheduled(cron = "0 30 1 * * ?")
|
|
|
public void execute() {
|
|
|
try {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
logger.info("SuggestionDiscoveryJob execute start----[begin={}]", begin);
|
|
|
suggestionCache.init();
|
|
|
boolean result = true;
|
|
|
for (RetryBusinessFlowExecutor executor : flowExecutorList) {
|
|
|
boolean tempResult = executor.execute();
|
|
|
if (!tempResult) {
|
|
|
logger.warn("SuggestionDiscoveryJob execute has failure----[bean={}]", executor.getFlowName());
|
|
|
}
|
|
|
result = result && tempResult;
|
|
|
}
|
|
|
logger.info("SuggestionDiscoveryJob execute end----[cost={}][result={}]", (System.currentTimeMillis() - begin), result);
|
|
|
} finally {
|
|
|
suggestionCache.cleanup();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void executeFlow(String flowName) {
|
|
|
try {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
logger.info("SuggestionDiscoveryJob executeFlow start----[begin={}]", begin);
|
|
|
suggestionCache.init();
|
|
|
Optional<RetryBusinessFlowExecutor> executor = flowExecutorList.stream().filter(item -> flowName.equals(item.getFlowName())).findAny();
|
|
|
executor.ifPresent(RetryBusinessFlowExecutor::execute);
|
|
|
logger.info("SuggestionDiscoveryJob executeFlow end----[cost={}]", (System.currentTimeMillis() - begin));
|
|
|
} finally {
|
|
|
suggestionCache.cleanup();
|
|
|
}
|
|
|
}
|
|
|
public void executeFlow(String flowName) {
|
|
|
try {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
logger.info("SuggestionDiscoveryJob executeFlow start----[begin={}]", begin);
|
|
|
suggestionCache.init();
|
|
|
Optional<RetryBusinessFlowExecutor> executor = flowExecutorList.stream().filter(item -> flowName.equals(item.getFlowName())).findAny();
|
|
|
executor.ifPresent(RetryBusinessFlowExecutor::execute);
|
|
|
logger.info("SuggestionDiscoveryJob executeFlow end----[cost={}]", (System.currentTimeMillis() - begin));
|
|
|
} finally {
|
|
|
suggestionCache.cleanup();
|
|
|
}
|
|
|
}
|
|
|
} |
...
|
...
|
|