...
|
...
|
@@ -35,6 +35,8 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve |
|
|
|
|
|
private static final ExecutorService tblThreadPool = Executors.newFixedThreadPool(1);
|
|
|
|
|
|
private static final int MAX_RETRY_TIMES = 5;
|
|
|
|
|
|
private ApplicationEventPublisher publisher;
|
|
|
|
|
|
ApplicationContext applicationContext;
|
...
|
...
|
@@ -107,10 +109,10 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve |
|
|
int pageNo, int limit) throws Exception {
|
|
|
int tryCount = 1;
|
|
|
boolean result = false;
|
|
|
while (tryCount <= 5 && !result) {
|
|
|
while (tryCount <= MAX_RETRY_TIMES && !result) {
|
|
|
long begin = System.currentTimeMillis();
|
|
|
INDEX_REBUILD_LOG.info("[yohoIndexName=[{}]],[pageNo={}],[tryCount={}],[begin={}]", yohoIndexName, pageNo, tryCount, begin);
|
|
|
result = this.doLoadData(yohoIndexName, tempIndexRealName, indexBuilder, client, pageNo, limit);
|
|
|
result = this.doLoadData(yohoIndexName, tempIndexRealName, indexBuilder, client, pageNo, limit, tryCount);
|
|
|
long cost = System.currentTimeMillis() - begin;
|
|
|
INDEX_REBUILD_LOG.info("[yohoIndexName=[{}]],[pageNo={}],[tryCount={}],[begin={}],[cost={}],[{result={}]", yohoIndexName, pageNo, tryCount, begin, cost, result);
|
|
|
tryCount++;
|
...
|
...
|
@@ -119,7 +121,7 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve |
|
|
}
|
|
|
|
|
|
private boolean doLoadData(final String yohoIndexName, final String tempIndexRealName, final IIndexBuilder indexBuilder, final IElasticsearchClient client, int pageNo,
|
|
|
int limit) {
|
|
|
int limit, int tryCount) {
|
|
|
try {
|
|
|
int start = (pageNo - 1) * limit;
|
|
|
List<?> dataList = indexBuilder.getPageLists(start, limit);
|
...
|
...
|
@@ -135,11 +137,8 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve |
|
|
}
|
|
|
BulkResponse bulkResponse = client.bulk(bluks);
|
|
|
if (bulkResponse.hasFailures()) {
|
|
|
Exception e = new Exception(String.format("addIndexDataBean has fail,[yohoIndexName=[%s]],[pageNo=%s],[failureMessage=%s]", yohoIndexName, pageNo,
|
|
|
throw new Exception(String.format("addIndexDataBean has fail,[yohoIndexName=[%s]],[pageNo=%s],[failureMessage=%s]", yohoIndexName, pageNo,
|
|
|
bulkResponse.buildFailureMessage()));
|
|
|
publisher.publishEvent(new SearchEvent(EventReportEnum.YOHOINDEXDATALOADER_DOLOADDATA.getEventName(), EventReportEnum.YOHOINDEXDATALOADER_DOLOADDATA
|
|
|
.getFunctionName(), EventReportEnum.YOHOINDEXDATALOADER_DOLOADDATA.getMoudleName(), "exception", e, null));
|
|
|
throw e;
|
|
|
}
|
|
|
long end = System.currentTimeMillis();
|
|
|
INDEX_REBUILD_LOG.info("[addDataToEs end][yohoIndexName=[{}]],[pageNo={}],[count={}][cost={}]", yohoIndexName, pageNo, dataList.size(), end - begin);
|
...
|
...
|
@@ -148,7 +147,14 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve |
|
|
return true;
|
|
|
} catch (Exception e) {
|
|
|
// 如果有异常,则处理一下,并等待30s执行下一次
|
|
|
handelException(yohoIndexName, pageNo, e);
|
|
|
INDEX_REBUILD_LOG.error("Load data failed![index=" + yohoIndexName + "][pageNo=" + pageNo + "][message=" + e.getMessage() + "]", e);
|
|
|
if (tryCount == MAX_RETRY_TIMES){
|
|
|
// 达到最大重试次数上报异常至日志
|
|
|
publisher.publishEvent(new SearchEvent(EventReportEnum.YOHOINDEXDATALOADER_DOLOADDATA.getEventName(), EventReportEnum.YOHOINDEXDATALOADER_DOLOADDATA
|
|
|
.getFunctionName(), EventReportEnum.YOHOINDEXDATALOADER_DOLOADDATA.getMoudleName(), "exception", e, "yohoIndexName=" + yohoIndexName + "/pageNo=" + pageNo));
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
Thread.sleep(30000);
|
|
|
} catch (Exception e2) {
|
...
|
...
|
@@ -157,13 +163,6 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve |
|
|
}
|
|
|
}
|
|
|
|
|
|
private void handelException(String yohoIndexName, int pageNo, Exception e) {
|
|
|
// 1、上报异常至日志
|
|
|
INDEX_REBUILD_LOG.error(e.getMessage(), e);
|
|
|
publisher.publishEvent(new SearchEvent(EventReportEnum.YOHOINDEXDATALOADER_HANDELEXCEPTION.getEventName(), EventReportEnum.YOHOINDEXDATALOADER_HANDELEXCEPTION
|
|
|
.getFunctionName(), EventReportEnum.YOHOINDEXDATALOADER_HANDELEXCEPTION.getMoudleName(), "exception", e, "yohoIndexName=" + yohoIndexName + "/pageNo=" + pageNo));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
this.applicationContext = applicationContext;
|
...
|
...
|
|