Authored by Gino Zhang

BaikeBOBulkService可停止 (make BaikeBOBulkService stoppable)

... ... @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
... ... @@ -24,29 +25,42 @@ public class BaikeBOBulkService {
// Batch buffer: the worker loop in start() appends drained queue items here and doBulk() streams over it.
// NOTE(review): ArrayList is not thread-safe; if start() can be called while an earlier worker
// is still running, two threads would share this list — confirm against callers.
private static final List<BaikeBO> blukList = new ArrayList<>();
// Run flag: set to true by start(), set to false by destroy(), checked by submitBaike() and the worker loop.
// volatile so a write from one thread is visible to the others.
private static volatile boolean started = false;
// Supplies the numeric suffix of each worker thread name ("BaikeBOBulkThread-" + n).
// Declared final: the reference is never reassigned in the visible code, only incremented.
private static final AtomicInteger counter = new AtomicInteger(1);
/**
 * Queues one item for the background bulk worker.
 *
 * @param baikeBO the item to enqueue; passed to {@code queue.add} unchanged
 * @throws IllegalStateException if the {@code started} flag is false, which is the case
 *         before {@code start()} has been called and after {@code destroy()} has run
 */
public static void submitBaike(BaikeBO baikeBO) {
    // IllegalStateException extends RuntimeException, so callers that caught the
    // previously thrown RuntimeException keep working.
    if (!started) {
        throw new IllegalStateException("The bulk thread is destroyed!");
    }
    // NOTE(review): queue is declared outside this view; if it is the ArrayBlockingQueue
    // the file imports, add() throws instead of blocking when the queue is full — confirm.
    queue.add(baikeBO);
}
// NOTE(review): this span is a rendered diff in which removed and added lines are
// interleaved without +/- markers ("static {" next to "public static void start() {",
// "while (true) {" next to "while (started) {", two consecutive "else" branches), so it
// does not compile as shown. Presumably the "start()", "while (started)" and
// "Thread.sleep(10000)" lines are the added side — confirm against the real file.
//
// Read that way, start() sets the started flag and launches a thread named
// "BaikeBOBulkThread-" + counter.getAndIncrement(). Each pass of the thread's loop
// appends the result of getElementsFromBlockingQueue() to blukList, calls doBulk()
// when the list is non-empty, and sleeps before the next pass.
static {
public static void start() {
started = true;
new Thread(() -> {
while (true) {
// Loop condition re-reads the started flag, which destroy() sets to false.
// NOTE(review): nothing visible here drains the queue one last time after the flag
// flips; whether pending items are flushed depends on code outside this view.
while (started) {
try {
blukList.addAll(getElementsFromBlockingQueue());
if (blukList.size() > 0) {
doBulk();
// Short pause after a batch was handed to doBulk().
Thread.sleep(100);
} else {
Thread.sleep(1000);
} else {
// Longer pause when nothing was pending.
Thread.sleep(10000);
}
} catch (Exception e) {
// Logged and not rethrown, so the loop goes on to its next pass.
// NOTE(review): this also catches InterruptedException from Thread.sleep without
// restoring the thread's interrupt flag.
LOGGER.error(e.getMessage(), e);
}
}
}
}, "BaikeBOBulkThread-" + counter.getAndIncrement()
).start();
}
/**
 * Clears the {@code started} flag.
 *
 * After this call submitBaike() throws, because it checks the same flag. The worker
 * loop that tests {@code while (started)} ends at its next condition check; this
 * method does not interrupt or join that thread.
 */
public static void destroy() {
started = false;
}
private static void doBulk() {
SpiderContentService spiderContentService = (SpiderContentService) ApplicationContextUtil.getBean(SpiderContentService.class);
List<SpiderContent> spiderContentList = blukList.stream().map(baikeBO -> baikeBO.toSpiderContent()).collect(Collectors.toList());
... ...
... ... @@ -66,22 +66,28 @@ public class BaikeSpiderService {
Set<String> existSubjects = spiderContentService.getAllSubjects();
//Set<String> yohoKeywords = getAllYohoKeywords();
LOGGER.info("[func=BaikeSpiderService.init][cost={}]", System.currentTimeMillis() - begin);
int newCount = 0;
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE, thread -> new Thread(thread, "BaikeSpider-" + atomicInteger.getAndIncrement()));
List<Future<Integer>> futures = new ArrayList<>(baikeUrlFiles.length);
for (File baikeUrlFile : baikeUrlFiles) {
futures.add(pool.submit(new WebCrawler(baikeUrlFile, existSubjects)));
}
try {
BaikeBOBulkService.start();
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE, thread -> new Thread(thread, "BaikeSpider-" + atomicInteger.getAndIncrement()));
List<Future<Integer>> futures = new ArrayList<>(baikeUrlFiles.length);
for (File baikeUrlFile : baikeUrlFiles) {
futures.add(pool.submit(new WebCrawler(baikeUrlFile, existSubjects)));
}
int newCount = 0;
for (Future<Integer> future : futures) {
try {
newCount += future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
for (Future<Integer> future : futures) {
try {
newCount += future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
BaikeBOBulkService.destroy();
}
result.put("existSubjects", existSubjects);
... ...