Authored by liangyi.chen@yoho.cn

增加动态配置开关

  1 +package com.yoho.datasync.consumer.common;
  2 +
  3 +public class IndexConstant {
  4 +
  5 + public interface INDEX_NAME{
  6 + String GRASS = "grass";
  7 + }
  8 +
  9 + public interface INDEX_TYPE{
  10 + String USER_INFO = "userInfo";
  11 + }
  12 +}
@@ -69,6 +69,43 @@ @@ -69,6 +69,43 @@
69 <artifactId>elasticsearch</artifactId> 69 <artifactId>elasticsearch</artifactId>
70 <version>6.5.2</version> 70 <version>6.5.2</version>
71 </dependency> 71 </dependency>
  72 + <dependency>
  73 + <groupId>commons-collections</groupId>
  74 + <artifactId>commons-collections</artifactId>
  75 + <version>3.2.2</version>
  76 + </dependency>
  77 +
  78 + <dependency>
  79 + <groupId>com.netflix.archaius</groupId>
  80 + <artifactId>archaius-core</artifactId>
  81 + <version>0.7.3</version>
  82 + </dependency>
  83 + <dependency>
  84 + <groupId>com.netflix.archaius</groupId>
  85 + <artifactId>archaius-zookeeper</artifactId>
  86 + <version>0.7.3</version>
  87 + <exclusions>
  88 + <exclusion>
  89 + <groupId>org.slf4j</groupId>
  90 + <artifactId>slf4j-log4j12</artifactId>
  91 + </exclusion>
  92 + </exclusions>
  93 + </dependency>
  94 + <dependency>
  95 + <groupId>org.apache.curator</groupId>
  96 + <artifactId>curator-framework</artifactId>
  97 + <version>2.9.1</version>
  98 + </dependency>
  99 + <dependency>
  100 + <groupId>org.apache.curator</groupId>
  101 + <artifactId>curator-client</artifactId>
  102 + <version>2.9.1</version>
  103 + </dependency>
  104 + <dependency>
  105 + <groupId>commons-configuration</groupId>
  106 + <artifactId>commons-configuration</artifactId>
  107 + <version>1.5</version>
  108 + </dependency>
