Authored by 张帅

用户信息全量同步es

@@ -11,9 +11,9 @@ import java.util.List; @@ -11,9 +11,9 @@ import java.util.List;
11 @Repository 11 @Repository
12 public interface UserInfoRepository extends JpaRepository<UserInfo, Integer>{ 12 public interface UserInfoRepository extends JpaRepository<UserInfo, Integer>{
13 13
14 - @Query("select count(user.uid) from UserInfo user")  
15 - int countAllBy(); 14 + @Query("select count(user.yoho_uid) from UserInfo user where yoho_uid >= ?1 and yoho_uid< ?2 ")
  15 + int countAllByYoho_uid(Integer yohoUid, Integer yohoUid2);
16 16
17 - @Query("select user from UserInfo user")  
18 - List<UserInfo> queryAllBy(Pageable pageable); 17 + @Query("select user from UserInfo user where yoho_uid >= ?1 and yoho_uid< ?2 order by yoho_uid asc")
  18 + List<UserInfo> queryAllByYoho_uid(Integer yohoUid, Integer yohoUid2,Pageable pageable);
19 } 19 }
@@ -5,23 +5,26 @@ import lombok.Data; @@ -5,23 +5,26 @@ import lombok.Data;
5 5
6 import javax.persistence.Column; 6 import javax.persistence.Column;
7 import javax.persistence.Entity; 7 import javax.persistence.Entity;
  8 +import javax.persistence.Id;
