// SuggestionJob.java 5.78 KB
package com.yoho.search.consumer.job;

import com.yoho.search.consumer.common.ConsumerConfiger;
import com.yoho.search.consumer.suggests.common.RetryBusinessFlowExecutor;
import com.yoho.search.consumer.suggests.common.SuggestionCache;
import com.yoho.search.consumer.suggests.counter.AbstractSuggestionCounter;
import com.yoho.search.consumer.suggests.discover.AbstractSuggestionDiscoverer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Scheduled job that runs the suggestion discovery flows, then the suggestion counter flows,
 * and finally asks {@link IndexRebuildJob} to rebuild the suggest index.
 *
 * <p>The flows are collected from the Spring context in
 * {@link #setApplicationContext(ApplicationContext)}: every {@link AbstractSuggestionDiscoverer}
 * and every {@link AbstractSuggestionCounter} bean is wrapped in a
 * {@link RetryBusinessFlowExecutor}.
 *
 * <p>Created by ginozhang on 2017/3/14.
 */
@Component
public class SuggestionJob implements ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");

    /** Pause inserted between the stages of {@link #executeAll()}, in milliseconds. */
    private static final long PAUSE_BETWEEN_STAGES_MILLIS = 10000L;

    /** Executors wrapping the discoverer beans, ordered by their keyword type. */
    private List<RetryBusinessFlowExecutor> discoveryFlowExecutorList = new ArrayList<>();

    /** Executors wrapping the counter beans (no explicit ordering is applied). */
    private List<RetryBusinessFlowExecutor> counterFlowExecutorList = new ArrayList<>();

    @Autowired
    private SuggestionCache suggestionCache;
    @Autowired
    private IndexRebuildJob indexRebuildJob;
    @Autowired
    private ConsumerConfiger configer;

    /**
     * Builds the discovery and counter executor lists from the beans registered in the context.
     *
     * @param applicationContext the context to look the discoverer and counter beans up in
     * @throws BeansException if the bean lookup fails
     * @throws IllegalArgumentException (via {@link Assert#isTrue}) if no discoverer or no counter
     *         bean is defined
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, AbstractSuggestionDiscoverer> discovererBeans =
                applicationContext.getBeansOfType(AbstractSuggestionDiscoverer.class);
        Assert.isTrue(discovererBeans != null && !discovererBeans.isEmpty(), "There is no suggestion discoverer defined.");
        // Discoverers run in the order defined by their keyword type's own compare method.
        discoveryFlowExecutorList = discovererBeans.values().stream()
                .sorted((discoverer1, discoverer2) -> discoverer1.getKeywordType().compare(discoverer2.getKeywordType()))
                .map(discoverer -> new RetryBusinessFlowExecutor(discoverer,
                        configer.getSuggestionDiscoverBatchMaxThreadSize(),
                        configer.getSuggestionDiscoverBatchLimit()))
                .collect(Collectors.toList());

        Map<String, AbstractSuggestionCounter> counterBeans =
                applicationContext.getBeansOfType(AbstractSuggestionCounter.class);
        Assert.isTrue(counterBeans != null && !counterBeans.isEmpty(), "There is no suggestion counter defined.");
        counterFlowExecutorList = counterBeans.values().stream()
                .map(counter -> new RetryBusinessFlowExecutor(counter,
                        configer.getSuggestionCountBatchMaxThreadSize(),
                        configer.getSuggestionCountBatchLimit()))
                .collect(Collectors.toList());
    }

    /**
     * Runs the complete pipeline: discovery, counter, then the suggest index rebuild, with a
     * fixed pause between the stages. Triggered by the cron expression {@code 0 30 3 * * ?}.
     */
    @Scheduled(cron = "0 30 3 * * ?")
    public void executeAll() {
        executeDiscovery();
        safeSleep();
        executeCounter();
        safeSleep();
        indexRebuildJob.rebuildSuggestIndex();
    }

    /**
     * Sleeps for {@link #PAUSE_BETWEEN_STAGES_MILLIS}. If the thread is interrupted the
     * interrupt status is restored, so code further up the stack can still observe it,
     * and the method returns early.
     */
    private void safeSleep() {
        try {
            Thread.sleep(PAUSE_BETWEEN_STAGES_MILLIS);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again instead of losing it.
            Thread.currentThread().interrupt();
            logger.warn("SuggestionJob.safeSleep interrupted", e);
        }
    }

    /**
     * Runs every discovery flow in list order. The suggestion cache is initialised first and
     * always cleaned up afterwards. A failing flow is logged and does not stop the remaining
     * flows; the combined result is only written to the log.
     */
    public void executeDiscovery() {
        try {
            long begin = System.currentTimeMillis();
            logger.info("SuggestionJob.executeDiscovery start----[begin={}]", begin);
            suggestionCache.init();
            boolean result = true;
            for (RetryBusinessFlowExecutor executor : discoveryFlowExecutorList) {
                boolean tempResult = executor.execute();
                if (!tempResult) {
                    logger.warn("SuggestionJob.executeDiscovery has failure----[bean={}]", executor.getFlowName());
                }
                result = result && tempResult;
            }
            logger.info("SuggestionJob.executeDiscovery end----[cost={}][result={}]", (System.currentTimeMillis() - begin), result);
        } finally {
            suggestionCache.cleanup();
        }
    }

    /**
     * Runs the single discovery flow whose flow name equals {@code flowName}, with the same
     * cache init/cleanup handling as {@link #executeDiscovery()}. If no flow matches, nothing
     * is executed and a warning is logged.
     *
     * @param flowName name of the flow to run; must not be {@code null}
     */
    public void executeDiscoveryForSingleFlow(String flowName) {
        try {
            long begin = System.currentTimeMillis();
            logger.info("SuggestionJob.executeDiscoveryForSingleFlow start----[begin={}]", begin);
            suggestionCache.init();
            Optional<RetryBusinessFlowExecutor> executor = discoveryFlowExecutorList.stream()
                    .filter(item -> flowName.equals(item.getFlowName()))
                    .findAny();
            if (executor.isPresent()) {
                executor.get().execute();
            } else {
                logger.warn("SuggestionJob.executeDiscoveryForSingleFlow found no flow----[flowName={}]", flowName);
            }
            logger.info("SuggestionJob.executeDiscoveryForSingleFlow end----[cost={}]", (System.currentTimeMillis() - begin));
        } finally {
            suggestionCache.cleanup();
        }
    }

    /**
     * Runs every counter flow in list order. A failing flow is logged and does not stop the
     * remaining flows; the combined result is only written to the log.
     */
    public void executeCounter() {
        long begin = System.currentTimeMillis();
        logger.info("SuggestionJob.executeCounter start----[begin={}]", begin);
        boolean result = true;
        for (RetryBusinessFlowExecutor executor : counterFlowExecutorList) {
            boolean tempResult = executor.execute();
            if (!tempResult) {
                logger.warn("SuggestionJob.executeCounter has failure----[bean={}]", executor.getFlowName());
            }
            result = result && tempResult;
        }
        logger.info("SuggestionJob.executeCounter end----[cost={}][result={}]", (System.currentTimeMillis() - begin), result);
    }

    /**
     * Runs the single counter flow whose flow name equals {@code flowName}. If no flow matches,
     * nothing is executed and a warning is logged.
     *
     * @param flowName name of the flow to run; must not be {@code null}
     */
    public void executeCounterForSingleFlow(String flowName) {
        long begin = System.currentTimeMillis();
        // Log under this method's own name so single-flow runs are distinguishable from executeCounter.
        logger.info("SuggestionJob.executeCounterForSingleFlow start----[begin={}]", begin);
        Optional<RetryBusinessFlowExecutor> executor = counterFlowExecutorList.stream()
                .filter(item -> flowName.equals(item.getFlowName()))
                .findAny();
        if (executor.isPresent()) {
            executor.get().execute();
        } else {
            logger.warn("SuggestionJob.executeCounterForSingleFlow found no flow----[flowName={}]", flowName);
        }
        logger.info("SuggestionJob.executeCounterForSingleFlow end----[cost={}]", (System.currentTimeMillis() - begin));
    }

}