Authored by Gino Zhang

consumer: add report logging, for scheduled querying of key information

@@ -13,9 +13,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 /**
@@ -26,10 +24,14 @@ public class SuggestConvertorFlow implements RetryBusinessFlow {
     private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");
+    private static final Logger REPORT_LOGGER = LoggerFactory.getLogger("CONSUMER_REPORTER");
     private volatile Set<String> existSourceSet = null;
     private volatile YohoKeywordsBO yohoKeywordsBO = null;
+    private volatile Map<String, String> newConversionMap = new HashMap<>(100);
     @Autowired
     private SuggestConversionService suggestConversionService;
@@ -46,6 +48,7 @@ public class SuggestConvertorFlow implements RetryBusinessFlow {
     @Override
     public void init() {
+        newConversionMap.clear();
         existSourceSet = suggestConversionService.getAllSources();
         logger.info("[func=SuggestConversionFlow.init][existSourceSetSize={}]", existSourceSet.size());
         yohoKeywordsBO = suggestConvertorService.buildYohoKeywordBO();
@@ -64,13 +67,14 @@ public class SuggestConvertorFlow implements RetryBusinessFlow {
             return true;
         }
-        List<SpiderContent> filteredContentList = spiderContentList.stream().filter(spiderContent -> spiderContent != null && !existSourceSet.contains(spiderContent.getSubject())).collect(Collectors.toList());
+        List<SpiderContent> filteredContentList = spiderContentList.stream().filter(spiderContent -> spiderContent != null && !existSourceSet.contains(spiderContent.getSubject().toLowerCase().trim())).collect(Collectors.toList());
         logger.info("[func=SuggestConversionFlow.doBusiness][pageNo={}][spiderContentListSize={}][filteredContentListSize={}]", pageNo, spiderContentList.size(), filteredContentList.size());
         if (CollectionUtils.isEmpty(filteredContentList)) {
             return true;
         }
         // Get the yoho keyword associated with each title
+        Map<String, String> tempNewConversionMap = new HashMap<>(batchSize);
         List<SuggestConversion> suggestConversionList = new ArrayList<>();
         for (SpiderContent spiderContent : filteredContentList) {
             String dest = suggestConvertorService.convert(spiderContent, yohoKeywordsBO);
@@ -80,6 +84,7 @@ public class SuggestConvertorFlow implements RetryBusinessFlow {
                 suggestConversion.setDest(dest);
                 suggestConversion.setUpdateTime(DateUtil.getCurrentTimeSecond());
                 suggestConversionList.add(suggestConversion);
+                tempNewConversionMap.put(spiderContent.getSubject(), dest);
             }
         }
@@ -88,6 +93,12 @@ public class SuggestConvertorFlow implements RetryBusinessFlow {
             suggestConversionService.insertBatch(suggestConversionList);
         }
+        if (!tempNewConversionMap.isEmpty()) {
+            synchronized (this) {
+                this.newConversionMap.putAll(tempNewConversionMap);
+            }
+        }
         return true;
     }
@@ -96,5 +107,7 @@ public class SuggestConvertorFlow implements RetryBusinessFlow {
         this.yohoKeywordsBO = null;
         this.existSourceSet = null;
         logger.info("[func=SuggestConversionFlow.finish][doBusinessResult=" + doBusinessResult + "]", exception);
+        REPORT_LOGGER.info("[key=IncrementConversionMap][incrementConversionMap={}]", newConversionMap);
+        newConversionMap.clear();
     }
 }
... ...
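
Note on the SuggestConvertorFlow change above: each doBusiness batch first collects its new source-to-dest pairs into a local tempNewConversionMap, then merges them into the shared newConversionMap under a lock, and finish() emits the merged map once through the CONSUMER_REPORTER logger before resetting it. A minimal sketch of the same aggregate-then-report idiom, shown here with a ConcurrentHashMap instead of the patch's volatile HashMap plus synchronized block (class and method names are illustrative, not part of the project):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ConversionReportAggregator {

        private static final Logger REPORT_LOGGER = LoggerFactory.getLogger("CONSUMER_REPORTER");

        // ConcurrentHashMap makes the per-batch merge thread-safe without external locking.
        private final Map<String, String> newConversions = new ConcurrentHashMap<>(100);

        // Called from worker threads, once per processed batch.
        public void onBatchConverted(Map<String, String> batchConversions) {
            newConversions.putAll(batchConversions);
        }

        // Called once when the whole flow ends: report, then reset for the next run.
        public void finish() {
            REPORT_LOGGER.info("[key=IncrementConversionMap][incrementConversionMap={}]", newConversions);
            newConversions.clear();
        }
    }

Either variant works; the patch keeps a plain HashMap and serializes writers explicitly with synchronized (this), which is equally correct as long as finish() runs only after all batches complete.
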
@@ -35,6 +35,8 @@ public class IncrementCrawlerFlow implements RetryBusinessFlow {
     private static final Logger logger = LoggerFactory.getLogger("FLOW_EXECUTOR");
+    private static final Logger REPORT_LOGGER = LoggerFactory.getLogger("CONSUMER_REPORTER");
     private static final int KEYWORD_COUNT = 100;
     @Resource(name = "yhNoSyncZSetOperations")
@@ -54,6 +56,10 @@ public class IncrementCrawlerFlow implements RetryBusinessFlow {
     private List<String> validKeywordList = null;
+    private volatile List<String> succeedKeywords = new ArrayList<>(100);
+    private volatile List<String> failedKeywords = new ArrayList<>(100);
     @Override
     public String flowName() {
         return this.getClass().getSimpleName();
@@ -61,17 +67,26 @@ public class IncrementCrawlerFlow implements RetryBusinessFlow {
     @Override
     public void init() {
-        Set<String> keywordSet = new HashSet<>(1000);
+        succeedKeywords.clear();
+        failedKeywords.clear();
+        Set<String> keywordSet = new HashSet<>(200);
+        Set<String> topEmptySeachKeywords = new HashSet<>(100);
+        Set<String> topLessSeachKeywords = new HashSet<>(100);
         Set<ZSetOperations.TypedTuple<String>> redisResults = yhNoSyncZSetOperations.reverseRangeWithScores(RedisKeys.getRedisKey4Yesterday(RedisKeys.YOHO_SEARCH_KEYWORDS_EMPTY), 0, KEYWORD_COUNT);
         for (ZSetOperations.TypedTuple<String> typedTuple : redisResults) {
-            keywordSet.add(typedTuple.getValue());
+            topEmptySeachKeywords.add(typedTuple.getValue());
         }
         redisResults = yhNoSyncZSetOperations.reverseRangeWithScores(RedisKeys.getRedisKey4Yesterday(RedisKeys.YOHO_SEARCH_KEYWORDS_LESS), 0, KEYWORD_COUNT);
         for (ZSetOperations.TypedTuple<String> typedTuple : redisResults) {
-            keywordSet.add(typedTuple.getValue());
+            topLessSeachKeywords.add(typedTuple.getValue());
         }
+        REPORT_LOGGER.info("[key=TopEmptySeachKeywords][topEmptySeachKeywords={}]", topEmptySeachKeywords);
+        REPORT_LOGGER.info("[key=TopLessSeachKeywords][topLessSeachKeywords={}]", topLessSeachKeywords);
+        keywordSet.addAll(topEmptySeachKeywords);
+        keywordSet.addAll(topLessSeachKeywords);
         logger.info("[func=IncrementCrawlerFlow.init][keywordSetSize={}]", keywordSet.size());
         if (keywordSet.isEmpty()) {
             return;
@@ -89,6 +104,7 @@ public class IncrementCrawlerFlow implements RetryBusinessFlow {
         logger.info("[func=IncrementCrawlerFlow.init][keywordSetSizeRemovedInvalid={}]", keywordSet.size());
         this.validKeywordList = keywordSet.parallelStream().filter(keyword -> validKeyword(keyword)).collect(Collectors.toList());
         logger.info("[func=IncrementCrawlerFlow.init][validKeywordListSize={}]", validKeywordList != null ? validKeywordList.size() : 0);
+        REPORT_LOGGER.info("[key=ValidKeywordList][validIncrementKeywords={}]", validKeywordList);
     }
     private boolean validKeyword(String keyword) {
@@ -129,25 +145,41 @@ public class IncrementCrawlerFlow implements RetryBusinessFlow {
             return true;
         }
-        List<String> failedKeywords = new ArrayList<>();
+        List<String> tempFailedKeywords = new ArrayList<>();
+        List<String> tempSucceedKeywords = new ArrayList<>();
         List<BaikeBO> baikeBOList = new ArrayList<>();
         BaikeBO tempBaikeBO;
         for (String keyword : subListKeywords) {
             if ((tempBaikeBO = incrementCrawlerService.doCrawle(keyword)) != null) {
                 tempBaikeBO.setTitle(keyword);
                 baikeBOList.add(tempBaikeBO);
+                tempSucceedKeywords.add(keyword.toLowerCase().trim());
             } else {
-                failedKeywords.add(keyword.toLowerCase().trim());
+                tempFailedKeywords.add(keyword.toLowerCase().trim());
             }
         }
-        logger.info("[func=IncrementCrawlerFlow.doBusiness][baikeBOListSize={}][failedKeywords={}]", baikeBOList.size(), failedKeywords);
+        logger.info("[func=IncrementCrawlerFlow.doBusiness][baikeBOListSize={}][failedKeywords={}]", baikeBOList.size(), tempFailedKeywords);
         if (CollectionUtils.isNotEmpty(baikeBOList)) {
             List<SpiderContent> spiderContentList = baikeBOList.stream().map(baikeBO -> baikeBO.toSpiderContent()).collect(Collectors.toList());
             spiderContentService.insertBatch(spiderContentList);
         }
-        blackKeywordsMgr.addBlackKeywords(failedKeywords);
+        blackKeywordsMgr.addBlackKeywords(tempFailedKeywords);
+        // Used to produce the statistics report log
+        if (!tempSucceedKeywords.isEmpty()) {
+            synchronized (this) {
+                this.succeedKeywords.addAll(tempSucceedKeywords);
+            }
+        }
+        if (!tempFailedKeywords.isEmpty()) {
+            synchronized (this) {
+                this.failedKeywords.addAll(tempFailedKeywords);
+            }
+        }
         return true;
     }
@@ -155,6 +187,11 @@ public class IncrementCrawlerFlow implements RetryBusinessFlow {
     public void finish(boolean doBusinessResult, Exception exception) {
         this.validKeywordList = null;
         logger.info("[func=IncrementCrawlerFlow.finish][doBusinessResult=" + doBusinessResult + "]", exception);
+        REPORT_LOGGER.info("[key=SucceedIncrementKeywords][succeedIncrementKeywords={}]", succeedKeywords);
+        REPORT_LOGGER.info("[key=FailedIncrementKeywords][failedIncrementKeywords={}]", failedKeywords);
+        succeedKeywords.clear();
+        failedKeywords.clear();
     }
     public static void main(String[] args) throws UnsupportedEncodingException {
... ...
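
Note on IncrementCrawlerFlow.init() above: the top KEYWORD_COUNT entries of yesterday's empty-result and low-result keyword sorted sets are now collected into their own sets, reported through CONSUMER_REPORTER, and only then merged into the crawl candidate set. A minimal sketch of reading such a top-N ZSET with Spring Data Redis (the StringRedisTemplate wiring and key name are assumptions for the sketch; the project injects a yhNoSyncZSetOperations bean instead):

    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.core.ZSetOperations;

    import java.util.HashSet;
    import java.util.Set;

    public class TopKeywordsReader {

        private final StringRedisTemplate redisTemplate; // assumed to be provided by the Spring context

        public TopKeywordsReader(StringRedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
        }

        // Returns the highest-scored members of the given ZSET.
        // Note: ZSET range bounds are inclusive, so (0, count) yields up to count + 1 members.
        public Set<String> topKeywords(String key, int count) {
            Set<String> keywords = new HashSet<>();
            Set<ZSetOperations.TypedTuple<String>> tuples =
                    redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, count);
            if (tuples != null) {
                for (ZSetOperations.TypedTuple<String> tuple : tuples) {
                    keywords.add(tuple.getValue());
                }
            }
            return keywords;
        }
    }

Because the two rankings are merged through a HashSet, keywords that appear in both the empty-result and low-result lists are crawled only once.
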
@@ -157,6 +157,20 @@
             <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level - %msg%n</pattern>
         </encoder>
     </appender>
+    <appender name="CONSUMER_REPORTER_APPEND" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${catalina.home}/logs/search-consumer/consumer-reporter.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${catalina.home}/logs/search-consumer/archived/consumer-reporter.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>10MB</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+            <maxHistory>7</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n</pattern>
+        </encoder>
+    </appender>
     <!-- event reporting appender -->
     <appender name="SEARCH_EVENT_APPAND" class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -229,6 +243,11 @@
         <appender-ref ref="SPIDER_APPAND"/>
     </logger>
+    <logger name="CONSUMER_REPORTER" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="CONSUMER_REPORTER_APPEND"/>
+    </logger>
     <!-- event reporting log -->
     <logger name="SEARCH_EVENT_LOG" additivity="false">
         <level value="INFO"/>
... ...
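
Note on the logback changes above: the CONSUMER_REPORTER logger is declared with additivity="false", so report lines land only in consumer-reporter.log and are not duplicated into any parent appender, which keeps the file easy to scan with scheduled queries. A minimal usage sketch from the Java side (the key/value message layout mirrors the REPORT_LOGGER calls in the two flows):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ReportLoggingExample {

        // The logger name must match the <logger name="CONSUMER_REPORTER"> entry in the logback config.
        private static final Logger REPORT_LOGGER = LoggerFactory.getLogger("CONSUMER_REPORTER");

        public static void main(String[] args) {
            // With the appender's pattern, this renders as a single line like:
            // 2018-01-01 00:00:00.000 - [key=Example][exampleValue=42]
            REPORT_LOGGER.info("[key=Example][exampleValue={}]", 42);
        }
    }
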
@@ -130,10 +130,10 @@
             <fileNamePattern>${yoho.logs.basedir}/${yoho.search.consumer.env.namespace}/archived/retry-business.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
             <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                 <!-- or whenever the file size reaches 100MB -->
-                <maxFileSize>10MB</maxFileSize>
+                <maxFileSize>${yoho.logs.maxFileSize}</maxFileSize>
             </timeBasedFileNamingAndTriggeringPolicy>
             <!-- keep 30 days' worth of history -->
-            <maxHistory>7</maxHistory>
+            <maxHistory>${yoho.logs.maxHistory}</maxHistory>
         </rollingPolicy>
         <encoder>
             <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger:%line - %msg%n</pattern>
@@ -148,15 +148,29 @@
             <fileNamePattern>${yoho.logs.basedir}/${yoho.search.consumer.env.namespace}/archived/spider.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
             <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                 <!-- or whenever the file size reaches 100MB -->
-                <maxFileSize>10MB</maxFileSize>
+                <maxFileSize>${yoho.logs.maxFileSize}</maxFileSize>
             </timeBasedFileNamingAndTriggeringPolicy>
             <!-- keep 30 days' worth of history -->
-            <maxHistory>7</maxHistory>
+            <maxHistory>${yoho.logs.maxHistory}</maxHistory>
         </rollingPolicy>
         <encoder>
             <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level - %msg%n</pattern>
         </encoder>
     </appender>
+    <appender name="CONSUMER_REPORTER_APPEND" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${yoho.logs.basedir}/${yoho.search.consumer.env.namespace}/consumer-reporter.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${yoho.logs.basedir}/${yoho.search.consumer.env.namespace}/archived/consumer-reporter.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>${yoho.logs.maxFileSize}</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+            <maxHistory>${yoho.logs.maxHistory}</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n</pattern>
+        </encoder>
+    </appender>
     <!-- event reporting appender -->
     <appender name="SEARCH_EVENT_APPAND" class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -229,6 +243,11 @@
         <appender-ref ref="SPIDER_APPAND"/>
     </logger>
+    <logger name="CONSUMER_REPORTER" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="CONSUMER_REPORTER_APPEND"/>
+    </logger>
     <!-- event reporting log -->
     <logger name="SEARCH_EVENT_LOG" additivity="false">
         <level value="INFO"/>
... ...
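
Note on the two maxFileSize / maxHistory hunks above: the hard-coded 10MB size cap and 7-day retention are replaced with the ${yoho.logs.maxFileSize} and ${yoho.logs.maxHistory} variables, so rotation limits can be tuned per environment in one place. Logback resolves such variables from system properties or from <property> declarations in the config; this diff does not show where the project actually defines them, so the following is only a hedged sketch of the <property> approach:

    <!-- Illustrative only: the real values may come from system properties,
         an included file, or the deployment environment. -->
    <configuration>
        <property name="yoho.logs.maxFileSize" value="10MB"/>
        <property name="yoho.logs.maxHistory" value="7"/>
        <!-- Appenders defined below can then reference
             ${yoho.logs.maxFileSize} and ${yoho.logs.maxHistory}. -->
    </configuration>

Equivalently, the values could be supplied at JVM startup, e.g. -Dyoho.logs.maxFileSize=10MB -Dyoho.logs.maxHistory=7.
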