72 109
73 </dependencies> 110 </dependencies>
74 111
  1 +package com.yoho.datasync.consumer.handler.config;
  2 +
  3 +import com.netflix.config.DynamicWatchedConfiguration;
  4 +import com.netflix.config.source.ZooKeeperConfigurationSource;
  5 +import org.apache.curator.framework.CuratorFramework;
  6 +import org.slf4j.Logger;
  7 +import org.slf4j.LoggerFactory;
  8 +import org.springframework.beans.factory.InitializingBean;
  9 +import org.springframework.core.io.ClassPathResource;
  10 +import org.springframework.core.io.Resource;
  11 +import org.springframework.core.io.support.PropertiesLoaderUtils;
  12 +import org.springframework.stereotype.Component;
  13 +
  14 +import javax.annotation.PreDestroy;
  15 +import java.util.Properties;
  16 +import java.util.concurrent.atomic.AtomicBoolean;
  17 +
  18 +import static com.netflix.config.ConfigurationManager.install;
  19 +
  20 +/**
  21 + * 读取classpath下的config.properties文件,并且写入到zookeeper中
  22 + */
  23 +@Component
  24 +public class ConfigMangerRegister implements InitializingBean {
  25 +
  26 + @javax.annotation.Resource
  27 + private CuratorFramework curatorFramework;
  28 +
  29 +
  30 + private static final Logger logger = LoggerFactory.getLogger(ConfigMangerRegister.class);
  31 + private static final String CONFIG_ROOT_PATH = "/yh/config";
  32 + private static final String LOCAL_CONFIG_FILE = "global.properties";
  33 + private static final String CACHE_LOCAL_CONFIG_FILE = "cache.properties";
  34 +
  35 +
  36 + private static AtomicBoolean inited =new AtomicBoolean(false);
  37 +
  38 +
  39 +
  40 + @Override
  41 + public void afterPropertiesSet() throws Exception {
  42 + start();
  43 + }
  44 +
  45 + /**
  46 + * 1. 注册zookeeper配置的支持到netflix archaius中
  47 + * 2. 读取config.properties的配置,添加到zookeeper中
  48 + */
  49 + public void start() throws Exception {
  50 + /**
  51 + * return directly if zk client not be configured
  52 + */
  53 + if(null == curatorFramework){
  54 + return ;
  55 + }
  56 +
  57 +
  58 + /**
  59 + * create path if not exist
  60 + */
  61 + if (curatorFramework.checkExists().forPath(CONFIG_ROOT_PATH) == null) {
  62 + curatorFramework.create().creatingParentContainersIfNeeded().forPath(CONFIG_ROOT_PATH);
  63 + }
  64 +
  65 + //register to archaius
  66 + this.registerZookeeperConfig();
  67 +
  68 + //register all configurations to zookeeper
  69 + this.registerConfigurations();
  70 +
  71 + // register cache configurations to zookeeper
  72 + this.registerCacheConfigurations();
  73 + }
  74 +
  75 +
  76 +
  77 + /**
  78 + * register zookeeper config source to netflix
  79 + *
  80 + * @see <a href = "http://github.com/Netflix/archaius/wiki/ZooKeeper-Dynamic-Configuration"> netflix archaius </a>
  81 + */
  82 + private void registerZookeeperConfig() {
  83 +
  84 + if(!inited.compareAndSet(false, true)){
  85 + return;
  86 + }
  87 + ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource(curatorFramework, CONFIG_ROOT_PATH);
  88 + try {
  89 + zkConfigSource.start();
  90 + } catch (Exception e) {
  91 + logger.error("start zk config source error !");
  92 + }
  93 +
  94 + DynamicWatchedConfiguration zkDynamicConfig = new DynamicWatchedConfiguration(zkConfigSource);
  95 + install(zkDynamicConfig);
  96 + }
  97 +
  98 + /**
  99 + * 读取配置,并且注册到zk中
  100 + */
  101 + private void registerConfigurations() {
  102 +
  103 + try {
  104 +
  105 + //check if resource existed
  106 + Resource resource = new ClassPathResource(LOCAL_CONFIG_FILE);
  107 + if(!resource.exists()) {
  108 + logger.info("can not find {} at classpath.", LOCAL_CONFIG_FILE);
  109 + return;
  110 + }
  111 +
  112 + Properties props = PropertiesLoaderUtils.loadProperties(resource);
  113 +
  114 + for (String propertyName : props.stringPropertyNames()) {
  115 + final String path = ConfigMangerRegister.CONFIG_ROOT_PATH + "/" + propertyName;
  116 + //if path is not existed, create
  117 + if (this.curatorFramework.checkExists().forPath(path) == null) {
  118 + String value = String.valueOf(props.getProperty(propertyName));
  119 + this.curatorFramework.create().forPath(path, value.getBytes("UTF-8"));
  120 +
  121 + logger.debug("register configuration: {} --> {} success", propertyName, value);
  122 + }
  123 + }
  124 +
  125 + logger.info("register all configurations success");
  126 +
  127 + }catch (Exception e){
  128 + logger.error("register all configurations to zookeeper failed with exception.", e);
  129 + }
  130 + }
  131 +
  132 +
  133 +
  134 + /**
  135 + * 读取配置,并且注册到zk中
  136 + */
  137 + private void registerCacheConfigurations() {
  138 +
  139 + try {
  140 +
  141 + //check if resource existed
  142 + Resource resource = new ClassPathResource(CACHE_LOCAL_CONFIG_FILE);
  143 + if(!resource.exists()) {
  144 + logger.info("can not find {} at classpath.", CACHE_LOCAL_CONFIG_FILE);
  145 + return;
  146 + }
  147 + Properties props = PropertiesLoaderUtils.loadProperties(resource);
  148 + for (String propertyName : props.stringPropertyNames()) {
  149 + final String path = ConfigMangerRegister.CONFIG_ROOT_PATH + "/" + propertyName;
  150 + //if path is not existed, create
  151 + if (this.curatorFramework.checkExists().forPath(path) == null) {
  152 + String value = String.valueOf(props.getProperty(propertyName));
  153 + this.curatorFramework.create().forPath(path, value.getBytes("UTF-8"));
  154 +
  155 + logger.debug("register configuration: {} --> {} success", propertyName, value);
  156 + }
  157 + }
  158 +
  159 + logger.info("register all configurations success");
  160 + }catch (Exception e){
  161 + logger.error("register all configurations to zookeeper failed with exception.", e);
  162 + }
  163 + }
  164 +
  165 +
  166 + //called by spring
  167 + @PreDestroy
  168 + public void destroy() {
  169 + curatorFramework.close();
  170 + }
  171 +}
  1 +package com.yoho.datasync.consumer.handler.config;
  2 +
  3 +import com.netflix.config.*;
  4 +import org.springframework.stereotype.Component;
  5 +
  6 +/**
  7 + * 读取配置。 当前支持读取zookeeper中的配置
  8 + */
  9 +@Component
  10 +public class ConfigReader {
  11 +
  12 + /**
  13 + * 读取Spring类型的配置
  14 + * @param key Key
  15 + * @param defaultValue 默认值
  16 + * @return value
  17 + */
  18 + public String getString(String key, String defaultValue) {
  19 + final DynamicStringProperty property = DynamicPropertyFactory
  20 + .getInstance().getStringProperty(key, defaultValue);
  21 + return property.get();
  22 + }
  23 +
  24 + /**
  25 + * 读取int类型的配置
  26 + * @param key Key
  27 + * @param defaultValue 默认值
  28 + * @return value
  29 + */
  30 + public int getInt(String key, int defaultValue) {
  31 + final DynamicIntProperty property = DynamicPropertyFactory
  32 + .getInstance().getIntProperty(key, defaultValue);
  33 + return property.get();
  34 + }
  35 +
  36 + /**
  37 + * 读取int类型的配置
  38 + * @param key Key
  39 + * @param defaultValue 默认值
  40 + * @return value
  41 + */
  42 + public double getDouble(String key, double defaultValue) {
  43 + final DynamicDoubleProperty property = DynamicPropertyFactory
  44 + .getInstance().getDoubleProperty(key, defaultValue);
  45 + return property.get();
  46 + }
  47 +
  48 +
  49 + /**
  50 + * 读取long类型的配置
  51 + * @param key Key
  52 + * @param defaultValue 默认值
  53 + * @return value
  54 + */
  55 + public long getLong(String key, long defaultValue) {
  56 + final DynamicLongProperty property = DynamicPropertyFactory
  57 + .getInstance().getLongProperty(key, defaultValue);
  58 + return property.get();
  59 + }
  60 +
  61 +
  62 + /**
  63 + * 读取boolean类型的配置
  64 + * @param key Key
  65 + * @param defaultValue 默认值
  66 + * @return value
  67 + */
  68 + public boolean getBoolean(String key, boolean defaultValue) {
  69 + final DynamicBooleanProperty property = DynamicPropertyFactory
  70 + .getInstance().getBooleanProperty(key, defaultValue);
  71 + return property.get();
  72 + }
  73 +
  74 +
  75 +}
  1 +package com.yoho.datasync.consumer.handler.config;
  2 +
  3 +import org.springframework.stereotype.Component;
  4 +
  5 +import javax.annotation.Resource;
  6 +
  7 +/**
  8 + * Created by markeloff on 16/10/8.
  9 + */
  10 +@Component
  11 +public class ConfigReaderUtil {
  12 +
  13 + @Resource
  14 + private ConfigReader configReader;
  15 +
  16 + public String getString(String key, String defaultValue) {
  17 + if(configReader==null){
  18 + return defaultValue;
  19 + }
  20 + return configReader.getString(key, defaultValue);
  21 + }
  22 +
  23 + public int getInt(String key, int defaultValue) {
  24 + if(configReader==null){
  25 + return defaultValue;
  26 + }
  27 + return configReader.getInt(key,defaultValue);
  28 + }
  29 +
  30 + public long getLong(String key, long defaultValue) {
  31 + if(configReader==null){
  32 + return defaultValue;
  33 + }
  34 + return configReader.getLong(key,defaultValue);
  35 + }
  36 +
  37 + public boolean getBoolean(String key, boolean defaultValue) {
  38 + if(configReader==null){
  39 + return defaultValue;
  40 + }
  41 + return configReader.getBoolean(key,defaultValue);
  42 + }
  43 +
  44 +}
  1 +package com.yoho.datasync.consumer.handler.config;
  2 +
  3 +import org.apache.commons.lang3.StringUtils;
  4 +import org.apache.curator.RetryPolicy;
  5 +import org.apache.curator.framework.CuratorFramework;
  6 +import org.apache.curator.framework.CuratorFrameworkFactory;
  7 +import org.apache.curator.retry.ExponentialBackoffRetry;
  8 +import org.slf4j.Logger;
  9 +import org.slf4j.LoggerFactory;
  10 +import org.springframework.boot.context.properties.ConfigurationProperties;
  11 +import org.springframework.context.annotation.Bean;
  12 +import org.springframework.context.annotation.Configuration;
  13 +
  14 +import javax.annotation.PreDestroy;
  15 +
  16 +
  17 +/**
  18 + * 获取CuratorFramework 客户端
  19 + */
  20 +@Configuration
  21 +@ConfigurationProperties(prefix = "zk")
  22 +public class CuratorFrameworkFactoryBean {
  23 + private final static Logger logger = LoggerFactory.getLogger(CuratorFrameworkFactoryBean.class);
  24 +
  25 + private CuratorFramework curator;
  26 + private String connectString;
  27 + private RetryPolicy retryPolicy;
  28 + private Integer sessionTimeout;
  29 + private String namespace;
  30 +
  31 + @Bean
  32 + public CuratorFramework curatorFramework(){
  33 + if(StringUtils.isEmpty(connectString)){
  34 + return null;
  35 + }
  36 +
  37 + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
  38 +
  39 + builder.connectString(connectString);
  40 +
  41 + if (retryPolicy == null) {
  42 + retryPolicy = new ExponentialBackoffRetry(1000, 3);
  43 + }
  44 + builder.retryPolicy(retryPolicy);
  45 +
  46 + if (sessionTimeout != null) {
  47 + builder.sessionTimeoutMs(sessionTimeout);
  48 + }
  49 +
  50 + if (namespace != null) {
  51 + builder.namespace(namespace);
  52 + }
  53 +
  54 + curator = builder.build();
  55 + curator.start();
  56 + logger.info("start zookeeper client success. connected to {}", connectString);
  57 + return curator;
  58 + }
  59 +
  60 +
  61 +
  62 + @PreDestroy
  63 + public void destroy() throws Exception {
  64 + curator.close();
  65 + }
  66 +
  67 + public String getConnectString() {
  68 + return connectString;
  69 + }
  70 +
  71 + public void setConnectString(String connectString) {
  72 + this.connectString = connectString;
  73 + }
  74 +
  75 + public Integer getSessionTimeout() {
  76 + return sessionTimeout;
  77 + }
  78 +
  79 + public void setSessionTimeout(Integer sessionTimeout) {
  80 + this.sessionTimeout = sessionTimeout;
  81 + }
  82 +
  83 + public RetryPolicy getRetryPolicy() {
  84 + return retryPolicy;
  85 + }
  86 +
  87 + public void setRetryPolicy(RetryPolicy retryPolicy) {
  88 + this.retryPolicy = retryPolicy;
  89 + }
  90 +
  91 + public String getNamespace() {
  92 + return namespace;
  93 + }
  94 +
  95 + public void setNamespace(String namespace) {
  96 + this.namespace = namespace;
  97 + }
  98 +
  99 +}
