Authored by hugufei

使用ip2IndexInfo实现简单版本的_cat/shards功能

1 -package com.yoho.search.consumer.common;  
2 -  
3 -import java.util.List;  
4 -  
5 -import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;  
6 -import org.elasticsearch.action.bulk.BulkResponse;  
7 -  
8 -import com.yoho.search.core.es.model.ESBluk;  
9 -import com.yoho.search.core.es.model.SearchParam;  
10 -import com.yoho.search.core.es.model.SearchResult;  
11 -  
12 -/**  
13 - * 索引管理接口: 管理所有的索引和索引客户端,以及对外提供索引操作接口  
14 - */  
15 -public interface IYohoIndexService {  
16 -  
17 - /**  
18 - * 检查某索引的健康状态  
19 - *  
20 - * @param indexName  
21 - * @return  
22 - */  
23 - boolean checkHealth(String indexName);  
24 -  
25 - /**  
26 - * 通过索引名获取索引信息  
27 - *  
28 - * @param yohoIndexName 索引名称  
29 - * @return  
30 - */  
31 - IYohoIndex getIndex(String yohoIndexName);  
32 -  
33 -  
34 - /**  
35 - * 执行索引重建  
36 - *  
37 - * @param yohoIndexName  
38 - */  
39 - void rebuild(final String yohoIndexName);  
40 -  
41 - /**  
42 - * 添加索引数据  
43 - *  
44 - * @param yohoIndexName 索引名称  
45 - * @param data 索引数据  
46 - */  
47 - void addIndexData(String yohoIndexName, String id, Object data) throws Exception;  
48 -  
49 - /**  
50 - * 删除索引数据  
51 - *  
52 - * @param yohoIndexName  
53 - */  
54 - void deleteIndexData(String yohoIndexName, String id) throws Exception;  
55 -  
56 - /**  
57 - * 更新索引数据  
58 - *  
59 - * @param yohoIndexName  
60 - */  
61 - void updateIndexData(final String yohoIndexName, final String id, final Object data) throws Exception;  
62 -  
63 - /**  
64 - * 执行搜索  
65 - *  
66 - * @param yohoIndexName  
67 - * @param searchParam  
68 - * @return  
69 - */  
70 - SearchResult search(final String yohoIndexName, SearchParam searchParam);  
71 -  
72 - /**  
73 - * 执行搜索  
74 - *  
75 - * @param yohoIndexName  
76 - * @param searchParams  
77 - * @return  
78 - */  
79 - List<SearchResult> multiSearch(final String yohoIndexName, List<SearchParam> searchParams);  
80 -  
81 - /**  
82 - * 获取分词结果  
83 - *  
84 - * @param yohoIndexName  
85 - * @param text  
86 - * @return MultiGetResponse  
87 - */  
88 - List<AnalyzeToken> getAnalyzeTokens(final String yohoIndexName, String text, String analyzer);  
89 -  
90 - /**  
91 - * 批量删除或者更新插入索引  
92 - *  
93 - * @param esBluks  
94 - * @return  
95 - */  
96 - BulkResponse bulk(List<ESBluk> esBluks);  
97 -  
98 -} 1 +package com.yoho.search.consumer.common;
  2 +
  3 +import java.util.List;
  4 +
  5 +import com.yoho.search.core.es.IElasticsearchClient;
  6 +import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
  7 +import org.elasticsearch.action.bulk.BulkResponse;
  8 +
  9 +import com.yoho.search.core.es.model.ESBluk;
  10 +import com.yoho.search.core.es.model.SearchParam;
  11 +import com.yoho.search.core.es.model.SearchResult;
  12 +
  13 +/**
  14 + * 索引管理接口: 管理所有的索引和索引客户端,以及对外提供索引操作接口
  15 + */
  16 +public interface IYohoIndexService {
  17 +
  18 + /**
  19 + * 获取集群客户端
  20 + *
  21 + * @return
  22 + */
  23 + IElasticsearchClient getElasticsearchClient(String indexName);
  24 +
  25 + /**
  26 + * 检查某索引的健康状态
  27 + *
  28 + * @param indexName
  29 + * @return
  30 + */
  31 + boolean checkHealth(String indexName);
  32 +
  33 + /**
  34 + * 通过索引名获取索引信息
  35 + *
  36 + * @param yohoIndexName 索引名称
  37 + * @return
  38 + */
  39 + IYohoIndex getIndex(String yohoIndexName);
  40 +
  41 +
  42 + /**
  43 + * 执行索引重建
  44 + *
  45 + * @param yohoIndexName
  46 + */
  47 + void rebuild(final String yohoIndexName);
  48 +
  49 + /**
  50 + * 添加索引数据
  51 + *
  52 + * @param yohoIndexName 索引名称
  53 + * @param data 索引数据
  54 + */
  55 + void addIndexData(String yohoIndexName, String id, Object data) throws Exception;
  56 +
  57 + /**
  58 + * 删除索引数据
  59 + *
  60 + * @param yohoIndexName
  61 + */
  62 + void deleteIndexData(String yohoIndexName, String id) throws Exception;
  63 +
  64 + /**
  65 + * 更新索引数据
  66 + *
  67 + * @param yohoIndexName
  68 + */
  69 + void updateIndexData(final String yohoIndexName, final String id, final Object data) throws Exception;
  70 +
  71 + /**
  72 + * 执行搜索
  73 + *
  74 + * @param yohoIndexName
  75 + * @param searchParam
  76 + * @return
  77 + */
  78 + SearchResult search(final String yohoIndexName, SearchParam searchParam);
  79 +
  80 + /**
  81 + * 执行搜索
  82 + *
  83 + * @param yohoIndexName
  84 + * @param searchParams
  85 + * @return
  86 + */
  87 + List<SearchResult> multiSearch(final String yohoIndexName, List<SearchParam> searchParams);
  88 +
  89 + /**
  90 + * 获取分词结果
  91 + *
  92 + * @param yohoIndexName
  93 + * @param text
  94 + * @return MultiGetResponse
  95 + */
  96 + List<AnalyzeToken> getAnalyzeTokens(final String yohoIndexName, String text, String analyzer);
  97 +
  98 + /**
  99 + * 批量删除或者更新插入索引
  100 + *
  101 + * @param esBluks
  102 + * @return
  103 + */
  104 + BulkResponse bulk(List<ESBluk> esBluks);
  105 +
  106 +}
