Authored by liangyi.chen@yoho.cn

增加es的索引更新

... ... @@ -50,6 +50,26 @@
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.5.2</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.5.2</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
... ...
package com.yoho.datasync.consumer.handler.listener.community;
import com.alibaba.fastjson.JSON;
import com.yoho.datasync.consumer.common.EventEnum;
import com.yoho.datasync.consumer.handler.mqcomponent.AbstractMqListener;
import com.yoho.datasync.core.base.annotation.MqConsumerListerner;
import com.yoho.datasync.core.base.model.yh_community.UserInfo;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@MqConsumerListerner(dbName = "yh_community",tableName = "user_info")
public class UserInfoListener extends AbstractMqListener<UserInfo> {
... ... @@ -15,13 +25,37 @@ public class UserInfoListener extends AbstractMqListener<UserInfo> {
private static final Logger logger = LoggerFactory.getLogger(UserInfoListener.class);
@Qualifier("rhlClient")
@Autowired
private RestHighLevelClient rhlClient;
@Override
protected void deleteData(UserInfo sourceObject , Object checkResult) throws Exception {
}
@Override
protected void updateData(UserInfo sourceObject , Object checkResult) throws Exception {
protected void updateData(UserInfo userInfo , Object checkResult) throws Exception {
//更新用户信息索引到ES
// 批量增加
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("grass");
updateRequest.type("userInfo");
updateRequest.id(String.valueOf(userInfo.getYohoUid()));
updateRequest.doc(XContentFactory.jsonBuilder().startObject()
.field("nick_name", userInfo.getNickName())
.field("head_ico", userInfo.getHeadIco())
.field("yoho_uid", userInfo.getYohoUid())
.field("gender", userInfo.getGender())
.endObject());
try {
// 2 获取更新后的值
UpdateResponse indexResponse = rhlClient.update(updateRequest, RequestOptions.DEFAULT);
logger.info("updateIndex: {}" , JSON.toJSONString(indexResponse));
} catch (IOException e) {
logger.info("updateIndex error :{}" ,e);
}
}
... ... @@ -30,4 +64,5 @@ public class UserInfoListener extends AbstractMqListener<UserInfo> {
protected EventEnum getEventReportEnum() {
return EventEnum.USER_INFO_UPDATE;
}
}
... ...
package com.yoho.datasync.consumer.starter.es;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class ESClientConfig {
@Value("${elasticSearch.host}")
private String host;
@Value("${elasticSearch.port}")
private int port;
@Value("${elasticSearch.client.connectNum}")
private Integer connectNum;
@Value("${elasticSearch.client.connectPerRoute}")
private Integer connectPerRoute;
@Bean
public HttpHost httpHost(){
return new HttpHost(host, port, "http");
}
@Bean(initMethod = "init", destroyMethod = "close")
public ESClientFactory getESFactory(){
return ESClientFactory.build(httpHost(), connectNum, connectPerRoute);
}
@Bean("rhlClient")
@Scope("singleton")
public RestHighLevelClient getRHLClient(){
return getESFactory().getRhlClient();
}
}
... ...
package com.yoho.datasync.consumer.starter.es;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class ESClientFactory {
public static final Logger logger = LoggerFactory.getLogger(ESClientFactory.class);
public static int CONNECT_TIMEOUT_MILLIS = 1000;
public static int SOCKET_TIMEOUT_MILLIS = 30000;
public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
public static int MAX_CONN_PER_ROUTE = 10;
public static int MAX_CONN_TOTAL = 30;
public static HttpHost HTTP_HOST;
public RestClient restClient;
public RestClientBuilder restClientBuilder;
private RestHighLevelClient restHighLevelClient;
private static ESClientFactory esClientFactory = new ESClientFactory();
public ESClientFactory(){}
public static ESClientFactory build(HttpHost httpHost, Integer maxConnectNum, Integer maxConnPerRoute){
HTTP_HOST = httpHost;
MAX_CONN_TOTAL = maxConnectNum;
MAX_CONN_PER_ROUTE = maxConnPerRoute;
return esClientFactory;
}
public void init(){
restClientBuilder = RestClient.builder(HTTP_HOST);
restClient = restClientBuilder.build();
setConnectTimeoutConfig();
setMutiConnectConfig();
restHighLevelClient = new RestHighLevelClient(restClientBuilder);
}
//配置连接时间延迟
public void setConnectTimeoutConfig(){
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
return requestConfigBuilder;
});
}
//使用异步httpclient设置并发连接数
public void setMutiConnectConfig(){
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
return httpClientBuilder;
});
}
public RestHighLevelClient getRhlClient(){
return restHighLevelClient;
}
public void close(){
if(restHighLevelClient != null){
try {
restHighLevelClient.close();
} catch (IOException e) {
logger.error("close rest high level client failed, error is {}", e.getMessage());
}
}
}
}
... ...
... ... @@ -20,5 +20,12 @@ spring:
password: 9nm0icOwt6bMHjMusIfMLw==
driver-class-name: com.mysql.jdbc.Driver
elasticSearch:
host: 192.168.104.246
port: 9200
client:
connectNum: 10
connectPerRoute: 50
logging:
config: classpath:logback-boot.xml
... ...
... ... @@ -22,3 +22,10 @@ spring:
logging:
config: classpath:logback-boot.xml
elasticSearch:
host: 192.168.104.246
port: 9200
client:
connectNum: 10
connectPerRoute: 50
\ No newline at end of file
... ...