Authored by hugufei

fix

... ... @@ -56,15 +56,9 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve
INDEX_REBUILD_LOG.error("为索引[{}]加载数据失败,, 不存在[yohoIndexName={}]的EsMapping", tempIndexRealName, yohoIndexName);
return false;
}
boolean result = true;
long begin = System.currentTimeMillis();
// 如果是全球购索引/图片向量索引,大批量会内存溢出,每次只取500条数据
int limit = rebuildPageSize == null ? 500 : rebuildPageSize;
// 如果是全球购的,则只用1个线程就行了【全球购的数据库太烂,并发太高返回不了】
ExecutorService executorService = threadPool;
List<Future<Boolean>> futureResults = new ArrayList<Future<Boolean>>();
for (IIndexBuilder indexBuilder : indexBuilderList) {
// 2、获取总的记录数,设置每页取1条,因为无需拿数据
... ... @@ -79,7 +73,7 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve
futureResults.clear();
for (int pageNo = 1; pageNo <= totalPageSize; pageNo++) {
final int taskPageNo = pageNo;
Future<Boolean> futureResult = executorService.submit(new Callable<Boolean>() {
Future<Boolean> futureResult = threadPool.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return doLoadDataWithRetry(yohoIndexName, tempIndexRealName, indexBuilder, client, taskPageNo, limit);
... ... @@ -100,7 +94,6 @@ public class YohoIndexDataLoader implements IYohoIndexDataLoader, ApplicationEve
break;
}
}
INDEX_REBUILD_LOG.info("[{}:mysql-->index]完毕,耗时:{}ms", tempIndexRealName, System.currentTimeMillis() - begin);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
... ...
... ... @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.yoho.error.event.SearchEvent;
import com.yoho.search.base.utils.EventReportEnum;
import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.base.utils.MoudleEnum;
import com.yoho.search.consumer.service.utils.CostStatistics;
import com.yoho.search.consumer.common.IYohoIndexService;
import com.yoho.search.consumer.index.fullbuild.tbl.TblProductIndexNewBuilder;
... ... @@ -189,10 +190,8 @@ public class GlobalIndexBulkService implements ApplicationEventPublisherAware {
return THREAD_SLEEP_WORK;// 让线程休息50ms继续工作
} catch (Exception e) {
publisher.publishEvent(new SearchEvent(EventReportEnum.PRODUCTPOOLINDEXSERVICE_DELETEGLOBALINDEX.getEventName(),
EventReportEnum.PRODUCTPOOLINDEXSERVICE_DELETEGLOBALINDEX.getFunctionName(), EventReportEnum.PRODUCTPOOLINDEXSERVICE_DELETEGLOBALINDEX.getMoudleName(),
"exception", IgnoreSomeException.filterSomeException(e), null));
logger.error(e.getMessage(), e);
publisher.publishEvent(new SearchEvent("doBulkDeleteGlobalIndex","doBulkDeleteGlobalIndex.GlobalIndexBulkService", MoudleEnum.consumer, "exception", IgnoreSomeException.filterSomeException(e), null));
return THREAD_SLEEP_WORK;
}
}
... ...