// IncrementCrawlerFlow.java 6.42 KB
package com.yoho.search.spider.increment;

import com.yoho.core.redis.YHZSetOperations;
import com.yoho.search.base.utils.RedisKeys;
import com.yoho.search.consumer.index.common.AnalyzerHelper;
import com.yoho.search.consumer.service.base.SpiderContentService;
import com.yoho.search.consumer.suggests.common.RetryBusinessFlow;
import com.yoho.search.consumer.suggests.common.SuggestionConstants;
import com.yoho.search.dal.model.SpiderContent;
import com.yoho.search.spider.common.BaikeBO;
import com.yoho.search.spider.common.SpiderBasedHttpRequest;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Incremental crawler flow for search keywords.
 *
 * <p>{@link #init()} reads yesterday's top keywords from the
 * {@code YOHO_SEARCH_KEYWORDS_EMPTY} and {@code YOHO_SEARCH_KEYWORDS_LESS} Redis sorted sets
 * (presumably searches that returned no / few results — confirm against the producer).
 * It removes keywords that already exist as spider-content subjects or are black-listed.
 * It then keeps only keywords accepted by {@link #validKeyword(String)}.
 *
 * <p>{@link #doBusiness(int, int)} crawls one page of those keywords via
 * {@link IncrementCrawlerService#doCrawle(String)}. Successful results are stored through
 * {@link SpiderContentService#insertBatch(List)}. Keywords whose crawl returned {@code null} are
 * added to the black-keyword list.
 *
 * <p>Created by ginozhang on 2017/3/1.
 */
@Component
public class IncrementCrawlerFlow implements RetryBusinessFlow {

    private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");

    /** Maximum number of keywords read from each Redis sorted set. */
    private static final int KEYWORD_COUNT = 100;

    @Resource(name = "yhNoSyncZSetOperations")
    private YHZSetOperations<String, String> yhNoSyncZSetOperations;

    @Autowired
    private SpiderContentService spiderContentService;

    @Autowired
    private AnalyzerHelper analyzerHelper;

    @Autowired
    private BlackKeywordsMgr blackKeywordsMgr;

    @Autowired
    private IncrementCrawlerService incrementCrawlerService;

    /** Keywords to crawl in the current run; built by {@link #init()}, cleared by {@link #finish}. */
    private List<String> validKeywordList = null;

    @Override
    public String flowName() {
        return this.getClass().getSimpleName();
    }

    /**
     * Builds the list of keywords to crawl in this run.
     *
     * <p>Always leaves {@code validKeywordList} non-null (possibly empty). A run with no
     * candidates therefore never reuses a stale list, and {@link #doBusiness} never sees
     * {@code null}.
     */
    @Override
    public void init() {
        // Reset first so an early return below cannot leave a previous run's list behind.
        this.validKeywordList = new ArrayList<>();

        Set<String> keywordSet = new HashSet<>(KEYWORD_COUNT * 4);
        collectTopKeywords(RedisKeys.getRedisKey4Yesterday(RedisKeys.YOHO_SEARCH_KEYWORDS_EMPTY), keywordSet);
        collectTopKeywords(RedisKeys.getRedisKey4Yesterday(RedisKeys.YOHO_SEARCH_KEYWORDS_LESS), keywordSet);

        logger.info("[func=IncrementCrawlerFlow.init][keywordSetSize={}]", keywordSet.size());
        if (keywordSet.isEmpty()) {
            return;
        }

        // Skip keywords that already have spider content stored.
        keywordSet = excludeNormalized(keywordSet, spiderContentService.getAllSubjects());
        logger.info("[func=IncrementCrawlerFlow.init][keywordSetSizeRemovedExist={}]", keywordSet.size());

        // Skip keywords that are black-listed (e.g. earlier crawls that returned nothing).
        keywordSet = excludeNormalized(keywordSet, blackKeywordsMgr.getBlackKeywords());
        logger.info("[func=IncrementCrawlerFlow.init][keywordSetSizeRemovedInvalid={}]", keywordSet.size());

        this.validKeywordList = keywordSet.parallelStream().filter(this::validKeyword).collect(Collectors.toList());
        logger.info("[func=IncrementCrawlerFlow.init][validKeywordListSize={}]", validKeywordList.size());
    }

    /**
     * Adds the top {@link #KEYWORD_COUNT} members (highest score first) of a Redis sorted set
     * to {@code target}.
     *
     * <p>Null members are skipped, and so is a {@code null} result set.
     *
     * @param redisKey key of the sorted set to read
     * @param target   set receiving the keywords
     */
    private void collectTopKeywords(String redisKey, Set<String> target) {
        // ZREVRANGE end index is inclusive, so KEYWORD_COUNT - 1 yields exactly KEYWORD_COUNT members.
        Set<ZSetOperations.TypedTuple<String>> redisResults =
                yhNoSyncZSetOperations.reverseRangeWithScores(redisKey, 0, KEYWORD_COUNT - 1);
        if (redisResults == null) {
            return;
        }
        for (ZSetOperations.TypedTuple<String> typedTuple : redisResults) {
            if (typedTuple != null && typedTuple.getValue() != null) {
                target.add(typedTuple.getValue());
            }
        }
    }

    /**
     * Removes keywords from a set when their normalized form appears in {@code excluded}.
     *
     * <p>The normalized form is lower-cased and trimmed.
     *
     * @param keywords candidate keywords
     * @param excluded normalized keywords to drop; {@code null} or empty means "drop nothing"
     * @return the remaining keywords ({@code keywords} itself when nothing is excluded)
     */
    private static Set<String> excludeNormalized(Set<String> keywords, Set<String> excluded) {
        if (CollectionUtils.isEmpty(excluded)) {
            return keywords;
        }
        return keywords.stream()
                .filter(keyword -> !excluded.contains(keyword.toLowerCase().trim()))
                .collect(Collectors.toSet());
    }

    /**
     * Decides whether a keyword is worth crawling.
     *
     * @param keyword candidate keyword
     * @return {@code false} for keywords shorter than 2 characters, keywords in
     *         {@code SuggestionConstants.IGNORE_KEYWORDS}, or keywords that the {@code ik_smart}
     *         analyzer splits into more than 3 tokens; {@code true} otherwise
     */
    private boolean validKeyword(String keyword) {
        // Reject keywords that are too short.
        if (StringUtils.isEmpty(keyword) || keyword.length() < 2) {
            return false;
        }

        // Reject keywords on the ignore list.
        if (SuggestionConstants.IGNORE_KEYWORDS.contains(keyword)) {
            return false;
        }

        // Reject keywords that split into more than 3 terms.
        if (analyzerHelper.getTokens(keyword, "ik_smart").size() > 3) {
            return false;
        }

        return true;
    }

    @Override
    public int getTotalCount() {
        return validKeywordList != null ? validKeywordList.size() : 0;
    }

    /**
     * Crawls one page of keywords and persists the results.
     *
     * @param pageNo    1-based page number
     * @param batchSize number of keywords per page
     * @return always {@code true}; pages beyond the end of the list are a no-op
     * @throws IllegalArgumentException if {@link #init()} has not run, or if {@code pageNo} or
     *                                  {@code batchSize} is not positive
     */
    @Override
    public boolean doBusiness(int pageNo, int batchSize) {
        Assert.notNull(this.validKeywordList, "validKeywordList must be initialized by init() before doBusiness()");
        Assert.isTrue(pageNo > 0 && batchSize > 0, "pageNo and batchSize must be positive");

        int total = validKeywordList.size();
        int start = (pageNo - 1) * batchSize;
        if (start >= total) {
            return true;
        }
        // subList's end index is exclusive: a full page ends at start + batchSize.
        int end = Math.min(start + batchSize, total);
        List<String> subListKeywords = this.validKeywordList.subList(start, end);

        List<String> failedKeywords = new ArrayList<>();
        List<BaikeBO> baikeBOList = new ArrayList<>();
        for (String keyword : subListKeywords) {
            BaikeBO baikeBO = incrementCrawlerService.doCrawle(keyword);
            if (baikeBO != null) {
                baikeBO.setTitle(keyword);
                baikeBOList.add(baikeBO);
            } else {
                // Stored normalized so it matches the lookup done in init().
                failedKeywords.add(keyword.toLowerCase().trim());
            }
        }

        logger.info("[func=IncrementCrawlerFlow.doBusiness][baikeBOListSize={}][failedKeywords={}]", baikeBOList.size(), failedKeywords);
        if (CollectionUtils.isNotEmpty(baikeBOList)) {
            List<SpiderContent> spiderContentList = baikeBOList.stream().map(BaikeBO::toSpiderContent).collect(Collectors.toList());
            spiderContentService.insertBatch(spiderContentList);
        }

        blackKeywordsMgr.addBlackKeywords(failedKeywords);
        return true;
    }

    /**
     * Releases the keyword list and logs the outcome.
     *
     * @param doBusinessResult result reported by the flow executor
     * @param exception        failure cause, or {@code null}; logged with its stack trace
     */
    @Override
    public void finish(boolean doBusinessResult, Exception exception) {
        this.validKeywordList = null;
        // SLF4J treats a trailing Throwable argument as the exception to log.
        logger.info("[func=IncrementCrawlerFlow.finish][doBusinessResult={}]", doBusinessResult, exception);
    }

    /**
     * Manual smoke test: fetches a hard-coded baike.baidu.com item page and prints it.
     *
     * @param args unused
     * @throws UnsupportedEncodingException never in practice (UTF-8 is always supported)
     */
    public static void main(String[] args) throws UnsupportedEncodingException {
        final String keyword = "华伦天奴";
        String url = "http://baike.baidu.com/item/" + URLEncoder.encode(keyword, "UTF-8");
        logger.info("[func=IncrementCrawlerFlow][keyword={}][url={}]", keyword, url);
        System.out.println((new SpiderBasedHttpRequest()).get(url));
    }
}