|
|
/**
|
|
|
* Project: petstore-dal
|
|
|
*
|
|
|
* File Created at 2015年11月4日
|
|
|
* $Id$
|
|
|
*
|
|
|
* Copyright 2015 Yoho.cn Croporation Limited.
|
|
|
* All rights reserved.
|
|
|
*
|
|
|
* This software is the confidential and proprietary information of
|
|
|
* Yoho Company. ("Confidential Information"). You shall not
|
|
|
* disclose such Confidential Information and shall use it only in
|
|
|
* accordance with the terms of the license agreement you entered into
|
|
|
* with Yoho.cn.
|
|
|
*/
|
|
|
package com.taobao.sample.petstore.dal.elasticsearch.client.impl;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
import org.apache.commons.lang.time.DateFormatUtils;
|
|
|
import org.elasticsearch.action.ActionFuture;
|
|
|
import org.elasticsearch.action.ActionResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
|
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
|
|
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
|
|
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
|
|
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
|
|
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
|
|
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
|
|
|
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
|
|
|
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
import org.elasticsearch.action.delete.DeleteResponse;
|
|
|
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
|
|
|
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.index.IndexResponse;
|
|
|
import org.elasticsearch.action.update.UpdateRequestBuilder;
|
|
|
import org.elasticsearch.action.update.UpdateResponse;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.client.ClusterAdminClient;
|
|
|
import org.elasticsearch.client.IndicesAdminClient;
|
|
|
import org.elasticsearch.client.Requests;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
import org.elasticsearch.cluster.metadata.MetaData;
|
|
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.taobao.sample.petstore.dal.elasticsearch.callback.ClientCallback;
|
|
|
import com.taobao.sample.petstore.dal.elasticsearch.callback.ClusterCallback;
|
|
|
import com.taobao.sample.petstore.dal.elasticsearch.callback.NodeCallback;
|
|
|
import com.taobao.sample.petstore.dal.elasticsearch.client.ElasticAdminClientTemplate;
|
|
|
|
|
|
/**
|
|
|
* TODO Comment of ElasticAdminClientImpl
|
|
|
*
|
|
|
* @author ibm
|
|
|
* @version $Id: ElasticAdminClientImpl.java 2015年11月4日 上午2:09:11 $
|
|
|
*/
|
|
|
public class ElasticAdminClientTemplateImpl extends ElasticClientTemplateImpl implements ElasticAdminClientTemplate {
|
|
|
public ElasticAdminClientTemplateImpl(Client client) {
|
|
|
super(client);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 判断索引是否存在
|
|
|
*
|
|
|
* @param indexName 索引名
|
|
|
* @return
|
|
|
*/
|
|
|
@Override
|
|
|
public boolean indexExists(String indexName) {
|
|
|
boolean exists = false;
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
|
|
|
if (indexId != null) {
|
|
|
// 等待服务集群就绪
|
|
|
executeGet(new ClusterCallback<ClusterHealthResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<ClusterHealthResponse> execute(final ClusterAdminClient admin) {
|
|
|
return admin.health(Requests.clusterHealthRequest().waitForStatus(ClusterHealthStatus.YELLOW));
|
|
|
}
|
|
|
});
|
|
|
|
|
|
// 获取服务状态
|
|
|
final IndicesExistsResponse response = executeGet(new NodeCallback<IndicesExistsResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<IndicesExistsResponse> execute(final IndicesAdminClient admin) {
|
|
|
return admin.exists(Requests.indicesExistsRequest(indexId));
|
|
|
}
|
|
|
});
|
|
|
exists = response.isExists();
|
|
|
}
|
|
|
|
|
|
return exists;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 判断索引是否健康
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
* @return
|
|
|
*/
|
|
|
public boolean indexHealth(final String indexName) {
|
|
|
boolean health = true;
|
|
|
String indexId = this.getIndexId(indexName);
|
|
|
|
|
|
if (indexId != null) {
|
|
|
|
|
|
// 健康状况
|
|
|
final ClusterHealthResponse response = executeGet(new ClusterCallback<ClusterHealthResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<ClusterHealthResponse> execute(final ClusterAdminClient admin) {
|
|
|
return admin.health(Requests.clusterHealthRequest().timeout("10s"));
|
|
|
}
|
|
|
});
|
|
|
if (response.getIndices().get(indexId).getStatus().equals(ClusterHealthStatus.RED)) {
|
|
|
health = false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return health;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 创建索引
|
|
|
*
|
|
|
* @param indexName 索引别名
|
|
|
* @param properties 索引配置
|
|
|
* @param force 是否强制创建索引,不论索引是否已经存在
|
|
|
*/
|
|
|
@Override
|
|
|
public void createIndex(final String indexName, final Map<String, String> properties, final String mappingContent, boolean force) {
|
|
|
String oldIndexId = this.getIndexId(indexName);
|
|
|
final String indexId;
|
|
|
|
|
|
if (oldIndexId == null || force) {
|
|
|
// 旧索引不存在 OR 旧索引已存在但强制重建
|
|
|
|
|
|
indexId = INDEX_ID_PREFIX + indexName + "_" + DateFormatUtils.format(new Date(), "yyyyMMddHHmmssSSS");
|
|
|
} else {
|
|
|
indexId = null;
|
|
|
}
|
|
|
|
|
|
if (indexId != null) {
|
|
|
// 创建新索引
|
|
|
executeGet(new NodeCallback<CreateIndexResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<CreateIndexResponse> execute(final IndicesAdminClient admin) {
|
|
|
Map<String, String> settings = new HashMap<String, String>();
|
|
|
settings.put("number_of_shards", properties.get("number_of_shards"));
|
|
|
settings.put("number_of_replicas", properties.get("number_of_replicas"));
|
|
|
settings.put("refresh_interval", properties.get("refresh_interval"));
|
|
|
settings.put("translog.flush_threshold_ops", properties.get("translog.flush_threshold_ops"));
|
|
|
logger.info("[client={}] [indexName={}] [indexId={}] [props={}] [mapping={}]创建索引", new Object[] { name, indexName, indexId, properties, mappingContent });
|
|
|
return admin.create(Requests.createIndexRequest(indexId).settings(settings).mapping(getIndexDataType(indexName), mappingContent));
|
|
|
}
|
|
|
});
|
|
|
|
|
|
if (oldIndexId != null) {
|
|
|
// 删除旧索引名称
|
|
|
this.deleteIndexName(oldIndexId, indexName);
|
|
|
}
|
|
|
|
|
|
// 建立别名
|
|
|
this.createIndexName(indexId, indexName);
|
|
|
|
|
|
if (oldIndexId != null) {
|
|
|
// 删除旧索引
|
|
|
this.deleteIndex(oldIndexId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void createIndex(final String indexName, final String tempIndexName, final Map<String, String> properties, final String mappingContent, boolean force) {
|
|
|
String oldIndexId = this.getIndexId(indexName);
|
|
|
final String indexId;
|
|
|
|
|
|
if (oldIndexId == null || force) {
|
|
|
// 旧索引不存在 OR 旧索引已存在但强制重建
|
|
|
indexId = INDEX_ID_PREFIX + tempIndexName + "_" + DateFormatUtils.format(new Date(), "yyyyMMddHHmmssSSS");
|
|
|
} else {
|
|
|
indexId = null;
|
|
|
}
|
|
|
|
|
|
if (indexId != null) {
|
|
|
// 创建新索引
|
|
|
executeGet(new NodeCallback<CreateIndexResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<CreateIndexResponse> execute(final IndicesAdminClient admin) {
|
|
|
Map<String, String> settings = new HashMap<String, String>();
|
|
|
settings.put("number_of_shards", properties.get("number_of_shards"));
|
|
|
settings.put("number_of_replicas", properties.get("number_of_replicas"));
|
|
|
settings.put("refresh_interval", properties.get("refresh_interval"));
|
|
|
settings.put("translog.flush_threshold_ops", properties.get("translog.flush_threshold_ops"));
|
|
|
logger.info("[client={}] [indexName={}] [indexId={}] [props={}] [mapping={}]创建索引", new Object[] { name, indexName, indexId, properties, mappingContent });
|
|
|
return admin.create(Requests.createIndexRequest(indexId).settings(settings).mapping(getIndexDataType(tempIndexName), mappingContent));
|
|
|
}
|
|
|
});
|
|
|
|
|
|
if (oldIndexId != null) {
|
|
|
// 删除旧索引名称
|
|
|
this.deleteIndexName(oldIndexId, tempIndexName);
|
|
|
}
|
|
|
|
|
|
// 建立别名
|
|
|
this.createIndexName(indexId, indexName);
|
|
|
|
|
|
if (oldIndexId != null) {
|
|
|
// 删除旧索引
|
|
|
this.deleteIndex(oldIndexId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 删除索引
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
*/
|
|
|
@Override
|
|
|
public void deleteIndex(final String indexName) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
executeGet(new NodeCallback<DeleteIndexResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<DeleteIndexResponse> execute(final IndicesAdminClient admin) {
|
|
|
return admin.delete(Requests.deleteIndexRequest(indexId));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 更新索引缓存
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
*/
|
|
|
@Override
|
|
|
public void refreshIndex(final String indexName) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
executeGet(new NodeCallback<RefreshResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<RefreshResponse> execute(final IndicesAdminClient admin) {
|
|
|
return admin.refresh(Requests.refreshRequest(indexId));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 关闭索引
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
*/
|
|
|
@Override
|
|
|
public void closeIndex(final String indexName) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
executeGet(new NodeCallback<CloseIndexResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<CloseIndexResponse> execute(final IndicesAdminClient admin) {
|
|
|
return admin.close(Requests.closeIndexRequest(indexId));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 刷新索引文件
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
*/
|
|
|
@Override
|
|
|
public void flushIndex(final String indexName) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
executeGet(new NodeCallback<FlushResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<FlushResponse> execute(final IndicesAdminClient admin) {
|
|
|
return admin.flush(Requests.flushRequest(indexId));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 索引替换:将老索引的索引名加到新索引上,并删除老索引
|
|
|
*
|
|
|
* @param oldIndexName 旧索引名
|
|
|
* @param newIndexName 新索引名
|
|
|
*/
|
|
|
public void replaceIndex(final String oldIndexName, final String newIndexName) {
|
|
|
String oldIndexId = this.getIndexId(oldIndexName);
|
|
|
String newIndexId = this.getIndexId(newIndexName);
|
|
|
if (newIndexId != null) {
|
|
|
if (oldIndexId != null) {
|
|
|
// 1. 把老索引的索引名先删除
|
|
|
this.deleteIndexName(oldIndexId, oldIndexName);
|
|
|
}
|
|
|
// 2. 把新索引的索引名删除
|
|
|
this.deleteIndexName(newIndexId, newIndexName);
|
|
|
// 3. 把老索引的索引名加到新索引上面
|
|
|
this.createIndexName(newIndexId, oldIndexName);
|
|
|
if (oldIndexId != null) {
|
|
|
// 4. 删除老索引
|
|
|
this.deleteIndex(oldIndexId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void deleteTempIndex(String indexName) {
|
|
|
// 获取系统实际存在的索引名列表
|
|
|
ClusterState clusterState = client.admin().cluster().prepareState().execute().actionGet().getState();
|
|
|
String[] indices = clusterState.metaData().indices().keys().toArray(String.class);
|
|
|
|
|
|
ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest().routingTable(true).nodes(true).indices(indices);
|
|
|
MetaData md = client.admin().cluster().state(clusterStateRequest).actionGet(30000).getState().getMetaData();
|
|
|
|
|
|
String regEx = "^" + INDEX_ID_PREFIX + indexName + "_(\\d){13}_(\\d){17}$";
|
|
|
Pattern p = Pattern.compile(regEx);
|
|
|
for (IndexMetaData imd : md) {
|
|
|
if (p.matcher(imd.index()).find()) {
|
|
|
boolean validate = false;
|
|
|
for (AliasMetaData amd : imd.aliases().values().toArray(AliasMetaData.class)) {
|
|
|
if (indexName.equals(amd.alias())) {
|
|
|
validate = true;
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
if (!validate) {
|
|
|
this.deleteIndex(imd.index());
|
|
|
logger.info("[client={}] 删除遗留的临时索引:[id={}]", name, imd.index());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 执行索引优化
|
|
|
*
|
|
|
* @param indexName
|
|
|
*/
|
|
|
@Override
|
|
|
public void optimize(String indexName) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
|
|
|
if (indexId != null) {
|
|
|
executeGet(new NodeCallback<OptimizeResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<OptimizeResponse> execute(final IndicesAdminClient admin) {
|
|
|
return admin.optimize(Requests.optimizeRequest(indexId));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 添加索引数据
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
* @param id 索引id
|
|
|
* @param data 索引数据
|
|
|
*/
|
|
|
@Override
|
|
|
public IndexResponse addIndexData(final String indexName, final String id, final Object data) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final IndexResponse response = executeGet(new ClientCallback<IndexResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<IndexResponse> execute(final Client client) {
|
|
|
String source = JSONObject.toJSONString(data);
|
|
|
final IndexRequest request = Requests.indexRequest(indexId).type(getIndexDataType(indexName)).id(id).source(source);
|
|
|
return client.index(request);
|
|
|
}
|
|
|
});
|
|
|
return response;
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 添加索引数据
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
* @param datas 索引数据列表
|
|
|
*/
|
|
|
@Override
|
|
|
public BulkResponse addIndexData(final String indexName, final List<?> datas) {
|
|
|
if (datas != null && datas.size() > 0) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final BulkResponse response = executeGet(new ClientCallback<BulkResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<BulkResponse> execute(final Client client) {
|
|
|
final BulkRequest request = Requests.bulkRequest();
|
|
|
for (Object data : datas) {
|
|
|
request.add(Requests.indexRequest(indexId).type(getIndexDataType(indexName)).source((Map<?, ?>) data));
|
|
|
}
|
|
|
ActionFuture<BulkResponse> future = client.bulk(request);
|
|
|
return future;
|
|
|
}
|
|
|
});
|
|
|
if (response.hasFailures()) {
|
|
|
logger.error("[更新索引异常:{}]", response.buildFailureMessage());
|
|
|
}
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 添加索引数据
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
* @param datas 索引数据列表
|
|
|
*/
|
|
|
@Override
|
|
|
public BulkResponse addIndexDataBean(final String indexName, final List<?> datas) {
|
|
|
if (datas != null && datas.size() > 0) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final BulkResponse response = executeGet(new ClientCallback<BulkResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<BulkResponse> execute(final Client client) {
|
|
|
final BulkRequest request = Requests.bulkRequest();
|
|
|
for (Object data : datas) {
|
|
|
request.add(Requests.indexRequest(indexId).type(getIndexDataType(indexName)).source(JSON.toJSONString(data)));
|
|
|
}
|
|
|
ActionFuture<BulkResponse> future = client.bulk(request);
|
|
|
return future;
|
|
|
}
|
|
|
});
|
|
|
if (response.hasFailures()) {
|
|
|
logger.error("[更新索引异常:{}]", response.buildFailureMessage());
|
|
|
}
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 删除索引数据
|
|
|
*
|
|
|
* @param indexName
|
|
|
*/
|
|
|
@Override
|
|
|
public DeleteResponse deleteIndexData(final String indexName, final String id) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final DeleteResponse response = executeGet(new ClientCallback<DeleteResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<DeleteResponse> execute(final Client client) {
|
|
|
final DeleteRequest request = Requests.deleteRequest(indexId).type(getIndexDataType(indexName)).id(id);
|
|
|
return client.delete(request);
|
|
|
}
|
|
|
});
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public DeleteByQueryResponse deleteIndexDataByQuery(final String indexName, final String query) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final DeleteByQueryResponse response = executeGet(new ClientCallback<DeleteByQueryResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<DeleteByQueryResponse> execute(Client client) {
|
|
|
final DeleteByQueryRequest request = Requests.deleteByQueryRequest(indexId).types(getIndexDataType(indexName)).source(query);
|
|
|
return client.deleteByQuery(request);
|
|
|
}
|
|
|
});
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 更新索引数据
|
|
|
*
|
|
|
* @param indexName
|
|
|
*/
|
|
|
@Override
|
|
|
public UpdateResponse updateIndexData(final String indexName, final String id, final Object data) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final UpdateResponse response = executeGet(new ClientCallback<UpdateResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<UpdateResponse> execute(Client client) {
|
|
|
UpdateRequestBuilder request = new UpdateRequestBuilder(client, indexId, getIndexDataType(indexName), id);
|
|
|
String source = JSONObject.toJSONString(data);
|
|
|
request.setDoc(source).setUpsert(source);
|
|
|
return request.execute();
|
|
|
}
|
|
|
});
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 更新索引数据
|
|
|
*
|
|
|
* @param indexName
|
|
|
*/
|
|
|
@Override
|
|
|
public UpdateResponse updateIndexDataStr(final String indexName, final String id, final String jsonData) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final UpdateResponse response = executeGet(new ClientCallback<UpdateResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<UpdateResponse> execute(Client client) {
|
|
|
UpdateRequestBuilder request = new UpdateRequestBuilder(client, indexId, getIndexDataType(indexName), id);
|
|
|
request.setDoc(jsonData).setUpsert(jsonData);
|
|
|
return request.execute();
|
|
|
}
|
|
|
});
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public BulkResponse bulkUpdateIndexData(final String indexName, @SuppressWarnings("rawtypes") final List<Map> datas) {
|
|
|
final String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null) {
|
|
|
final BulkResponse response = executeGet(new ClientCallback<BulkResponse>() {
|
|
|
@Override
|
|
|
public ActionFuture<BulkResponse> execute(final Client client) {
|
|
|
final BulkRequest request = Requests.bulkRequest();
|
|
|
if (datas != null) {
|
|
|
for (Map<?, ?> data : datas) {
|
|
|
request.add(Requests.indexRequest(indexId).type(getIndexDataType(indexName)).source(data));
|
|
|
}
|
|
|
}
|
|
|
ActionFuture<BulkResponse> future = client.bulk(request);
|
|
|
return future;
|
|
|
}
|
|
|
});
|
|
|
|
|
|
if (response.hasFailures()) {
|
|
|
logger.error("[更新索引异常:{}]", response.buildFailureMessage());
|
|
|
}
|
|
|
return response;
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
// 执行集群操作请求
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public <T extends ActionResponse> T executeGet(final ClusterCallback<T> callback) {
|
|
|
final ClusterAdminClient clusterAdmin = client.admin().cluster();
|
|
|
final ActionFuture<?> action = callback.execute(clusterAdmin);
|
|
|
final T response = (T) action.actionGet();
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
// 执行索引操作请求
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public <T extends ActionResponse> T executeGet(final NodeCallback<T> callback) {
|
|
|
final IndicesAdminClient indicesAdmin = client.admin().indices();
|
|
|
final ActionFuture<?> action = callback.execute(indicesAdmin);
|
|
|
final T response = (T) action.actionGet();
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 为索引创建名称
|
|
|
*
|
|
|
* @param indexId
|
|
|
* @param indexName
|
|
|
*/
|
|
|
public void createIndexName(String indexId, String indexName) {
|
|
|
// 检查别名是否存在
|
|
|
if (this.getIndexId(indexName) == null) {
|
|
|
IndicesAliasesRequest req = new IndicesAliasesRequest();
|
|
|
req.addAlias(indexName, indexId);
|
|
|
client.admin().indices().aliases(req).actionGet(30000L);
|
|
|
logger.info("[client=" + name + "] 创建索引[indexId=" + indexId + "]的别名[indexName=" + indexName + "]");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 为索引删除名称
|
|
|
*
|
|
|
* @param indexId
|
|
|
* @param indexName
|
|
|
*/
|
|
|
public void deleteIndexName(String indexId, String indexName) {
|
|
|
// 检查别名是否存在
|
|
|
if (this.getIndexId(indexName) != null) {
|
|
|
IndicesAliasesRequest req = new IndicesAliasesRequest();
|
|
|
req.removeAlias(indexId, indexName);
|
|
|
client.admin().indices().aliases(req).actionGet(30000L);
|
|
|
logger.info("[client=" + name + "] 删除索引[indexId=" + indexId + "]的别名[indexName=" + indexName + "]");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取索引唯一ID标识
|
|
|
*
|
|
|
* @param indexName
|
|
|
* @return
|
|
|
*/
|
|
|
public String getIndexId(String indexName) {
|
|
|
String indexId = null;
|
|
|
|
|
|
if (indexName != null) {
|
|
|
if (indexName.startsWith(INDEX_ID_PREFIX)) {
|
|
|
// 兼容输入的索引名称是索引ID的情况
|
|
|
indexId = indexName;
|
|
|
} else {
|
|
|
indexId = this.getIndexNameToIdMap().get(indexName);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return indexId;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取索引名称和索引ID的对应关系
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String, String> getIndexNameToIdMap() {
|
|
|
// 查询所有别名
|
|
|
Map<String, String> map = getIndexIdToNameMap();
|
|
|
Map<String, String> indexNameToIdMap = new HashMap<String, String>();
|
|
|
for (String idx : map.keySet()) {
|
|
|
indexNameToIdMap.put(map.get(idx), idx);
|
|
|
}
|
|
|
return indexNameToIdMap;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取索引ID和索引名称的对应关系
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String, String> getIndexIdToNameMap() {
|
|
|
// 获取系统实际存在的索引名列表
|
|
|
ClusterState clusterState = client.admin().cluster().prepareState().execute().actionGet().getState();
|
|
|
String[] indices = clusterState.metaData().indices().keys().toArray(String.class);
|
|
|
|
|
|
Map<String, String> indexIdToNameMap = new HashMap<String, String>();
|
|
|
|
|
|
ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest().routingTable(true).nodes(true).indices(indices);
|
|
|
MetaData md = client.admin().cluster().state(clusterStateRequest).actionGet(30000).getState().metaData();
|
|
|
|
|
|
for (IndexMetaData imd : md) {
|
|
|
for (AliasMetaData amd : imd.aliases().values().toArray(AliasMetaData.class)) {
|
|
|
indexIdToNameMap.put(imd.index(), amd.alias());
|
|
|
}
|
|
|
}
|
|
|
return indexIdToNameMap;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 通过索引名称获取索引数据类型 标准索引名称中不含下划线,临时索引名称含义下划线,以下划线分割,前半部为标准索引名称,后半部为时间戳 参考方法
|
|
|
* generateTempIndexName 在本系统中,索引数据类型与标准索引名称保持一致,因此如果为临时索引名称,取前半部
|
|
|
*
|
|
|
* @param indexName
|
|
|
* @return
|
|
|
*/
|
|
|
public String getIndexDataType(String indexName) {
|
|
|
return indexName.split("_")[0];
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据indexName找到所有相关的
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
public List<String> getRelatedIndexs(String name) {
|
|
|
List<String> result = new ArrayList<String>();
|
|
|
Map<String, String> idToNameMap = getIndexIdToNameMap();
|
|
|
Set<String> nameSet = idToNameMap.keySet();
|
|
|
String regEx = "^" + INDEX_ID_PREFIX + name + "_(\\d){13}_(\\d){17}$";
|
|
|
Pattern p = Pattern.compile(regEx);
|
|
|
for (String indexId : nameSet) {
|
|
|
if (p.matcher(indexId).find()) {
|
|
|
result.add(idToNameMap.get(indexId));
|
|
|
}
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void setIndexSettings(String indexName, Map<String, String> settings) {
|
|
|
String indexId = this.getIndexId(indexName);
|
|
|
if (indexId != null && settings != null && settings.size() > 0) {
|
|
|
ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder();
|
|
|
Set<String> keySet = settings.keySet();
|
|
|
for (String key : keySet) {
|
|
|
updateSettings.put(key, settings.get(key));
|
|
|
}
|
|
|
client.admin().indices().prepareUpdateSettings(indexId).setSettings(updateSettings).execute().actionGet();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Map<String, String> optimumSettingsForBulkIndexing(String indexName) {
|
|
|
String indexId = this.getIndexId(indexName);
|
|
|
IndexMetaData indexMetaData = client.admin().cluster().prepareState().execute().actionGet().getState().metaData().index(indexId);
|
|
|
Map<String, String> map = new HashMap<String, String>();
|
|
|
map.put("index.refresh_interval", indexMetaData.settings().get("index.refresh_interval"));
|
|
|
map.put("index.number_of_replicas", indexMetaData.settings().get("index.number_of_replicas"));
|
|
|
map.put("index.translog.flush_threshold_ops", indexMetaData.settings().get("index.translog.flush_threshold_ops"));
|
|
|
|
|
|
ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder();
|
|
|
updateSettings.put("index.refresh_interval", -1);
|
|
|
updateSettings.put("index.number_of_replicas", 0);
|
|
|
updateSettings.put("index.translog.flush_threshold_ops", 100000);
|
|
|
client.admin().indices().prepareUpdateSettings(indexId).setSettings(updateSettings).execute().actionGet();
|
|
|
return map;
|
|
|
}
|
|
|
|
|
|
} |
...
|
...
|
|