Authored by all4you

optimize rebuild

... ... @@ -4,6 +4,7 @@ import com.yoho.search.dal.model.SearchParam;
import com.yoho.search.dal.model.SearchResult;
import com.yohomars.search.index.model.ESBluk;
import com.yohomars.search.index.model.IYohoIndex;
import com.yohomars.search.utils.Index;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
... ... @@ -119,6 +120,7 @@ public interface IYohoIndexService {
/**
* 执行索引优化
* @param yohoIndexName
*/
void optimize(String yohoIndexName) throws Exception;
... ... @@ -128,13 +130,13 @@ public interface IYohoIndexService {
* @param yohoIndexName
*
*/
void rebuild(final String yohoIndexName);
void rebuildIndex(final String yohoIndexName);
/**
* 更新增量的索引数据
* @param yohoIndexName
*/
void buildAppended(final String yohoIndexName);
void appendIndex(final String yohoIndexName);
/**
* 添加索引数据
... ... @@ -146,15 +148,6 @@ public interface IYohoIndexService {
*/
void addIndexData(String yohoIndexName, String id, Object data) throws Exception;
/**
* 批量添加索引数据
*
* @param yohoIndexName
* 索引名称
* @param dataList
* 索引数据列表
*/
//void addIndexData(String yohoIndexName, List<Map<String, Object>> dataList) throws Exception;
/**
* 删除索引数据
... ... @@ -163,13 +156,6 @@ public interface IYohoIndexService {
*/
void deleteIndexData(String yohoIndexName, String id) throws Exception;
/**
* 根据query删除索引数据
*
* @param yohoIndexName
* @param query
*/
// void deleteIndexDataByQuery(String yohoIndexName, String query) throws Exception;
/**
* 更新索引数据
... ...
... ... @@ -4,6 +4,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Deprecated
@Service
public class RebuildFlagService {
... ...
... ... @@ -98,11 +98,6 @@ public class YohoIndexDataLoader implements ApplicationContextAware {
} catch (Exception e) {
// 如果有异常,则处理一下,并等待3s执行下一次
INDEX_REBUILD_LOG.error("doLoadAppendedData error,cause:", e);
try {
Thread.sleep(3000);
} catch (Exception e2) {
e2.printStackTrace();
}
return 0;
}
}
... ...
... ... @@ -16,7 +16,6 @@ import com.yohomars.search.index.model.ESBluk;
import com.yohomars.search.index.model.IYohoIndex;
import com.yohomars.search.index.model.impl.YohoIndexImpl;
import com.yohomars.search.index.service.IYohoIndexService;
import com.yohomars.search.index.updater.IndexUpdater;
import com.yohomars.search.utils.FileUtils;
import com.yohomars.search.utils.Index;
import com.yohomars.search.utils.JaxbBinder;
... ... @@ -52,8 +51,7 @@ public class YohoIndexServiceImpl implements IYohoIndexService {
@Autowired
private YohoIndexHelper yohoIndexHelper;
@Autowired
private RebuildFlagService rebuildFlagService;
@Autowired
private YohoIndexDataLoader yohoIndexDataLoader;
... ... @@ -322,14 +320,14 @@ public class YohoIndexServiceImpl implements IYohoIndexService {
* @param yohoIndexName 索引名称
*/
@Override
public void rebuild(final String yohoIndexName) {
public void rebuildIndex(final String yohoIndexName) {
final IYohoIndex yohoIndex = this.getIndex(yohoIndexName);
if (yohoIndex == null) {
return;
}
// 如果是重建的social_user的索引,则将记在redis中的lastUpdateTime删除,不然getTotalCount时不能获取到全部的数据
// 将记在redis中的lastUpdateTime删除,不然getTotalCount时不能获取到全部的数据
Index index = Index.getIndex(yohoIndexName);
if(index == Index.social_user){
if(index != null){
String key = index.getKey();
INDEX_REBUILD_LOG.info("delete redisKey={},yohoIndexName={}",key,yohoIndexName);
if(redisValueHelper.hasKey(key)){
... ... @@ -338,67 +336,62 @@ public class YohoIndexServiceImpl implements IYohoIndexService {
}
try {
long begin = System.currentTimeMillis();
INDEX_REBUILD_LOG.info("rebuild [{}],step=[1.rebuild begin], begin=[{}] ", yohoIndexName, begin);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[1.rebuildIndex begin], begin=[{}] ", yohoIndexName, begin);
// 0、当前正在全量建索引
rebuildFlagService.updateBuilding(true);
IElasticsearchClient client = yohoIndex.getIndexClient();
final IndexBuilder indexBuilder = yohoIndex.getIndexBuilder();
// 1、删除因停机而产生的临时索引
INDEX_REBUILD_LOG.info("rebuild [{}],step=[2.deleteTempIndex]", yohoIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[2.deleteTempIndex]", yohoIndexName);
this.deleteTempIndex(yohoIndexName);
// 2、获取临时索引的别名
String yohoTemplateIndexName = yohoIndexHelper.genTempIndexName(yohoIndexName);
// 3、创建临时索引
INDEX_REBUILD_LOG.info("rebuild [{}],step=[3.createTempIndex begin],mappingContent=[{}],setting=[{}],properties=[{}]", yohoIndexName, yohoIndex.getMappingContent(), yohoIndex.getProperties());
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[3.createTempIndex begin],mappingContent=[{}],setting=[{}],properties=[{}]", yohoIndexName, yohoIndex.getMappingContent(), yohoIndex.getProperties());
String tempIndexRealName = this.createIndex(yohoIndexName, yohoTemplateIndexName, false);
INDEX_REBUILD_LOG.info("rebuild [{}],step=[3.createTempIndex success],tempIndexRealName=[{}],", yohoIndexName, tempIndexRealName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[3.createTempIndex success],tempIndexRealName=[{}],", yohoIndexName, tempIndexRealName);
// 4、针对批量插入数据,进行索引的优化设置
INDEX_REBUILD_LOG.info("rebuild [{}],step=[4.optimumSettingsForBulkIndexing begin]", yohoIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[4.optimumSettingsForBulkIndexing begin]", yohoIndexName);
Map<String, String> originalSettings = client.optimumSettingsForBulkIndexing(tempIndexRealName);
INDEX_REBUILD_LOG.info("rebuild [{}],step=[4.optimumSettingsForBulkIndexing success],originalSettings=[{}]", yohoIndexName, originalSettings);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[4.optimumSettingsForBulkIndexing success],originalSettings=[{}]", yohoIndexName, originalSettings);
// 5、重建数据
INDEX_REBUILD_LOG.info("rebuild [{}],step=[5.loadAllData begin]", yohoIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[5.loadAllData begin]", yohoIndexName);
if (!yohoIndexDataLoader.loadAllData(yohoIndexName, tempIndexRealName, indexBuilder, client)) {
INDEX_REBUILD_LOG.error("rebuild [{}],loadAllData fail,direct return...", yohoIndexName);
INDEX_REBUILD_LOG.error("rebuildIndex [{}],loadAllData fail,direct return...", yohoIndexName);
return;
}else{
// 目前只有social_user的索引有增量更新操作
// 如果是重建的social_user的索引,则记住当前时间作为最后更新时间,增量更新索引时根据该时间获取变动的数据
if(index == Index.social_user){
// 记住当前时间作为最后更新时间,增量更新索引时根据该时间获取变动的数据
if(index != null){
INDEX_REBUILD_LOG.info("loadAllData success will set currentTime to redis with key={}",index.getKey());
redisValueHelper.set(index.getKey(),System.currentTimeMillis(),1, TimeUnit.DAYS);
}
}
INDEX_REBUILD_LOG.info("rebuild [{}],step=[5.loadAllData success]", yohoIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[5.loadAllData success]", yohoIndexName);
// 6、刷新缓存
INDEX_REBUILD_LOG.info("rebuild [{}],step=[6.refreshIndex begin],tempIndexRealName=[{}]", yohoIndexName, tempIndexRealName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[6.refreshIndex begin],tempIndexRealName=[{}]", yohoIndexName, tempIndexRealName);
client.refreshIndex(tempIndexRealName);
INDEX_REBUILD_LOG.info("rebuild [{}],step=[6.refreshIndex success]", yohoIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[6.refreshIndex success]", yohoIndexName);
// 7、恢复原来的索引设置
INDEX_REBUILD_LOG.info("rebuild [{}],step=[7.setIndexSettings begin],tempIndexRealName=[{}],originalSettings=[{}]", yohoIndexName, tempIndexRealName, originalSettings);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[7.setIndexSettings begin],tempIndexRealName=[{}],originalSettings=[{}]", yohoIndexName, tempIndexRealName, originalSettings);
client.setIndexSettings(tempIndexRealName, originalSettings);
INDEX_REBUILD_LOG.info("rebuild [{}],step=[7.setIndexSettings success]", yohoIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[7.setIndexSettings success]", yohoIndexName);
// 8、索引替换:将老索引的索引名加到新索引上,并删除老索引
INDEX_REBUILD_LOG.info("rebuild [{}],step=[8.replaceIndex begin],yohoIndexName=[{}],yohoTemplateIndexName=[{}]", yohoIndexName, yohoIndexName, yohoTemplateIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[8.replaceIndex begin],yohoIndexName=[{}],yohoTemplateIndexName=[{}]", yohoIndexName, yohoIndexName, yohoTemplateIndexName);
this.replaceIndex(yohoIndexName, yohoTemplateIndexName);
INDEX_REBUILD_LOG.info("rebuild [{}],step=[8.replaceIndex success],yohoIndexName=[{}]", yohoIndexName, yohoIndexName);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[8.replaceIndex success],yohoIndexName=[{}]", yohoIndexName, yohoIndexName);
// 9、记录耗时时间
INDEX_REBUILD_LOG.info("rebuild [{}],step=[9.rebuild success],cost=[{}]s", yohoIndexName, (System.currentTimeMillis()-begin)/1000);
INDEX_REBUILD_LOG.info("rebuildIndex [{}],step=[9.rebuildIndex success],cost=[{}]s", yohoIndexName, (System.currentTimeMillis()-begin)/1000);
} catch (Exception e) {
INDEX_REBUILD_LOG.error(e.getMessage(), e);
} finally {
rebuildFlagService.updateBuilding(false);
}
}
... ... @@ -407,39 +400,37 @@ public class YohoIndexServiceImpl implements IYohoIndexService {
* @param yohoIndexName
*/
@Override
public void buildAppended(final String yohoIndexName) {
public void appendIndex(final String yohoIndexName) {
final IYohoIndex yohoIndex = this.getIndex(yohoIndexName);
if (yohoIndex == null) {
return;
}
Index index = Index.getIndex(yohoIndexName);
if(!this.indexExists(yohoIndexName)){
INDEX_REBUILD_LOG.info("YohoIndex does not exists when buildAppended with indexName=[{}] will call rebuild instead", yohoIndexName);
this.rebuild(yohoIndexName);
INDEX_REBUILD_LOG.info("YohoIndex does not exists when appendIndex with indexName=[{}] will call rebuildIndex instead", yohoIndexName);
this.rebuildIndex(yohoIndexName);
}else{
try {
long begin = System.currentTimeMillis();
INDEX_REBUILD_LOG.info("buildAppended [{}],step=[1.buildAppended begin], begin=[{}] ", yohoIndexName, begin);
INDEX_REBUILD_LOG.info("appendIndex [{}],step=[1.appendIndex begin], begin=[{}] ", yohoIndexName, begin);
IElasticsearchClient client = yohoIndex.getIndexClient();
final IndexAppender indexAppender = yohoIndex.getIndexAppender();
// 2、更新数据
if (yohoIndexDataLoader.loadAppendedData(yohoIndexName, indexAppender, client)) {
// 如果是重建的social_user的索引,则记住当前时间作为最后更新时间,增量更新索引时根据该时间获取变动的数据
if(index == Index.social_user){
// 记住当前时间作为最后更新时间,下一次增量更新索引时根据该时间获取变动的数据
if(index != null){
INDEX_REBUILD_LOG.info("loadAppendData success will set currentTime to redis with key={}",index.getKey());
redisValueHelper.set(index.getKey(), System.currentTimeMillis(), 1, TimeUnit.DAYS);
}
}
// 3、记录耗时时间
INDEX_REBUILD_LOG.info("buildAppended [{}],step=[3.buildAppended success],cost=[{}]s", yohoIndexName, (System.currentTimeMillis()-begin)/1000);
INDEX_REBUILD_LOG.info("appendIndex [{}],step=[3.appendIndex success],cost=[{}]s", yohoIndexName, (System.currentTimeMillis()-begin)/1000);
} catch (Exception e) {
INDEX_REBUILD_LOG.error("buildAppended [{}] error:",yohoIndexName, e);
} finally {
rebuildFlagService.updateBuilding(false);
INDEX_REBUILD_LOG.error("appendIndex [{}] error:",yohoIndexName, e);
}
}
}
... ...
package com.yohomars.search.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author gris.wang
* @since 2018/3/8
**/
@Service
public class RunnableQueueFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(RunnableQueueFactory.class);
private static final int MAX_QUEUE_SIZE = 100;
private Map<String,BlockingQueue<Runnable>> queueMap = new ConcurrentHashMap<>();
private ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* add task
* @param taskType
* @param task
*/
public void addTask(String taskType, Runnable task){
BlockingQueue<Runnable> queue = queueMap.get(taskType);
if(queue==null){
queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
putTask(queue,taskType,task);
queueMap.putIfAbsent(taskType,queue);
initQueue(taskType);
}else{
putTask(queue,taskType,task);
}
}
private void putTask(BlockingQueue<Runnable> queue,String taskType,Runnable task){
try {
queue.put(task);
LOGGER.info("addTask success,with taskType=[{}]",taskType);
} catch (InterruptedException e) {
LOGGER.error("put task error,with taskType=[{}],cause:",taskType,e);
}
}
private void initQueue(String taskType){
executorService.execute(new Runnable() {
@Override
public void run() {
BlockingQueue<Runnable> queue = queueMap.get(taskType);
if(queue!=null){
LOGGER.info("initQueue success,with taskType=[{}]",taskType);
try {
for(;;) {
LOGGER.info("try to take a task from queue,with taskType=[{}]",taskType);
Runnable task = queue.take();
if(task!=null) {
LOGGER.info("take a task success,with taskType=[{}],will run it directly",taskType);
task.run();
}
}
}catch(Exception e){
LOGGER.error("take task error,with taskType=[{}],cause:",taskType,e);
}
}else{
LOGGER.error("initQueue failed,with queue=null and taskType=[{}]",taskType);
}
}
});
}
}
... ...
... ... @@ -30,7 +30,7 @@ public class ISearchConstans {
//public static final String PARAM_SYNC_SKN = "product_skn"; // SKN
public static final String REDIS_INDEX_REBUILDING_STATUS = "index.rebuild.status";
public static final String REDIS_INDEX_REBUILDING_STATUS = "index.rebuildIndex.status";
public static final String REDIS_INDEX_REBUILDING_STATUS_IS_REBUILDING = "isRebuilding";
public static final String REDIS_INDEX_REBUILDING_STATUS_NOT_REBUILDING = "notRebuilding";
public static final String ACTION_INSERT = "2";
... ...
package com.yohomars.search.job;
import com.yohomars.search.index.service.IYohoIndexService;
import com.yohomars.search.index.service.impl.RebuildFlagService;
import com.yohomars.search.queue.RunnableQueueFactory;
import com.yohomars.search.utils.Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
... ... @@ -15,13 +15,13 @@ import org.springframework.stereotype.Component;
@Component
public class IndexRebuildJob {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildJob.class);
@Autowired
private IYohoIndexService yohoIndexService;
@Autowired
private RebuildFlagService rebuildFlagService;
private static final Logger logger = LoggerFactory.getLogger(IndexRebuildJob.class);
private RunnableQueueFactory runnableQueueFactory;
/**
... ... @@ -29,54 +29,60 @@ public class IndexRebuildJob {
*/
@Scheduled(cron = "0 0 */1 * * ?")
public void rebuildIndexJob() {
this.rebuildIndex(Index.social_user.getIndexName());
this.rebuildIndex(Index.social_user);
}
/**
* 定时任务重建所有索引,数据量少的索引(每10分钟执行一次)
*/
@Scheduled(cron = "0 */10 * * * ?")
public void rebuildSmallIndexJob() {
// store的数据量少,也每10分钟直接重建一次
this.rebuildIndex(Index.store.getIndexName());
}
/**
* 定时任务增量更新所有索引(每10分钟执行一次)
*/
@Scheduled(cron = "50 */10 * * * ?")
public void buildUpdateIndexJob() {
this.buildAppendedIndex(Index.social_user.getIndexName());
@Scheduled(cron = "0 */10 * * * ?")
public void appendIndexJob() {
// store的数据量少,也每10分钟直接重建一次
this.rebuildIndex(Index.store);
this.appendIndex(Index.social_user);
}
/**
* 重建索引
*/
private void rebuildIndex(String indexName) {
private void rebuildIndex(Index index) {
try {
long begin = System.currentTimeMillis();
logger.info("rebuildIndex start----[indexName={}][begin={}]", indexName, begin);
rebuildFlagService.waitWhileIsRebuilding();
yohoIndexService.rebuild(indexName);
logger.info("rebuildIndex end----[indexName={}][end={}][cost={}s]", indexName, System.currentTimeMillis(), (System.currentTimeMillis()-begin)/1000);
if(index==null){
LOGGER.error("rebuildIndex failed with index==null");
return;
}
String indexName = index.getIndexName();
runnableQueueFactory.addTask(indexName, new Runnable() {
@Override
public void run() {
yohoIndexService.rebuildIndex(indexName);
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
LOGGER.error("rebuildIndex error,cause:", e);
}
}
/**
* 增量更新索引
* @param indexName
* @param index
*/
private void buildAppendedIndex(String indexName){
private void appendIndex(Index index){
try {
long begin = System.currentTimeMillis();
logger.info("buildAppendedIndex start----[indexName={}][begin={}]", indexName, begin);
rebuildFlagService.waitWhileIsRebuilding();
yohoIndexService.buildAppended(indexName);
logger.info("buildAppendedIndex end----[indexName={}][end={}][cost={}s]", indexName, System.currentTimeMillis(), (System.currentTimeMillis()-begin)/1000);
if(index==null){
LOGGER.error("buildAppendedIndex failed with index==null");
return;
}
String indexName = index.getIndexName();
runnableQueueFactory.addTask(indexName, new Runnable() {
@Override
public void run() {
yohoIndexService.appendIndex(indexName);
}
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
LOGGER.error("buildAppendedIndex error,cause:", e);
}
}
... ...
package com.yohomars.search.restapi;
import com.yohomars.search.index.service.IYohoIndexService;
import com.yohomars.search.index.service.impl.RebuildFlagService;
import com.yohomars.search.job.IndexRebuildJob;
import com.yohomars.search.queue.RunnableQueueFactory;
import com.yohomars.search.service.IndexService;
import com.yohomars.search.utils.Index;
import org.apache.commons.lang.StringUtils;
... ... @@ -24,17 +23,25 @@ import java.util.Map;
@Controller
public class IndexController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(IndexController.class);
private static final Logger LOGGER = LoggerFactory.getLogger(IndexController.class);
@Autowired
private IYohoIndexService yohoIndexService;
@Autowired
private RebuildFlagService rebuildFlagService;
@Autowired
private IndexRebuildJob indexRebuildJob;
private RunnableQueueFactory runnableQueueFactory;
@Autowired
private IndexService indexService;
private void rebuild(String indexName){
runnableQueueFactory.addTask(indexName, new Runnable() {
@Override
public void run() {
yohoIndexService.rebuildIndex(indexName);
}
});
}
@RequestMapping(value = "/test")
@ResponseBody
... ... @@ -71,19 +78,17 @@ public class IndexController extends BaseController {
@RequestMapping(value = "/index/rebuildAll")
@ResponseBody
public Map<String, Object> rebuildAll() {
if (rebuildFlagService.isRebuilding()) {
return ReturnMessage(400, 0, "current has index rebuilding,please wait......");
}
indexRebuildJob.rebuildIndexJob();
indexRebuildJob.rebuildSmallIndexJob();
rebuild(Index.social_user.getIndexName());
rebuild(Index.store.getIndexName());
return ReturnMessage(200, 0, "rebuildAll success");
}
@RequestMapping(value = "/index/rebuild/{indexName}")
@ResponseBody
public Map<String, Object> rebuild(@PathVariable String indexName, HttpServletRequest request) {
if (rebuildFlagService.isRebuilding()) {
return ReturnMessage(400, 1, "current has index rebuilding,please wait......");
Index index = Index.getIndex(indexName);
if(index==null){
return ReturnMessage(400, 1, "indexName=["+indexName+"] is invalid");
}
try {
boolean isExist = yohoIndexService.indexExists(indexName);
... ... @@ -91,11 +96,12 @@ public class IndexController extends BaseController {
if (!isExist) {
yohoIndexService.createIndex(indexName, true);
}
yohoIndexService.rebuild(indexName);
rebuild(indexName);
} catch (Exception e) {
return ReturnMessage(400, 1, "rebuild " + indexName + " error: " + e.getMessage());
LOGGER.error("rebuildIndex indexName={} error,cause:",indexName,e);
return ReturnMessage(400, 1, "rebuildIndex " + indexName + " error: " + e.getMessage());
}
return ReturnMessage(200, 0, "rebuild " + indexName + " success");
return ReturnMessage(200, 0, "rebuildIndex " + indexName + " success");
}
/**
... ...