@@ -124,6 +124,15 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent @@ -124,6 +124,15 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
124 } 124 }
125 125
126 @Override 126 @Override
  127 + public IElasticsearchClient getElasticsearchClient(String yohoIndexName) {
  128 + IYohoIndex index = this.nameToIndexMap.get(yohoIndexName);
  129 + if (index == null) {
  130 + return null;
  131 + }
  132 + return index.getIndexClient();
  133 + }
  134 +
  135 + @Override
127 public IYohoIndex getIndex(String yohoIndexName) { 136 public IYohoIndex getIndex(String yohoIndexName) {
128 return this.nameToIndexMap.get(yohoIndexName); 137 return this.nameToIndexMap.get(yohoIndexName);
129 } 138 }
@@ -454,6 +463,7 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent @@ -454,6 +463,7 @@ public class YohoIndexServiceImpl implements IYohoIndexService, ApplicationEvent
454 return bulkResponse; 463 return bulkResponse;
455 } 464 }
456 465
  466 +
457 @Override 467 @Override
458 public boolean checkHealth(String indexName) { 468 public boolean checkHealth(String indexName) {
459 IYohoIndex index = this.nameToIndexMap.get(indexName); 469 IYohoIndex index = this.nameToIndexMap.get(indexName);
1 package com.yoho.search.consumer.restapi; 1 package com.yoho.search.consumer.restapi;
2 2
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.yoho.search.base.utils.ISearchConstants;
  5 +import com.yoho.search.consumer.common.IYohoIndexService;
3 import com.yoho.search.consumer.service.logicService.personal.PersonalVectorVersionManager; 6 import com.yoho.search.consumer.service.logicService.personal.PersonalVectorVersionManager;
  7 +import com.yoho.search.consumer.service.logicService.tbl.util.StringUtils;
  8 +import com.yoho.search.core.es.IElasticsearchClient;
4 import com.yoho.search.core.personalized.service.BidataServiceCaller; 9 import com.yoho.search.core.personalized.service.BidataServiceCaller;
  10 +import org.apache.commons.collections.MapUtils;
  11 +import org.elasticsearch.cluster.ClusterState;
  12 +import org.elasticsearch.cluster.node.DiscoveryNodes;
  13 +import org.elasticsearch.cluster.routing.ShardRouting;
5 import org.springframework.beans.factory.annotation.Autowired; 14 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.stereotype.Controller; 15 import org.springframework.stereotype.Controller;
7 import org.springframework.web.bind.annotation.RequestMapping; 16 import org.springframework.web.bind.annotation.RequestMapping;
8 import org.springframework.web.bind.annotation.ResponseBody; 17 import org.springframework.web.bind.annotation.ResponseBody;
9 18
10 -import java.util.HashMap;  
11 -import java.util.Map; 19 +import java.util.*;
12 20
13 @Controller 21 @Controller
14 public class ToolsController { 22 public class ToolsController {
15 23
16 - @Autowired  
17 - private PersonalVectorVersionManager personalVectorVersionManager;  
18 - @Autowired  
19 - private BidataServiceCaller bidataServiceCaller;  
20 -  
21 - @RequestMapping(value = "/vectorVersion")  
22 - @ResponseBody  
23 - public Map<String, Object> vectorVersion(){  
24 - Map<String, Object> results = new HashMap<String, Object>();  
25 - //大数据目前推荐的版本  
26 - String bigDataRecomDateStr = personalVectorVersionManager.getBigDataRecomDateStr();  
27 - results.put("bigDataRecomDateStr", bigDataRecomDateStr==null?"":bigDataRecomDateStr);  
28 - //zk中目前的版本  
29 - String currentVersionInZk = personalVectorVersionManager.getCurrentVersionInZk();  
30 - results.put("currentVersionInZk", currentVersionInZk);  
31 - //经过计算目前可以使用的版本  
32 - String currentVersion = personalVectorVersionManager.getCurrentVersion();  
33 - results.put("currentVersion", currentVersion);  
34 - return results;  
35 - }  
36 -  
37 - @RequestMapping(value = "/bigdataServiceTest")  
38 - @ResponseBody  
39 - public Map<String, Object> bigdataServiceTets(Integer uid){  
40 - Map<String, Object> results = new HashMap<String, Object>();  
41 - //大数据目前推荐的版本  
42 - String bigDataRecomDateStr = bidataServiceCaller.getBigDataRecomDateStr();  
43 - results.put("bigDataRecomDateStr", bigDataRecomDateStr==null?"":bigDataRecomDateStr);  
44 - results.put("userFavoriteSizes",bidataServiceCaller.getUserFavoriteSizes(uid.toString()));  
45 - results.put("userGenderFeature",bidataServiceCaller.getUserGenderFeature(uid.toString()));  
46 - results.put("userFavoriteSizes",bidataServiceCaller.getUserFavoriteSizes(uid.toString()));  
47 - results.put("userVectorFeature",bidataServiceCaller.getUserVectorFeature(uid.toString(),bigDataRecomDateStr));  
48 - results.put("userPersionalFactor",bidataServiceCaller.queryUserPersionalFactor(uid,null,null));  
49 - return results;  
50 - } 24 + @Autowired
  25 + private PersonalVectorVersionManager personalVectorVersionManager;
  26 + @Autowired
  27 + private BidataServiceCaller bidataServiceCaller;
  28 + @Autowired
  29 + private IYohoIndexService yohoIndexService;
  30 +
  31 + @RequestMapping(value = "/vectorVersion")
  32 + @ResponseBody
  33 + public Map<String, Object> vectorVersion() {
  34 + Map<String, Object> results = new HashMap<String, Object>();
  35 + //大数据目前推荐的版本
  36 + String bigDataRecomDateStr = personalVectorVersionManager.getBigDataRecomDateStr();
  37 + results.put("bigDataRecomDateStr", bigDataRecomDateStr == null ? "" : bigDataRecomDateStr);
  38 + //zk中目前的版本
  39 + String currentVersionInZk = personalVectorVersionManager.getCurrentVersionInZk();
  40 + results.put("currentVersionInZk", currentVersionInZk);
  41 + //经过计算目前可以使用的版本
  42 + String currentVersion = personalVectorVersionManager.getCurrentVersion();
  43 + results.put("currentVersion", currentVersion);
  44 + return results;
  45 + }
  46 +
  47 + @RequestMapping(value = "/bigdataServiceTest")
  48 + @ResponseBody
  49 + public Map<String, Object> bigdataServiceTets(Integer uid) {
  50 + Map<String, Object> results = new HashMap<String, Object>();
  51 + //大数据目前推荐的版本
  52 + String bigDataRecomDateStr = bidataServiceCaller.getBigDataRecomDateStr();
  53 + results.put("bigDataRecomDateStr", bigDataRecomDateStr == null ? "" : bigDataRecomDateStr);
  54 + results.put("userFavoriteSizes", bidataServiceCaller.getUserFavoriteSizes(uid.toString()));
  55 + results.put("userGenderFeature", bidataServiceCaller.getUserGenderFeature(uid.toString()));
  56 + results.put("userFavoriteSizes", bidataServiceCaller.getUserFavoriteSizes(uid.toString()));
  57 + results.put("userVectorFeature", bidataServiceCaller.getUserVectorFeature(uid.toString(), bigDataRecomDateStr));
  58 + results.put("userPersionalFactor", bidataServiceCaller.queryUserPersionalFactor(uid, null, null));
  59 + return results;
  60 + }
  61 +
  62 + @RequestMapping(value = "/ip2IndexInfo")
  63 + @ResponseBody
  64 + public Map<String, Object> nodeAndShardInfo() {
  65 + IElasticsearchClient client = yohoIndexService.getElasticsearchClient(ISearchConstants.INDEX_NAME_PRODUCT_INDEX);
  66 + ClusterState clusterState = client.getClusterStateResponse().getState();
  67 + DiscoveryNodes discoveryNodes = clusterState.nodes();
  68 +
  69 + Map<String, List<Map<String, Object>>> ip2IndexInfo = new HashMap<>();
  70 + Set<String> allIps = new HashSet<>();
  71 + Set<String> hasPrimaryIps = new HashSet<>();
  72 + for (ShardRouting shard : clusterState.routingTable().allShards()) {
  73 + String indexName = shard.getIndexName();
  74 + String hostAddress = discoveryNodes.get(shard.currentNodeId()).getHostAddress();
  75 + boolean isPrimary = shard.primary();
  76 + if(isPrimary){
  77 + hasPrimaryIps.add(hostAddress);
  78 + }
  79 + allIps.add(hostAddress);
  80 +
  81 + List<Map<String, Object>> indexInfoList = ip2IndexInfo.computeIfAbsent(hostAddress,a->new ArrayList<>());
  82 + JSONObject indexInfo = new JSONObject();
  83 + indexInfo.put("index_name", indexName);
  84 + indexInfo.put("node_id", shard.currentNodeId());
  85 + indexInfo.put("is_primary", isPrimary);
  86 + indexInfo.put("host_address", hostAddress);
  87 + indexInfoList.add(indexInfo);
  88 + }
  89 + Map<String, Object> result = new HashMap<>();
  90 + result.put("all",ip2IndexInfo);
  91 + result.put("hasPrimaryIps", StringUtils.join(hasPrimaryIps,","));
  92 + allIps.removeAll(hasPrimaryIps);
  93 + result.put("noPrimaryIps", StringUtils.join(allIps,","));
  94 + return result;
  95 + }
51 96
52 } 97 }