Authored by 胡古飞

批量更新时需要检查当前是否正在全量建索引

package com.yoho.search.consumer.index.common.impl;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import javax.annotation.PostConstruct;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
import com.yoho.search.base.utils.FileUtils;
import com.yoho.search.base.utils.JaxbBinder;
import com.yoho.search.consumer.index.common.IIndexBuilder;
... ... @@ -17,28 +41,6 @@ import com.yoho.search.core.es.impl.YohoIndexHelper;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.model.SearchParam;
import com.yoho.search.core.es.model.SearchResult;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
@Component
public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEventPublisherAware {
... ... @@ -55,6 +57,7 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
private IYohoIndexDataLoader yohoIndexDataLoader;
@Autowired
private IndexRebuildListenerMgr indexRebuildListenerMgr;
ApplicationEventPublisher publisher;
@Override
... ... @@ -138,7 +141,8 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
@Override
public String createIndex(final String yohoIndexName, final boolean force) throws Exception {
return this.createIndex(yohoIndexName, yohoIndexName, force);
String yohoIndexAliasName = yohoIndexName;
return this.createIndex(yohoIndexName, yohoIndexAliasName, force);
}
@Override
... ... @@ -164,7 +168,7 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
client.removeAlias(oldRealIndexName, alias);
}
// 5、添加新索引别名
client.addAlias(alias, newRealIndexName);
client.addAlias(newRealIndexName, alias);
// 6、删除旧索引
if (oldRealIndexName != null) {
client.deleteIndex(oldRealIndexName);
... ... @@ -281,29 +285,31 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
Map<String, List<String>> realNameToAliasesMap = client.getRealNameToAliasesMap();
// 2、获取真实的索引名称
String oldIndexId = yohoIndexHelper.getRealIndexName(yohoIndexName, realNameToAliasesMap);
String newIndexId = yohoIndexHelper.getRealIndexName(yohoTemplateIndexName, realNameToAliasesMap);
if (newIndexId == null) {
String oldIndexRealName = yohoIndexHelper.getRealIndexName(yohoIndexName, realNameToAliasesMap);
String newIndexRealName = yohoIndexHelper.getRealIndexName(yohoTemplateIndexName, realNameToAliasesMap);
if (newIndexRealName == null) {
return;
}
// 3、获取有货索引的别名
String yohoIndexAliasName = yohoIndexName;
// 4. 把老索引的索引名先删除
if (oldIndexId != null) {
client.removeAlias(oldIndexId, yohoIndexName);
if (oldIndexRealName != null) {
client.removeAlias(oldIndexRealName, yohoIndexAliasName);
}
// 5. 把新索引的别名全部删除
List<String> yohoTemplateIndexNameAlias = realNameToAliasesMap.get(newIndexId);
for (String alias : yohoTemplateIndexNameAlias) {
client.removeAlias(newIndexId, alias);
}
List<String> aliasList = realNameToAliasesMap.get(newIndexRealName);
String[] aliases = aliasList.toArray(new String[aliasList.size()]);
client.removeAlias(newIndexRealName, aliases);
// 6. 把老索引的索引名加到新索引上面
client.addAlias(yohoIndexName, newIndexId);
client.addAlias(newIndexRealName, yohoIndexAliasName);
// 7. 删除老索引
if (oldIndexId != null) {
client.deleteIndex(oldIndexId);
if (oldIndexRealName != null) {
client.deleteIndex(oldIndexRealName);
}
}
... ... @@ -419,22 +425,6 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
client.addIndexData(realIndexName, yohoIndexName, id, data);
}
// @Override
// public void addIndexData(final String yohoIndexName, final
// List<Map<String, Object>> dataList) throws Exception {
// IYohoIndex index = this.nameToIndexMap.get(yohoIndexName);
// if (index == null) {
// return;
// }
// IElasticsearchClient client = index.getIndexClient();
// String realIndexName = yohoIndexHelper.getRealIndexName(yohoIndexName,
// client);
// if (StringUtils.isBlank(realIndexName)) {
// return;
// }
// client.addIndexData(realIndexName, yohoIndexName, dataList);
// }
@Override
public void deleteIndexData(final String yohoIndexName, final String id) throws Exception {
IYohoIndex index = this.nameToIndexMap.get(yohoIndexName);
... ... @@ -449,22 +439,6 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
client.deleteIndexData(realIndexName, yohoIndexName, id);
}
// @Override
// public void deleteIndexDataByQuery(final String yohoIndexName, final
// String query) throws Exception {
// IYohoIndex index = this.nameToIndexMap.get(yohoIndexName);
// if (index == null) {
// return;
// }
// IElasticsearchClient client = index.getIndexClient();
// String realIndexName = yohoIndexHelper.getRealIndexName(yohoIndexName,
// client);
// if (StringUtils.isBlank(realIndexName)) {
// return;
// }
// client.deleteIndexDataByQuery(realIndexName, yohoIndexName, query);
// }
@Override
public void updateIndexData(final String yohoIndexName, final String id, final Object data) throws Exception {
IYohoIndex index = this.nameToIndexMap.get(yohoIndexName);
... ... @@ -566,6 +540,7 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
if (index == null) {
return null;
}
rebuildFlagService.waitingRebuildingIndex();
IElasticsearchClient client = index.getIndexClient();
BulkResponse bulkResponse = client.bulk(esBluks);
if (bulkResponse.hasFailures()) {
... ...
package com.yoho.search.consumer.index.increment;
import com.alibaba.fastjson.JSONArray;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.yoho.error.event.SearchEvent;
... ... @@ -10,26 +24,13 @@ import com.yoho.search.base.utils.ISearchConstants;
import com.yoho.search.consumer.common.CostStatistics;
import com.yoho.search.consumer.index.increment.bulks.StorageSkuIndexBulkService;
import com.yoho.search.consumer.index.increment.rule.AbstractStorageRelatedMqListener;
import com.yoho.search.consumer.service.base.GoodsService;
import com.yoho.search.consumer.service.base.ProductService;
import com.yoho.search.consumer.service.base.StorageService;
import com.yoho.search.consumer.service.bo.ProductGoodsBO;
import com.yoho.search.consumer.service.logic.productIndex.ProductGoodsLogicService;
import com.yoho.search.consumer.service.logic.productIndex.StorageUpdateTimeLogicService;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.search.dal.model.Product;
import com.yoho.search.dal.model.Storage;
import com.yoho.search.dal.model.StorageUpdateTime;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class StorageMqListener extends AbstractStorageRelatedMqListener implements ChannelAwareMessageListener {
... ...
package com.yoho.search.consumer.index.increment;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.yoho.error.event.SearchEvent;
... ... @@ -14,20 +29,6 @@ import com.yoho.search.consumer.service.logic.TblShopsLogicService;
import com.yoho.search.core.es.model.ESBluk;
import com.yoho.search.core.es.utils.IgnoreSomeException;
import com.yoho.search.dal.model.TblBrand;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* Created by wangnan on 2016/12/16.
... ...
... ... @@ -27,11 +27,11 @@ public class RebuildFlagService {
rebuildFlag.setRebuilding(true);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
}finally{
writeLock.unlock();
}
}
public void updateIsBuildingFalse() {
try {
writeLock.lock();
... ... @@ -39,7 +39,7 @@ public class RebuildFlagService {
rebuildFlag.setRebuilding(false);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
}finally{
writeLock.unlock();
}
}
... ...