@@ -2,21 +2,18 @@ package com.yoho.datasync.consumer.handler.listener.community; @@ -2,21 +2,18 @@ package com.yoho.datasync.consumer.handler.listener.community;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.yoho.datasync.consumer.common.EventEnum; 4 import com.yoho.datasync.consumer.common.EventEnum;
  5 +import com.yoho.datasync.consumer.common.IndexConstant;
  6 +import com.yoho.datasync.consumer.handler.config.ConfigReaderUtil;
  7 +import com.yoho.datasync.consumer.handler.model.ESBluk;
5 import com.yoho.datasync.consumer.handler.mqcomponent.AbstractMqListener; 8 import com.yoho.datasync.consumer.handler.mqcomponent.AbstractMqListener;
  9 +import com.yoho.datasync.consumer.handler.processor.ESUpdateProcess;
6 import com.yoho.datasync.core.base.annotation.MqConsumerListerner; 10 import com.yoho.datasync.core.base.annotation.MqConsumerListerner;
7 import com.yoho.datasync.core.base.model.yh_community.UserInfo; 11 import com.yoho.datasync.core.base.model.yh_community.UserInfo;
8 -import org.elasticsearch.action.update.UpdateRequest;  
9 -import org.elasticsearch.action.update.UpdateResponse;  
10 -import org.elasticsearch.client.RequestOptions;  
11 -import org.elasticsearch.client.RestHighLevelClient;  
12 -import org.elasticsearch.common.xcontent.XContentFactory;  
13 import org.slf4j.Logger; 12 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory; 13 import org.slf4j.LoggerFactory;
15 -import org.springframework.beans.factory.annotation.Autowired;  
16 -import org.springframework.beans.factory.annotation.Qualifier;  
17 import org.springframework.stereotype.Component; 14 import org.springframework.stereotype.Component;
18 15
19 -import java.io.IOException; 16 +import javax.annotation.Resource;
20 17
21 @Component 18 @Component
22 @MqConsumerListerner(dbName = "yh_community",tableName = "user_info") 19 @MqConsumerListerner(dbName = "yh_community",tableName = "user_info")
@@ -25,38 +22,34 @@ public class UserInfoListener extends AbstractMqListener<UserInfo> { @@ -25,38 +22,34 @@ public class UserInfoListener extends AbstractMqListener<UserInfo> {
25 22
26 private static final Logger logger = LoggerFactory.getLogger(UserInfoListener.class); 23 private static final Logger logger = LoggerFactory.getLogger(UserInfoListener.class);
27 24
28 - @Qualifier("rhlClient")  
29 - @Autowired  
30 - private RestHighLevelClient rhlClient; 25 + @Resource
  26 + private ESUpdateProcess esUpdateProcess;
31 27
32 - @Override  
33 - protected void deleteData(UserInfo sourceObject , Object checkResult) throws Exception { 28 + @Resource
  29 + private ConfigReaderUtil configReaderUtil;
34 30
  31 + @Override
  32 + protected void deleteData(UserInfo userInfo , Object checkResult) throws Exception {
  33 + logger.info("enter UserInfoListener delete userInfo data is {}" , userInfo);
  34 + boolean syncFlag = configReaderUtil.getBoolean("grass.search.es.flag", false);
  35 + if(syncFlag){
  36 + //更新用户信息索引到ES 添加到队列中
  37 + ESBluk esBluk = new ESBluk(null,String.valueOf(userInfo.getYohoUid()),
  38 + IndexConstant.INDEX_NAME.GRASS,IndexConstant.INDEX_TYPE.USER_INFO,true);
  39 + esUpdateProcess.add(esBluk);
  40 + }
35 } 41 }
36 42
37 @Override 43 @Override
38 protected void updateData(UserInfo userInfo , Object checkResult) throws Exception { 44 protected void updateData(UserInfo userInfo , Object checkResult) throws Exception {
39 - //更新用户信息索引到ES  
40 - // 批量增加  
41 - UpdateRequest updateRequest = new UpdateRequest();  
42 - updateRequest.index("grass");  
43 - updateRequest.type("userInfo");  
44 - updateRequest.id(String.valueOf(userInfo.getYohoUid()));  
45 - updateRequest.doc(XContentFactory.jsonBuilder().startObject()  
46 - .field("nick_name", userInfo.getNickName())  
47 - .field("head_ico", userInfo.getHeadIco())  
48 - .field("yoho_uid", userInfo.getYohoUid())  
49 - .field("gender", userInfo.getGender())  
50 - .endObject());  
51 -  
52 - try {  
53 - // 2 获取更新后的值  
54 - UpdateResponse indexResponse = rhlClient.update(updateRequest, RequestOptions.DEFAULT);  
55 - logger.info("updateIndex: {}" , JSON.toJSONString(indexResponse));  
56 - } catch (IOException e) {  
57 - logger.info("updateIndex error :{}" ,e); 45 + logger.info("enter UserInfoListener update userInfo data is {}" , userInfo);
  46 + boolean syncFlag = configReaderUtil.getBoolean("grass.search.es.flag", false);
  47 + if(syncFlag){
  48 + //更新用户信息索引到ES 添加到队列中
  49 + ESBluk esBluk = new ESBluk(JSON.toJSONString(userInfo),String.valueOf(userInfo.getYohoUid()),
  50 + IndexConstant.INDEX_NAME.GRASS,IndexConstant.INDEX_TYPE.USER_INFO,false);
  51 + esUpdateProcess.add(esBluk);
58 } 52 }
59 -  
60 } 53 }
61 54
62 55
  1 +package com.yoho.datasync.consumer.handler.model;
  2 +
  3 +/**
  4 + * Created by apple on 16/8/10.
  5 + */
  6 +public class ESBluk {
  7 +
  8 + private String indexName;
  9 + private String type;
  10 + private String id;
  11 + private String dataJson;
  12 + private boolean isDelete;
  13 +
  14 + public ESBluk(String dataJson, String id, String indexName, String type, boolean isDelete) {
  15 + this.dataJson = dataJson;
  16 + this.id = id;
  17 + this.indexName = indexName;
  18 + this.isDelete = isDelete;
  19 + this.type = type;
  20 + }
  21 +
  22 + @Override
  23 + public String toString() {
  24 + return "ESBluk{" + "indexName='" + indexName + '\'' + ", type='" + type + '\'' + ", id='" + id + '\'' + ", dataJson='" + dataJson + '\'' + ", isDelete=" + isDelete + '}';
  25 + }
  26 +
  27 + public void replace(ESBluk bluk) {
  28 + this.dataJson = bluk.getDataJson();
  29 + this.id = bluk.getId();
  30 + this.indexName = bluk.getIndexName();
  31 + this.isDelete = bluk.isDelete();
  32 + this.type = bluk.getType();
  33 + }
  34 +
  35 + public String getDataJson() {
  36 + return dataJson;
  37 + }
  38 +
  39 + public void setDataJson(String dataJson) {
  40 + this.dataJson = dataJson;
  41 + }
  42 +
  43 + public String getId() {
  44 + return id;
  45 + }
  46 +
  47 + public void setId(String id) {
  48 + this.id = id;
  49 + }
  50 +
  51 + public String getIndexName() {
  52 + return indexName;
  53 + }
  54 +
  55 + public void setIndexName(String indexName) {
  56 + this.indexName = indexName;
  57 + }
  58 +
  59 + public boolean isDelete() {
  60 + return isDelete;
  61 + }
  62 +
  63 + public void setDelete(boolean delete) {
  64 + isDelete = delete;
  65 + }
  66 +
  67 + public String getType() {
  68 + return type;
  69 + }
  70 +
  71 + public void setType(String type) {
  72 + this.type = type;
  73 + }
  74 +}
  1 +package com.yoho.datasync.consumer.handler.processor;
  2 +
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.yoho.datasync.consumer.handler.model.ESBluk;
  5 +import org.apache.commons.collections.CollectionUtils;
  6 +import org.elasticsearch.action.bulk.BulkRequest;
  7 +import org.elasticsearch.action.bulk.BulkResponse;
  8 +import org.elasticsearch.action.delete.DeleteRequest;
  9 +import org.elasticsearch.action.index.IndexRequest;
  10 +import org.elasticsearch.client.RequestOptions;
  11 +import org.elasticsearch.client.RestHighLevelClient;
  12 +import org.elasticsearch.common.xcontent.XContentType;
  13 +import org.slf4j.Logger;
  14 +import org.slf4j.LoggerFactory;
  15 +import org.springframework.beans.factory.annotation.Qualifier;
  16 +import org.springframework.stereotype.Component;
  17 +
  18 +import javax.annotation.PostConstruct;
  19 +import javax.annotation.Resource;
  20 +import java.io.IOException;
  21 +import java.util.ArrayList;
  22 +import java.util.List;
  23 +import java.util.concurrent.ArrayBlockingQueue;
  24 +import java.util.concurrent.ExecutorService;
  25 +import java.util.concurrent.Executors;
  26 +
  27 +@Component
  28 +public class ESUpdateProcess {
  29 + private static final Logger logger = LoggerFactory.getLogger(ESUpdateProcess.class);
  30 +
  31 + private final ArrayBlockingQueue<ESBluk> queue = new ArrayBlockingQueue<>(200);
  32 +
  33 +
  34 + @Qualifier("rhlClient")
  35 + @Resource
  36 + private RestHighLevelClient rhlClient;
  37 +
  38 + @PostConstruct
  39 + void init() {
  40 + // 批量更新ES
  41 + ExecutorService executorService = Executors.newSingleThreadExecutor();
  42 + executorService.submit(() -> {
  43 + while (true) {
  44 + doBulk();
  45 + }
  46 + });
  47 + }
  48 +
  49 + private void doBulk() {
  50 + try {
  51 + long begin = System.currentTimeMillis();
  52 +
  53 + //1、从队列中获取全部数据
  54 + List<ESBluk> blukList = new ArrayList<>();
  55 + queue.drainTo(blukList);
  56 +
  57 + //2、批量更新Es
  58 + if (CollectionUtils.isNotEmpty(blukList)) {
  59 + BulkRequest bulkARequest = new BulkRequest();
  60 + for (ESBluk eSBluk : blukList) {
  61 + if(!eSBluk.isDelete()){
  62 + //新增或者更新
  63 + IndexRequest indexRequest = new IndexRequest(eSBluk.getIndexName(), eSBluk.getType(), eSBluk.getId());
  64 + indexRequest.source(eSBluk.getDataJson(), XContentType.JSON);
  65 + bulkARequest.add(indexRequest);
  66 + }else{
  67 + //删除
  68 + DeleteRequest deleteRequest = new DeleteRequest(eSBluk.getIndexName(), eSBluk.getType(), eSBluk.getId());
  69 + bulkARequest.add(deleteRequest);
  70 + }
  71 +
  72 + }
  73 + BulkResponse bulkResponse = null;
  74 + try {
  75 + bulkResponse = rhlClient.bulk(bulkARequest, RequestOptions.DEFAULT);
  76 + if(bulkResponse.hasFailures()){
  77 + logger.warn("bulk failure[{}]", bulkResponse.buildFailureMessage());
  78 + }
  79 + logger.info("writeData: {}" , JSON.toJSONString(bulkResponse));
  80 +
  81 + } catch (IOException e) {
  82 + logger.info("writeData error :{}" ,e);
  83 + }
  84 + logger.info("doBulk, the blukList size is {} and cost {} ms,", blukList.size(), System.currentTimeMillis() - begin);
  85 + Thread.sleep(50);
  86 + } else {
  87 + Thread.sleep(1000);
  88 + }
  89 + } catch (Exception e) {
  90 + logger.error(e.getMessage(), e);
  91 + }
  92 + }
  93 +
  94 + public void add(ESBluk esBluk) {
  95 + try {
  96 + queue.put(esBluk);
  97 + } catch (Exception e) {
  98 + logger.error(e.getMessage(), e);
  99 + }
  100 + }
  101 +
  102 +}
@@ -27,5 +27,8 @@ elasticSearch: @@ -27,5 +27,8 @@ elasticSearch:
27 connectNum: 10 27 connectNum: 10
28 connectPerRoute: 50 28 connectPerRoute: 50
29 29
  30 +zk:
  31 + connectString: 192.168.102.45:2181
  32 +
30 logging: 33 logging:
31 config: classpath:logback-boot.xml 34 config: classpath:logback-boot.xml
@@ -29,3 +29,6 @@ elasticSearch: @@ -29,3 +29,6 @@ elasticSearch:
29 client: 29 client:
30 connectNum: 10 30 connectNum: 10
31 connectPerRoute: 50 31 connectPerRoute: 50
  32 +
  33 +zk:
  34 + connectString: 192.168.104.246:2181