8 import javax.persistence.Table; 9 import javax.persistence.Table;
9 import java.io.Serializable; 10 import java.io.Serializable;
10 11
11 @Data 12 @Data
12 @Entity 13 @Entity
13 @Table(name = "user_info") 14 @Table(name = "user_info")
14 -public class UserInfo extends BaseEntity implements Serializable { 15 +public class UserInfo implements Serializable {
15 16
16 private static final long serialVersionUID = -7163835024334835792L; 17 private static final long serialVersionUID = -7163835024334835792L;
  18 +
17 @Column(name = "nick_name") 19 @Column(name = "nick_name")
18 - private String nickName; 20 + private String nick_name;
19 21
20 @Column(name = "head_ico") 22 @Column(name = "head_ico")
21 - private String headIco; 23 + private String head_ico;
22 24
  25 + @Id
23 @Column(name = "yoho_uid") 26 @Column(name = "yoho_uid")
24 - private Integer uid; 27 + private Integer yoho_uid;
25 28
26 @Column(name = "gender") 29 @Column(name = "gender")
27 private Integer gender; 30 private Integer gender;
@@ -3,6 +3,7 @@ package com.yoho.datasync.fullsync.controller; @@ -3,6 +3,7 @@ package com.yoho.datasync.fullsync.controller;
3 import com.yoho.datasync.fullsync.service.impl.UserInfoDataSyncServiceImpl; 3 import com.yoho.datasync.fullsync.service.impl.UserInfoDataSyncServiceImpl;
4 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.web.bind.annotation.RequestMapping; 5 import org.springframework.web.bind.annotation.RequestMapping;
  6 +import org.springframework.web.bind.annotation.RequestParam;
6 import org.springframework.web.bind.annotation.RestController; 7 import org.springframework.web.bind.annotation.RestController;
7 8
8 9
@@ -12,11 +13,15 @@ public class UserInfoController { @@ -12,11 +13,15 @@ public class UserInfoController {
12 @Autowired 13 @Autowired
13 private UserInfoDataSyncServiceImpl userInfoDataSyncService; 14 private UserInfoDataSyncServiceImpl userInfoDataSyncService;
14 15
15 -  
16 -  
17 @RequestMapping("/syncUserInfoToES") 16 @RequestMapping("/syncUserInfoToES")
18 - public String syncArticle(){ 17 + public String syncArticle(@RequestParam("uid") Integer uid, @RequestParam("flag") Integer flag,@RequestParam("size") Integer size,
  18 + @RequestParam("pageSize") Integer pageSize){
  19 +
  20 + return userInfoDataSyncService.sync(uid, flag,size, pageSize);
  21 + }
19 22
20 - return userInfoDataSyncService.sync(); 23 + @RequestMapping("/search")
  24 + public String search(){
  25 + return userInfoDataSyncService.searchAllUserData();
21 } 26 }
22 } 27 }
@@ -2,7 +2,18 @@ package com.yoho.datasync.fullsync.service.impl; @@ -2,7 +2,18 @@ package com.yoho.datasync.fullsync.service.impl;
2 2
3 3
4 import com.alibaba.fastjson.JSON; 4 import com.alibaba.fastjson.JSON;
5 -import com.google.common.collect.Lists; 5 +import org.elasticsearch.action.delete.DeleteRequest;
  6 +import org.elasticsearch.action.delete.DeleteResponse;
  7 +import org.elasticsearch.action.search.SearchRequest;
  8 +import org.elasticsearch.action.search.SearchResponse;
  9 +import org.elasticsearch.index.query.BoolQueryBuilder;
  10 +import org.elasticsearch.index.query.QueryBuilder;
  11 +import org.elasticsearch.index.query.QueryBuilders;
  12 +import org.elasticsearch.index.reindex.BulkByScrollResponse;
  13 +import org.elasticsearch.index.reindex.DeleteByQueryRequest;
  14 +import org.elasticsearch.search.builder.SearchSourceBuilder;
  15 +import org.springframework.data.domain.PageRequest;
  16 +import org.springframework.data.domain.Pageable;
6 import com.yoho.datasync.fullsync.dal.repository.yhcommunity.model.UserInfo; 17 import com.yoho.datasync.fullsync.dal.repository.yhcommunity.model.UserInfo;
7 import org.elasticsearch.action.bulk.BulkRequest; 18 import org.elasticsearch.action.bulk.BulkRequest;
8 import org.elasticsearch.action.bulk.BulkResponse; 19 import org.elasticsearch.action.bulk.BulkResponse;
@@ -17,6 +28,7 @@ import org.slf4j.LoggerFactory; @@ -17,6 +28,7 @@ import org.slf4j.LoggerFactory;
17 import org.springframework.beans.factory.annotation.Autowired; 28 import org.springframework.beans.factory.annotation.Autowired;
18 import org.springframework.beans.factory.annotation.Qualifier; 29 import org.springframework.beans.factory.annotation.Qualifier;
19 import org.springframework.stereotype.Service; 30 import org.springframework.stereotype.Service;
  31 +import org.springframework.util.CollectionUtils;
20 32
21 import java.io.IOException; 33 import java.io.IOException;
22 import java.util.List; 34 import java.util.List;
@@ -33,34 +45,105 @@ public class UserInfoDataSyncServiceImpl { @@ -33,34 +45,105 @@ public class UserInfoDataSyncServiceImpl {
33 @Autowired 45 @Autowired
34 private UserInfoRepository userInfoRepository; 46 private UserInfoRepository userInfoRepository;
35 47
36 - public String sync(){  
37 - List list = Lists.newArrayList();  
38 - UserInfo userInfo = new UserInfo();  
39 - userInfo.setNickName("zhangsan");  
40 - userInfo.setHeadIco("www");  
41 - userInfo.setUid(500031572);  
42 - userInfo.setGender(1);  
43 - list.add(userInfo);  
44 - writeData(list);  
45 - return ""; 48 + public String sync(Integer uid,Integer flag, Integer size, Integer pageSize){
  49 + int endUid;
  50 + //flag =0 只跑真实用户uid flag=1 时跑mars 马甲信息
  51 + if(flag == 0){
  52 + endUid = 1000000000;
  53 + }else {
  54 + endUid = 2100000000;
  55 + }
  56 + int count = userInfoRepository.countAllByYoho_uid(uid, endUid);
  57 + int totalPage;
  58 + String result = "";
  59 + if(count > size){
  60 + totalPage = size/pageSize;
  61 + }else {
  62 + totalPage = count%pageSize==0 ? count/pageSize: count/pageSize+1;
  63 + }
  64 + logger.info("userInfo count is {}, size is {}, pagesize is {}, totalpage is {}",count,size,pageSize,totalPage);
  65 + for (int i = 0; i < totalPage; i++) {
  66 + Pageable pageReq = PageRequest.of(i, pageSize);
  67 + List<UserInfo> userInfoList = userInfoRepository.queryAllByYoho_uid(uid,endUid, pageReq);
  68 + if(!CollectionUtils.isEmpty(userInfoList)){
  69 + writeData(userInfoList);
  70 + }
  71 + try {
  72 + Thread.sleep(200);
  73 + } catch (InterruptedException e) {
  74 + e.printStackTrace();
  75 + }
  76 + if(i == totalPage-1){
  77 + result = String.valueOf(userInfoList.get(userInfoList.size()-1).getYoho_uid());
  78 + }
  79 + }
  80 +
  81 + return result;
46 } 82 }
47 83
48 private void writeData(List<UserInfo> userInfoList){ 84 private void writeData(List<UserInfo> userInfoList){
49 // 批量增加 85 // 批量增加
50 BulkRequest bulkAddRequest = new BulkRequest(); 86 BulkRequest bulkAddRequest = new BulkRequest();
51 - for (int i = 0; i < userInfoList.size(); i++) {  
52 - UserInfo info = userInfoList.get(i);  
53 - IndexRequest indexRequest = new IndexRequest("grass", "userInfo", ""); 87 + for (UserInfo info : userInfoList) {
  88 + IndexRequest indexRequest = new IndexRequest("grass", "userInfo", String.valueOf(info.getYoho_uid()));
54 indexRequest.source(JSON.toJSONString(info), XContentType.JSON); 89 indexRequest.source(JSON.toJSONString(info), XContentType.JSON);
55 bulkAddRequest.add(indexRequest); 90 bulkAddRequest.add(indexRequest);
56 } 91 }
57 BulkResponse bulkAddResponse = null; 92 BulkResponse bulkAddResponse = null;
58 try { 93 try {
59 bulkAddResponse = rhlClient.bulk(bulkAddRequest, RequestOptions.DEFAULT); 94 bulkAddResponse = rhlClient.bulk(bulkAddRequest, RequestOptions.DEFAULT);
60 - logger.info("bulkAdd: {}" , JSON.toJSONString(bulkAddResponse)); 95 + logger.info("writeData: {}" , JSON.toJSONString(bulkAddResponse));
  96 +
  97 + } catch (IOException e) {
  98 + logger.info("writeData error :{}" ,e);
  99 + }
  100 + }
61 101
  102 + private void deleteData(){
  103 + try {
  104 + DeleteRequest deleteRequest = new DeleteRequest("grass", "userInfo", "");
  105 + DeleteResponse response = rhlClient.delete(deleteRequest, RequestOptions.DEFAULT);
  106 + logger.info("deleteData: {}" , JSON.toJSONString(response));
62 } catch (IOException e) { 107 } catch (IOException e) {
63 e.printStackTrace(); 108 e.printStackTrace();
64 } 109 }
65 } 110 }
  111 +
  112 + public void deleteAllUserData(){
  113 + try {
  114 + BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
  115 + queryBuilder.must(QueryBuilders.matchAllQuery());
  116 + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest("grass");
  117 + deleteRequest.setQuery(queryBuilder);
  118 + deleteRequest.setDocTypes("userInfo");
  119 + BulkByScrollResponse response = rhlClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
  120 + logger.info("deleteAllUserData: {}" , JSON.toJSONString(response));
  121 + } catch (IOException e) {
  122 + e.printStackTrace();
  123 + }
  124 + }
  125 +
  126 + public String searchAllUserData(){
  127 + try {
  128 + BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
  129 + queryBuilder.must(QueryBuilders.matchAllQuery());
  130 +
  131 + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  132 + sourceBuilder.query(queryBuilder);
  133 + sourceBuilder.from(0);
  134 + sourceBuilder.size(200); // 获取记录数,默认10
  135 + SearchRequest searchRequest = new SearchRequest("grass");
  136 + searchRequest.source(sourceBuilder);
  137 + searchRequest.types("userInfo");
  138 +
  139 + SearchResponse response = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
  140 + logger.info("searchAllUserData: {}" , JSON.toJSONString(response));
  141 + return JSON.toJSONString(response.getHits().totalHits);
  142 + } catch (IOException e) {
  143 + e.printStackTrace();
  144 + }
  145 + return "";
  146 + }
  147 +
  148 +
66 } 149 }
@@ -24,5 +24,18 @@ spring: @@ -24,5 +24,18 @@ spring:
24 password: Z5BMngayHLUxyWLSv6+koA== 24 password: Z5BMngayHLUxyWLSv6+koA==
25 driver-class-name: com.mysql.jdbc.Driver 25 driver-class-name: com.mysql.jdbc.Driver
26 26
  27 + yhCommunity:
  28 + url: jdbc:mysql://192.168.102.219:3306/yh_community?useUnicode=true&amp;characterEncoding=UTF-8
  29 + username: yh_test
  30 + password: 9nm0icOwt6bMHjMusIfMLw==
  31 + driver-class-name: com.mysql.jdbc.Driver
  32 +
  33 +elasticSearch:
  34 + host: 192.168.104.246
  35 + port: 9200
  36 + client:
  37 + connectNum: 10
  38 + connectPerRoute: 50
  39 +
27 logging: 40 logging:
28 config: classpath:logback-boot.xml 41 config: classpath:logback-boot.xml