Authored by Gino Zhang

增加锁机制,避免爬虫任务并发执行

@@ -64,7 +64,7 @@ public class SuggestConvertorFlow implements RetryBusinessFlow { @@ -64,7 +64,7 @@ public class SuggestConvertorFlow implements RetryBusinessFlow {
64 return true; 64 return true;
65 } 65 }
66 66
67 - List<SpiderContent> filteredContentList = spiderContentList.stream().filter(spiderContent -> !existSourceSet.contains(spiderContent.getSubject())).collect(Collectors.toList()); 67 + List<SpiderContent> filteredContentList = spiderContentList.stream().filter(spiderContent -> spiderContent != null && !existSourceSet.contains(spiderContent.getSubject())).collect(Collectors.toList());
68 logger.info("[func=SuggestConversionFlow.doBusiness][pageNo={}][spiderContentListSize={}][filteredContentListSize={}]", pageNo, spiderContentList.size(), filteredContentList.size()); 68 logger.info("[func=SuggestConversionFlow.doBusiness][pageNo={}][spiderContentListSize={}][filteredContentListSize={}]", pageNo, spiderContentList.size(), filteredContentList.size());
69 if (CollectionUtils.isEmpty(filteredContentList)) { 69 if (CollectionUtils.isEmpty(filteredContentList)) {
70 return true; 70 return true;
@@ -10,6 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -10,6 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.scheduling.annotation.Scheduled; 10 import org.springframework.scheduling.annotation.Scheduled;
11 import org.springframework.stereotype.Component; 11 import org.springframework.stereotype.Component;
12 12
  13 +import java.util.concurrent.atomic.AtomicBoolean;
  14 +
13 /** 15 /**
14 * Created by ginozhang on 2017/3/2. 16 * Created by ginozhang on 2017/3/2.
15 */ 17 */
@@ -27,6 +29,9 @@ public class SpiderJob { @@ -27,6 +29,9 @@ public class SpiderJob {
27 @Autowired 29 @Autowired
28 private DynamicConfigService dynamicConfigService; 30 private DynamicConfigService dynamicConfigService;
29 31
  32 + // 避免连续触发
  33 + private volatile AtomicBoolean lockStatus = new AtomicBoolean(false);
  34 +
30 @Scheduled(cron = "0 30 0 * * ?") 35 @Scheduled(cron = "0 30 0 * * ?")
31 public void crawleEmptySearchKeywords() { 36 public void crawleEmptySearchKeywords() {
32 // 分析前一天前1000个搜索无结果或者小于10个的关键词 到baidubaike爬虫获取内容 37 // 分析前一天前1000个搜索无结果或者小于10个的关键词 到baidubaike爬虫获取内容
@@ -37,9 +42,15 @@ public class SpiderJob { @@ -37,9 +42,15 @@ public class SpiderJob {
37 return; 42 return;
38 } 43 }
39 44
40 - RetryBusinessFlowExecutor flowExecutor = new RetryBusinessFlowExecutor(incrementCrawlerFlow);  
41 - boolean result = flowExecutor.execute();  
42 - LOGGER.info("[func=crawleEmptySearchKeywords.end][result={}][cost={}]", result, System.currentTimeMillis() - begin); 45 + if (lockStatus.compareAndSet(false, true)) {
  46 + try {
  47 + RetryBusinessFlowExecutor flowExecutor = new RetryBusinessFlowExecutor(incrementCrawlerFlow);
  48 + boolean result = flowExecutor.execute();
  49 + LOGGER.info("[func=crawleEmptySearchKeywords.end][result={}][cost={}]", result, System.currentTimeMillis() - begin);
  50 + } finally {
  51 + lockStatus.set(false);
  52 + }
  53 + }
43 } 54 }
44 55
45 @Scheduled(cron = "0 40 1 * * ?") 56 @Scheduled(cron = "0 40 1 * * ?")
@@ -52,8 +63,14 @@ public class SpiderJob { @@ -52,8 +63,14 @@ public class SpiderJob {
52 return; 63 return;
53 } 64 }
54 65
55 - RetryBusinessFlowExecutor flowExecutor = new RetryBusinessFlowExecutor(suggestConvertorFlow);  
56 - boolean result = flowExecutor.execute();  
57 - LOGGER.info("[func=convertSpiderContents.end][result={}][cost={}]", result, System.currentTimeMillis() - begin); 66 + if (lockStatus.compareAndSet(false, true)) {
  67 + try {
  68 + RetryBusinessFlowExecutor flowExecutor = new RetryBusinessFlowExecutor(suggestConvertorFlow);
  69 + boolean result = flowExecutor.execute();
  70 + LOGGER.info("[func=convertSpiderContents.end][result={}][cost={}]", result, System.currentTimeMillis() - begin);
  71 + } finally {
  72 + lockStatus.set(false);
  73 + }
  74 + }
58 } 75 }
59 } 76 }