|
|
package com.yoho.datasync.fullsync.service.impl;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.yoho.datasync.fullsync.constant.UserAttentionConstant;
|
|
|
import com.yoho.datasync.fullsync.constant.UserCommentConstant;
|
|
|
import com.yoho.datasync.fullsync.constant.UserPraiseConstant;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.BaseEntity;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.grass.GrassArticlePraiseRepository;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.grass.GrassArticleRepository;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.grass.model.GrassArticle;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.grass.model.GrassArticlePraise;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.pcms.PublicArticleRepository;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.pcms.PublicUserPraiseRepository;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.pcms.model.PublicArticle;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.pcms.model.PublicUserPraise;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.grass.*;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.grass.model.*;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.pcms.*;
|
|
|
import com.yoho.datasync.fullsync.dal.repository.pcms.model.*;
|
|
|
import com.yoho.datasync.fullsync.service.IGrassInteractiveDataSyncService;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
...
|
...
|
@@ -48,6 +47,24 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
@Autowired
|
|
|
private PublicArticleRepository publicArticleRepository;
|
|
|
|
|
|
@Autowired
|
|
|
private UserFavoriteArticleRepository userFavoriteArticleRepository;
|
|
|
|
|
|
@Autowired
|
|
|
private PublicUserFavoriteRepository publicUserFavoriteRepository;
|
|
|
|
|
|
@Autowired
|
|
|
private GrassUserAttentionRepository grassUserAttentionRepository;
|
|
|
|
|
|
@Autowired
|
|
|
private PublicUserAttentionRepository publicUserAttentionRepository;
|
|
|
|
|
|
@Autowired
|
|
|
private GrassArticleCommentRepository grassArticleCommentRepository;
|
|
|
|
|
|
@Autowired
|
|
|
private PublicUserCommentRepository publicUserCommentRepository;
|
|
|
|
|
|
private static final int PRAISE = 1;
|
|
|
private static final int COMMENT = 2;
|
|
|
private static final int FAVORITE = 3;
|
...
|
...
|
@@ -57,12 +74,21 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
private static final int SLAVE = 2;
|
|
|
|
|
|
@Override
|
|
|
public String syncInteractiveData(Long startTime, Long endTime, Integer timeType, Integer syncType) {
|
|
|
public String syncInteractiveData(Integer startTime, Integer endTime, Integer timeType, Integer syncType) {
|
|
|
// syncType : 1、同步点赞 2、 同步评论 3、 同步收藏 4、 同步关注
|
|
|
switch (syncType){
|
|
|
case PRAISE:
|
|
|
syncPraise(startTime, endTime, timeType);
|
|
|
break;
|
|
|
case FAVORITE:
|
|
|
syncFavorite(startTime, endTime, timeType);
|
|
|
break;
|
|
|
case ATTENTION:
|
|
|
syncUserAttention(startTime, endTime);
|
|
|
break;
|
|
|
case COMMENT:
|
|
|
syncComments(startTime, endTime,timeType);
|
|
|
break;
|
|
|
default:
|
|
|
break;
|
|
|
|
...
|
...
|
@@ -70,7 +96,7 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
return "success";
|
|
|
}
|
|
|
|
|
|
private void syncPraise(Long startTime, Long endTime, Integer timeType){
|
|
|
private void syncPraise(Integer startTime, Integer endTime, Integer timeType){
|
|
|
switch (timeType){
|
|
|
case MASTER:
|
|
|
syncPraiseByArticle(startTime,endTime);
|
...
|
...
|
@@ -81,11 +107,35 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
}
|
|
|
}
|
|
|
|
|
|
private void syncPraiseByArticle(Long startTime, Long endTime){
|
|
|
private void syncFavorite(Integer startTime, Integer endTime, Integer timeType){
|
|
|
switch (timeType){
|
|
|
case MASTER:
|
|
|
syncFavoriteByArticle(startTime,endTime);
|
|
|
break;
|
|
|
case SLAVE:
|
|
|
syncFavoriteBySelf(startTime,endTime);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void syncComments(Integer startTime, Integer endTime, Integer timeType){
|
|
|
switch (timeType){
|
|
|
case MASTER:
|
|
|
syncCommentsByArticle(startTime,endTime);
|
|
|
break;
|
|
|
case SLAVE:
|
|
|
syncCommentsBySelf(startTime,endTime);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void syncPraiseByArticle(Integer startTime, Integer endTime){
|
|
|
logger.info("syncPraiseByArticle begin, startTime is {}, endTime is {}", startTime, endTime);
|
|
|
ExecutorService es = Executors.newSingleThreadExecutor();
|
|
|
int pageSize = 100;
|
|
|
int total = grassArticleRepository.countByCreateTimeAfterAndCreateTimeBeforeAndArticleTypeInAndAuthStatusIn(startTime, endTime,needSyncArticleType,needSyncArticleStatus);
|
|
|
Long startTimeLong = (long)startTime * 1000;
|
|
|
Long endTimeLong = (long)endTime * 1000;
|
|
|
int total = grassArticleRepository.countByCreateTimeAfterAndCreateTimeBeforeAndArticleTypeInAndAuthStatusIn(startTimeLong, endTimeLong,needSyncArticleType,needSyncArticleStatus);
|
|
|
logger.info("syncPraiseByArticle get grassArticle count is {}", total);
|
|
|
int totalPage = total % pageSize ==0 ? total / pageSize : (total / pageSize)+1;
|
|
|
try{
|
...
|
...
|
@@ -93,7 +143,7 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
for (int index=0; index < totalPage; index++) {
|
|
|
int indexPage = index;
|
|
|
es.execute(() -> {
|
|
|
syncPraiseByArticlePage(startTime, endTime, indexPage, pageSize);
|
|
|
syncPraiseByArticlePage(startTimeLong, endTimeLong, indexPage, pageSize);
|
|
|
});
|
|
|
}
|
|
|
}finally {
|
...
|
...
|
@@ -107,9 +157,9 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
startTime, endTime,needSyncArticleType,needSyncArticleStatus,pageReq);
|
|
|
|
|
|
List<Integer> grassArticleIds = grassArticleList.stream().map(BaseEntity::getId).collect(Collectors.toList());
|
|
|
List<PublicArticle> publicUserPraiseList = publicArticleRepository.findAllByRelateIdInAndArticleTypeIn(grassArticleIds, needSyncArticleType);
|
|
|
List<PublicArticle> publicArticleList = publicArticleRepository.findAllByRelateIdInAndArticleTypeIn(grassArticleIds, needSyncArticleType);
|
|
|
Map<Integer, Integer> publicArticleIdAndGrassIdMap = new HashMap<>();
|
|
|
publicUserPraiseList.forEach(publicArticle -> {
|
|
|
publicArticleList.forEach(publicArticle -> {
|
|
|
publicArticleIdAndGrassIdMap.put(publicArticle.getRelateId(), publicArticle.getId());
|
|
|
});
|
|
|
List<GrassArticlePraise> grassArticlePraiseList = grassArticlePraiseRepository.findAllByArticleIdIn(grassArticleIds);
|
...
|
...
|
@@ -118,14 +168,18 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
publicUserPraiseRepository.saveAll(needSyncData);
|
|
|
}
|
|
|
|
|
|
private List<PublicUserPraise> buildPublicUserPraise( Map<Integer, Integer> publicArticleIdAndGrassIdMap, List<GrassArticlePraise> grassArticlePraiseList){
|
|
|
private List<PublicUserPraise> buildPublicUserPraise(Map<Integer, Integer> publicArticleIdAndGrassIdMap, List<GrassArticlePraise> grassArticlePraiseList){
|
|
|
List<PublicUserPraise> resultList = Lists.newArrayList();
|
|
|
if(CollectionUtils.isEmpty(grassArticlePraiseList)){
|
|
|
return resultList;
|
|
|
}
|
|
|
grassArticlePraiseList.forEach(grassArticlePraise -> {
|
|
|
Integer targetId = publicArticleIdAndGrassIdMap.get(grassArticlePraise.getArticleId());
|
|
|
if(targetId == null){
|
|
|
return;
|
|
|
}
|
|
|
PublicUserPraise publicUserPraise = new PublicUserPraise();
|
|
|
publicUserPraise.setTargetId(publicArticleIdAndGrassIdMap.get(grassArticlePraise.getArticleId()));
|
|
|
publicUserPraise.setTargetId(targetId);
|
|
|
publicUserPraise.setUid(grassArticlePraise.getUid());
|
|
|
publicUserPraise.setPraiseType(UserPraiseConstant.ARTICLE_PRAISE);
|
|
|
publicUserPraise.setStatus(convertPraiseStatus(grassArticlePraise.getStatus()));
|
...
|
...
|
@@ -149,13 +203,11 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
}
|
|
|
}
|
|
|
|
|
|
private void syncPraiseBySelf(Long startTime, Long endTime){
|
|
|
private void syncPraiseBySelf(Integer startTime, Integer endTime){
|
|
|
logger.info("syncPraiseBySelf begin, startTime is {}, endTime is {}", startTime, endTime);
|
|
|
ExecutorService es = Executors.newSingleThreadExecutor();
|
|
|
int pageSize = 100;
|
|
|
Integer startTimeInt = startTime.intValue();
|
|
|
Integer endTimeInt = endTime.intValue();
|
|
|
int total = grassArticlePraiseRepository.countByCreateTimeBetween(startTimeInt, endTimeInt);
|
|
|
int total = grassArticlePraiseRepository.countByCreateTimeBetween(startTime, endTime);
|
|
|
logger.info("syncPraiseBySelf get grassArticlePraise count is {}", total);
|
|
|
int totalPage = total % pageSize ==0 ? total / pageSize : (total / pageSize)+1;
|
|
|
try{
|
...
|
...
|
@@ -163,7 +215,7 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
for (int index=0; index < totalPage; index++) {
|
|
|
int indexPage = index;
|
|
|
es.execute(() -> {
|
|
|
syncPraiseBySelfPage(startTimeInt, endTimeInt, indexPage, pageSize);
|
|
|
syncPraiseBySelfPage(startTime, endTime, indexPage, pageSize);
|
|
|
});
|
|
|
}
|
|
|
}finally {
|
...
|
...
|
@@ -173,16 +225,455 @@ public class GrassInteractiveDataSyncServiceImpl implements IGrassInteractiveDat |
|
|
|
|
|
private void syncPraiseBySelfPage(int startTime, int endTime, int indexPage, int pageSize){
|
|
|
Pageable pageReq = PageRequest.of(indexPage, pageSize);
|
|
|
List<GrassArticlePraise> grassArticleList = grassArticlePraiseRepository.findAllByCreateTimeBetween(
|
|
|
List<GrassArticlePraise> grassArticlePraiseList = grassArticlePraiseRepository.findAllByCreateTimeBetween(
|
|
|
startTime, endTime,pageReq);
|
|
|
|
|
|
List<Integer> grassArticleIds = grassArticleList.stream().map(BaseEntity::getId).collect(Collectors.toList());
|
|
|
List<Integer> grassArticleIds = grassArticlePraiseList.stream().map(GrassArticlePraise::getArticleId).collect(Collectors.toList());
|
|
|
List<PublicArticle> publicUserPraiseList = publicArticleRepository.findAllByRelateIdInAndArticleTypeIn(grassArticleIds, needSyncArticleType);
|
|
|
Map<Integer, Integer> publicArticleIdAndGrassIdMap = new HashMap<>();
|
|
|
publicUserPraiseList.forEach(publicArticle -> {
|
|
|
publicArticleIdAndGrassIdMap.put(publicArticle.getRelateId(), publicArticle.getId());
|
|
|
});
|
|
|
List<PublicUserPraise> needSyncData = buildPublicUserPraise(publicArticleIdAndGrassIdMap, grassArticleList);
|
|
|
List<PublicUserPraise> needSyncData = buildPublicUserPraise(publicArticleIdAndGrassIdMap, grassArticlePraiseList);
|
|
|
publicUserPraiseRepository.saveAll(needSyncData);
|
|
|
}
|
|
|
|
|
|
|
|
|
private void syncFavoriteByArticle(Integer startTime, Integer endTime){
|
|
|
logger.info("syncFavoriteByArticle begin, startTime is {}, endTime is {}", startTime, endTime);
|
|
|
ExecutorService es = Executors.newSingleThreadExecutor();
|
|
|
Long startTimeLong = (long)startTime * 1000;
|
|
|
Long endTimeLong = (long)endTime * 1000;
|
|
|
int pageSize = 100;
|
|
|
int total = grassArticleRepository.countByCreateTimeAfterAndCreateTimeBeforeAndArticleTypeInAndAuthStatusIn(startTimeLong, endTimeLong,needSyncArticleType,needSyncArticleStatus);
|
|
|
logger.info("syncFavoriteByArticle get grassArticle count is {}", total);
|
|
|
int totalPage = total % pageSize ==0 ? total / pageSize : (total / pageSize)+1;
|
|
|
try{
|
|
|
//每次查询100条,增加detail
|
|
|
for (int index=0; index < totalPage; index++) {
|
|
|
int indexPage = index;
|
|
|
es.execute(() -> {
|
|
|
syncFavoriteByArticlePage(startTimeLong, endTimeLong, indexPage, pageSize);
|
|
|
});
|
|
|
}
|
|
|
}finally {
|
|
|
es.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void syncFavoriteByArticlePage(Long startTime, Long endTime, int indexPage, int pageSize){
|
|
|
Pageable pageReq = PageRequest.of(indexPage, pageSize);
|
|
|
List<GrassArticle> grassArticleList = grassArticleRepository.findAllByCreateTimeAfterAndCreateTimeBeforeAndArticleTypeInAndAuthStatusIn(
|
|
|
startTime, endTime,needSyncArticleType,needSyncArticleStatus,pageReq);
|
|
|
|
|
|
List<Integer> grassArticleIds = grassArticleList.stream().map(BaseEntity::getId).collect(Collectors.toList());
|
|
|
List<PublicArticle> publicArticleList = publicArticleRepository.findAllByRelateIdInAndArticleTypeIn(grassArticleIds, needSyncArticleType);
|
|
|
Map<Integer, Integer> publicArticleIdAndGrassIdMap = new HashMap<>();
|
|
|
publicArticleList.forEach(publicArticle -> {
|
|
|
publicArticleIdAndGrassIdMap.put(publicArticle.getRelateId(), publicArticle.getId());
|
|
|
});
|
|
|
List<UserFavoriteArticle> userFavoriteArticleList = userFavoriteArticleRepository.findAllByArticleIdIn(grassArticleIds);
|
|
|
logger.info("syncFavoriteByArticlePage find userFavoriteArticle num is {}", userFavoriteArticleList.size());
|
|
|
List<PublicUserFavorite> needSyncData = buildPublicUserFavorite(publicArticleIdAndGrassIdMap, userFavoriteArticleList);
|
|
|
publicUserFavoriteRepository.saveAll(needSyncData);
|
|
|
}
|
|
|
|
|
|
|
|
|
private List<PublicUserFavorite> buildPublicUserFavorite(Map<Integer, Integer> publicArticleIdAndGrassIdMap, List<UserFavoriteArticle> userFavoriteArticleList){
|
|
|
List<PublicUserFavorite> resultList = Lists.newArrayList();
|
|
|
if(CollectionUtils.isEmpty(userFavoriteArticleList)){
|
|
|
return resultList;
|
|
|
}
|
|
|
userFavoriteArticleList.forEach(userFavoriteArticle -> {
|
|
|
Integer targetId = publicArticleIdAndGrassIdMap.get(userFavoriteArticle.getArticleId());
|
|
|
if(null == targetId){
|
|
|
return;
|
|
|
}
|
|
|
PublicUserFavorite publicUserFavorite = new PublicUserFavorite();
|
|
|
publicUserFavorite.setUid(userFavoriteArticle.getUid());
|
|
|
publicUserFavorite.setStatus(1);
|
|
|
publicUserFavorite.setSrcChannel(1);
|
|
|
publicUserFavorite.setTargetId(targetId);
|
|
|
publicUserFavorite.setCreateTime((long)(userFavoriteArticle.getCreateTime()) * 1000);
|
|
|
publicUserFavorite.setFavoriteType(1);
|
|
|
resultList.add(publicUserFavorite);
|
|
|
});
|
|
|
|
|
|
return resultList;
|
|
|
}
|
|
|
|
|
|
private void syncFavoriteBySelf(Integer startTime, Integer endTime){
|
|
|
logger.info("syncFavoriteBySelf begin, startTime is {}, endTime is {}", startTime, endTime);
|
|
|
ExecutorService es = Executors.newSingleThreadExecutor();
|
|
|
int pageSize = 100;
|
|
|
int total = userFavoriteArticleRepository.countByCreateTimeBetween(startTime, endTime);
|
|
|
logger.info("syncFavoriteBySelf get grassArticlePraise count is {}", total);
|
|
|
int totalPage = total % pageSize ==0 ? total / pageSize : (total / pageSize)+1;
|
|
|
try{
|
|
|
//每次查询100条,增加detail
|
|
|
for (int index=0; index < totalPage; index++) {
|
|
|
int indexPage = index;
|
|
|
es.execute(() -> {
|
|
|
syncFavoriteBySelfPage(startTime, endTime, indexPage, pageSize);
|
|
|
});
|
|
|
}
|
|
|
}finally {
|
|
|
es.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void syncFavoriteBySelfPage(int startTime, int endTime, int indexPage, int pageSize){
|
|
|
Pageable pageReq = PageRequest.of(indexPage, pageSize);
|
|
|
List<UserFavoriteArticle> userFavoriteArticleList = userFavoriteArticleRepository.findAllByCreateTimeBetween(
|
|
|
startTime, endTime,pageReq);
|
|
|
|
|
|
List<Integer> grassArticleIds = userFavoriteArticleList.stream().map(UserFavoriteArticle::getArticleId).collect(Collectors.toList());
|
|
|
List<PublicArticle> publicArticleList = publicArticleRepository.findAllByRelateIdInAndArticleTypeIn(grassArticleIds, needSyncArticleType);
|
|
|
Map<Integer, Integer> publicArticleIdAndGrassIdMap = new HashMap<>();
|
|
|
publicArticleList.forEach(publicArticle -> {
|
|
|
publicArticleIdAndGrassIdMap.put(publicArticle.getRelateId(), publicArticle.getId());
|
|
|
});
|
|
|
List<PublicUserFavorite> needSyncData = buildPublicUserFavorite(publicArticleIdAndGrassIdMap, userFavoriteArticleList);
|
|
|
publicUserFavoriteRepository.saveAll(needSyncData);
|
|
|
}
|
|
|
|
|
|
private void syncUserAttention(Integer startTime, Integer endTime){
|
|
|
logger.info("syncUserAttention begin, startTime is {}, endTime is {}", startTime, endTime);
|
|
|
ExecutorService es = Executors.newSingleThreadExecutor();
|
|
|
int pageSize = 100;
|
|
|
int total = grassUserAttentionRepository.countByCreateTimeBetween(startTime, endTime);
|
|
|
logger.info("syncUserAttention get attention count is {}", total);
|
|
|
int totalPage = total % pageSize ==0 ? total / pageSize : (total / pageSize)+1;
|
|
|
try{
|
|
|
//每次查询100条,增加detail
|
|
|
for (int index=0; index < totalPage; index++) {
|
|
|
int indexPage = index;
|
|
|
es.execute(() -> {
|
|
|
syncUserAttentionPage(startTime, endTime, indexPage, pageSize);
|
|
|
});
|
|
|
}
|
|
|
}finally {
|
|
|
es.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void syncUserAttentionPage(int startTime, int endTime, int indexPage, int pageSize){
|
|
|
Pageable pageReq = PageRequest.of(indexPage, pageSize);
|
|
|
List<GrassUserAttention> grassUserAttentionList = grassUserAttentionRepository.findAllByCreateTimeBetween(
|
|
|
startTime, endTime,pageReq);
|
|
|
|
|
|
List<PublicUserAttention> needSyncData = buildPublicUserAttention(grassUserAttentionList);
|
|
|
publicUserAttentionRepository.saveAll(needSyncData);
|
|
|
}
|
|
|
|
|
|
private List<PublicUserAttention> buildPublicUserAttention( List<GrassUserAttention> grassUserAttentionList){
|
|
|
List<PublicUserAttention> resultList = Lists.newArrayList();
|
|
|
if(CollectionUtils.isEmpty(grassUserAttentionList)){
|
|
|
return resultList;
|
|
|
}
|
|
|
grassUserAttentionList.forEach(grassUserAttention -> {
|
|
|
PublicUserAttention publicUserAttention = new PublicUserAttention();
|
|
|
publicUserAttention.setUid(grassUserAttention.getUid());
|
|
|
publicUserAttention.setStatus(convertAttentionStatus(grassUserAttention.getStatus()));
|
|
|
publicUserAttention.setSrcChannel(1);
|
|
|
publicUserAttention.setTargetId(grassUserAttention.getFollowUid());
|
|
|
publicUserAttention.setCreateTime((long)(grassUserAttention.getCreateTime()) *1000);
|
|
|
publicUserAttention.setUpdateTime((long)(Optional.ofNullable(grassUserAttention.getUpdateTime()).orElse(0)) *1000);
|
|
|
publicUserAttention.setAuthorType(grassUserAttention.getAuthorType());
|
|
|
publicUserAttention.setAttentionType(1);
|
|
|
resultList.add(publicUserAttention);
|
|
|
});
|
|
|
|
|
|
return resultList;
|
|
|
}
|
|
|
|
|
|
private int convertAttentionStatus(int grassAttentionStatus){
|
|
|
switch (grassAttentionStatus){
|
|
|
case UserAttentionConstant.GRASS_HAS_ATTENTION:
|
|
|
return UserAttentionConstant.PUBLIC_HAS_ATTENTION;
|
|
|
case UserAttentionConstant.GRASS_CANCEL_ATTENTION:
|
|
|
return UserAttentionConstant.PUBLIC_CANCEL_ATTENTION;
|
|
|
default:
|
|
|
return UserAttentionConstant.PUBLIC_CANCEL_ATTENTION;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 根据文章主表的时间区间同步 评论
|
|
|
* 数据构成主要是:1 文章的所有根评论
|
|
|
* 2 所有子评论
|
|
|
* @param startTime
|
|
|
* @param endTime
|
|
|
*/
|
|
|
private void syncCommentsByArticle(Integer startTime, Integer endTime){
|
|
|
logger.info("syncCommentsByArticle begin, startTime is {}, endTime is {}", startTime, endTime);
|
|
|
ExecutorService es = Executors.newSingleThreadExecutor();
|
|
|
int pageSize = 100;
|
|
|
Long startTimeLong = (long)startTime * 1000;
|
|
|
Long endTimeLong = (long)endTime * 1000;
|
|
|
int total = grassArticleRepository.countByCreateTimeAfterAndCreateTimeBeforeAndArticleTypeInAndAuthStatusIn(startTimeLong, endTimeLong,needSyncArticleType,needSyncArticleStatus);
|
|
|
logger.info("syncCommentsByArticle get grassArticle count is {}", total);
|
|
|
int totalPage = total % pageSize ==0 ? total / pageSize : (total / pageSize)+1;
|
|
|
try{
|
|
|
//每次查询100条,增加detail
|
|
|
for (int index=0; index < totalPage; index++) {
|
|
|
int indexPage = index;
|
|
|
es.execute(() -> {
|
|
|
syncCommentsByArticlePage(startTimeLong, endTimeLong, indexPage, pageSize);
|
|
|
});
|
|
|
}
|
|
|
}finally {
|
|
|
es.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据评论表自己的时间区间同步 评论
|
|
|
* 数据构成主要是:1 时间区间内所有新产生的根评论
|
|
|
* 2 时间区间内所有新产生的子评论
|
|
|
* 3 非时间区间内的根评论的子评论
|
|
|
* @param startTime
|
|
|
* @param endTime
|
|
|
*/
|
|
|
private void syncCommentsBySelf(Integer startTime, Integer endTime){
|
|
|
logger.info("syncCommentsBySelf begin, startTime is {}, endTime is {}", startTime, endTime);
|
|
|
ExecutorService es = Executors.newSingleThreadExecutor();
|
|
|
int pageSize = 100;
|
|
|
int total = grassArticleCommentRepository.countByCreateTimeBetween(startTime, endTime);
|
|
|
logger.info("syncCommentsBySelf get grassArticlePraise count is {}", total);
|
|
|
int totalPage = total % pageSize ==0 ? total / pageSize : (total / pageSize)+1;
|
|
|
try{
|
|
|
//每次查询100条,增加detail
|
|
|
for (int index=0; index < totalPage; index++) {
|
|
|
int indexPage = index;
|
|
|
es.execute(() -> {
|
|
|
syncCommentsBySelfPage(startTime, endTime, indexPage, pageSize);
|
|
|
});
|
|
|
}
|
|
|
}finally {
|
|
|
es.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据评论表自己的时间区间同步 评论
|
|
|
* 数据构成主要是:1 时间区间内所有新产生的根评论
|
|
|
* 2 时间区间内所有新产生的子评论
|
|
|
* 3 非时间区间内的根评论的子评论
|
|
|
* @param startTime
|
|
|
* @param endTime
|
|
|
*/
|
|
|
private void syncCommentsBySelfPage(int startTime, int endTime, int indexPage, int pageSize){
|
|
|
Pageable pageReq = PageRequest.of(indexPage, pageSize);
|
|
|
List<GrassArticleComment> grassArticleCommentsList = grassArticleCommentRepository.findAllByCreateTimeBetween(
|
|
|
startTime, endTime,pageReq);
|
|
|
|
|
|
List<Integer> grassArticleIds = grassArticleCommentsList.stream().map(GrassArticleComment::getDestId).collect(Collectors.toList());
|
|
|
List<PublicArticle> publicArticleList = publicArticleRepository.findAllByRelateIdInAndArticleTypeIn(grassArticleIds, needSyncArticleType);
|
|
|
Map<Integer, Integer> publicArticleIdAndGrassIdMap = new HashMap<>();
|
|
|
publicArticleList.forEach(publicArticle -> {
|
|
|
publicArticleIdAndGrassIdMap.put(publicArticle.getRelateId(), publicArticle.getId());
|
|
|
});
|
|
|
|
|
|
//先把时间区间内所有的新产生的根评论同步
|
|
|
List<GrassArticleComment> grassRootArticleComments = grassArticleCommentsList.stream().filter(grassArticleComment -> grassArticleComment.getParentId() == null).collect(Collectors.toList());;
|
|
|
List<PublicUserComment> newRootPublicComments = buildRootComments(publicArticleIdAndGrassIdMap, grassRootArticleComments);
|
|
|
List<PublicUserComment> rootPublicComments = publicUserCommentRepository.saveAll(newRootPublicComments);
|
|
|
|
|
|
//同步之后,查询所有新产生子评论的父评论id
|
|
|
//要同步的子评论数据中包含两部分 1 父评论已经存在在新表中 刚刚同步的 或者是时间区间之前产生的
|
|
|
// 2 父评论也在这一部分子评论中 此时需要递归进行插入
|
|
|
List<GrassArticleComment> grassChildrenArticleComments = grassArticleCommentsList.stream().filter(grassArticleComment -> grassArticleComment.getParentId() != null).collect(Collectors.toList());
|
|
|
List<Integer> rootIds = grassChildrenArticleComments.stream().map(GrassArticleComment::getRootId).distinct().collect(Collectors.toList());
|
|
|
List<Integer> parentIds = grassChildrenArticleComments.stream().map(GrassArticleComment::getParentId).distinct().collect(Collectors.toList());
|
|
|
rootIds.addAll(parentIds);
|
|
|
List<Integer> relatedIds = rootIds.stream().distinct().collect(Collectors.toList());
|
|
|
List<PublicUserComment> publicRootIds = publicUserCommentRepository.findAllByRelateIdInAndSrcChannelIs(relatedIds, 1);
|
|
|
Map<Integer, Integer> relateIdMap = new HashMap<>();
|
|
|
publicRootIds.forEach(publicUserComment -> {
|
|
|
relateIdMap.put(publicUserComment.getRelateId(), publicUserComment.getId());
|
|
|
});
|
|
|
|
|
|
buildAndSaveChildrenCommentBySelf(publicArticleIdAndGrassIdMap, grassChildrenArticleComments, relateIdMap);
|
|
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据文章主表的时间区间同步 评论
|
|
|
* 数据构成主要是:1 文章的所有根评论
|
|
|
* 2 所有子评论
|
|
|
* @param startTime
|
|
|
* @param endTime
|
|
|
*/
|
|
|
private void syncCommentsByArticlePage(Long startTime, Long endTime, int indexPage, int pageSize){
|
|
|
Pageable pageReq = PageRequest.of(indexPage, pageSize);
|
|
|
List<GrassArticle> grassArticleList = grassArticleRepository.findAllByCreateTimeAfterAndCreateTimeBeforeAndArticleTypeInAndAuthStatusIn(
|
|
|
startTime, endTime,needSyncArticleType,needSyncArticleStatus,pageReq);
|
|
|
|
|
|
List<Integer> grassArticleIds = grassArticleList.stream().map(BaseEntity::getId).collect(Collectors.toList());
|
|
|
List<PublicArticle> publicArticleList = publicArticleRepository.findAllByRelateIdInAndArticleTypeIn(grassArticleIds, needSyncArticleType);
|
|
|
Map<Integer, Integer> publicArticleIdAndGrassIdMap = new HashMap<>();
|
|
|
publicArticleList.forEach(publicArticle -> {
|
|
|
publicArticleIdAndGrassIdMap.put(publicArticle.getRelateId(), publicArticle.getId());
|
|
|
});
|
|
|
|
|
|
//先把所有根评论同步
|
|
|
List<PublicUserComment> roots = queryAndBuildRootComment(grassArticleIds, publicArticleIdAndGrassIdMap);
|
|
|
List<PublicUserComment> rootPublicComments = publicUserCommentRepository.saveAll(roots);
|
|
|
//审核未通过的不查
|
|
|
List<GrassArticleComment> childrenComments = grassArticleCommentRepository.findAllByDestIdInAndArticleTypeInAndStatusIsNotAndParentIdIsNotNull(grassArticleIds,
|
|
|
needSyncArticleType, 2);
|
|
|
Map<Integer, Integer> rootIdMap = new HashMap<>();
|
|
|
rootPublicComments.forEach(publicUserComment -> {
|
|
|
rootIdMap.put(publicUserComment.getRelateId(), publicUserComment.getId());
|
|
|
});
|
|
|
|
|
|
//同步所有层级的子评论
|
|
|
buildAndSaveChildrenComment(publicArticleIdAndGrassIdMap,rootPublicComments, childrenComments, rootIdMap);
|
|
|
|
|
|
}
|
|
|
|
|
|
private List<PublicUserComment> queryAndBuildRootComment(List<Integer> grassArticleIds, Map<Integer, Integer> publicArticleIdAndGrassIdMap){
|
|
|
List<PublicUserComment> result;
|
|
|
|
|
|
//审核未通过的不查
|
|
|
List<GrassArticleComment> rootComments = grassArticleCommentRepository.findAllByDestIdInAndArticleTypeInAndStatusIsNotAndParentIdIsNull(grassArticleIds,
|
|
|
needSyncArticleType, 2);
|
|
|
|
|
|
result = buildRootComments(publicArticleIdAndGrassIdMap, rootComments);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
private List<PublicUserComment> buildRootComments( Map<Integer, Integer> publicArticleIdAndGrassIdMap, List<GrassArticleComment> grassRootArticleComments){
|
|
|
List<PublicUserComment> result = Lists.newArrayList();
|
|
|
grassRootArticleComments.forEach(grassArticleComment -> {
|
|
|
Integer articleId = publicArticleIdAndGrassIdMap.get(grassArticleComment.getDestId());
|
|
|
if(articleId == null){
|
|
|
return;
|
|
|
}
|
|
|
PublicUserComment publicUserComment = new PublicUserComment();
|
|
|
publicUserComment.setUid(grassArticleComment.getUid());
|
|
|
publicUserComment.setSrcChannel(1);
|
|
|
publicUserComment.setCreateTime((long)(grassArticleComment.getCreateTime()) * 1000);
|
|
|
publicUserComment.setUpdateTime((long)(Optional.ofNullable(grassArticleComment.getUpdateTime()).orElse(0)) *1000);
|
|
|
publicUserComment.setArticleId(articleId);
|
|
|
publicUserComment.setContentData(convertCommentContent(grassArticleComment.getContent()));
|
|
|
publicUserComment.setRelateId(grassArticleComment.getId());
|
|
|
publicUserComment.setPraiseNum(grassArticleComment.getPraiseTotal());
|
|
|
publicUserComment.setAuditStatus(convertCommentStatus(grassArticleComment.getStatus()));
|
|
|
publicUserComment.setAuditAccount(grassArticleComment.getReviewer());
|
|
|
publicUserComment.setAuditTime((long)(Optional.ofNullable(grassArticleComment.getUpdateTime()).orElse(0)) *1000);
|
|
|
result.add(publicUserComment);
|
|
|
});
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
//同步所有层级的子评论 各层级的自关联子评论 递归处理
|
|
|
private void buildAndSaveChildrenComment( Map<Integer, Integer> publicArticleIdAndGrassIdMap,
|
|
|
List<PublicUserComment> rootPublicComments, List<GrassArticleComment> childrenComments,
|
|
|
Map<Integer, Integer> rootIdMap){
|
|
|
|
|
|
if(CollectionUtils.isEmpty(childrenComments) || CollectionUtils.isEmpty(rootPublicComments)){
|
|
|
return;
|
|
|
}
|
|
|
List<PublicUserComment> syncPublicUserCommentList;
|
|
|
List<Integer> rootIds = rootPublicComments.stream().map(PublicUserComment::getRelateId).collect(Collectors.toList());
|
|
|
Map<Integer, Integer> commentRootIdAndPublicId = new HashMap<>();
|
|
|
rootPublicComments.forEach(publicUserComment -> {
|
|
|
commentRootIdAndPublicId.put(publicUserComment.getRelateId(), publicUserComment.getId());
|
|
|
});
|
|
|
|
|
|
List<GrassArticleComment> needSync = childrenComments.stream().filter(grassArticleComment ->
|
|
|
rootIds.contains(grassArticleComment.getParentId())).collect(Collectors.toList());
|
|
|
|
|
|
List<GrassArticleComment> nextSync = childrenComments.stream().filter(grassArticleComment ->
|
|
|
!rootIds.contains(grassArticleComment.getParentId())).collect(Collectors.toList());
|
|
|
|
|
|
syncPublicUserCommentList = buildChildrenComments(publicArticleIdAndGrassIdMap, needSync, commentRootIdAndPublicId, rootIdMap);
|
|
|
|
|
|
List<PublicUserComment> publicUserCommentList = publicUserCommentRepository.saveAll(syncPublicUserCommentList);
|
|
|
buildAndSaveChildrenComment(publicArticleIdAndGrassIdMap, publicUserCommentList, nextSync, rootIdMap);
|
|
|
}
|
|
|
|
|
|
//同步之后,查询所有新产生子评论的父评论id
|
|
|
//要同步的子评论数据中包含两部分 1 父评论已经存在在新表中 刚刚同步的 或者是时间区间之前产生的
|
|
|
// 2 父评论也在这一部分子评论中 此时需要递归进行插入
|
|
|
private void buildAndSaveChildrenCommentBySelf( Map<Integer, Integer> publicArticleIdAndGrassIdMap,
|
|
|
List<GrassArticleComment> childrenComments,
|
|
|
Map<Integer, Integer> relateIdMap){
|
|
|
|
|
|
if(CollectionUtils.isEmpty(childrenComments) ){
|
|
|
return;
|
|
|
}
|
|
|
List<PublicUserComment> syncPublicUserCommentList;
|
|
|
|
|
|
// 父评论已经存在在新表中
|
|
|
List<GrassArticleComment> needSync = childrenComments.stream().filter(grassArticleComment ->
|
|
|
relateIdMap.keySet().contains(grassArticleComment.getParentId())).collect(Collectors.toList());
|
|
|
|
|
|
//父评论也在这一部分子评论中
|
|
|
List<GrassArticleComment> nextSync = childrenComments.stream().filter(grassArticleComment ->
|
|
|
!relateIdMap.keySet().contains(grassArticleComment.getParentId())).collect(Collectors.toList());
|
|
|
|
|
|
syncPublicUserCommentList = buildChildrenComments(publicArticleIdAndGrassIdMap, needSync, relateIdMap, relateIdMap);
|
|
|
|
|
|
//新同步的数据id对应关系 直接加到map中来
|
|
|
List<PublicUserComment> publicUserCommentList = publicUserCommentRepository.saveAll(syncPublicUserCommentList);
|
|
|
publicUserCommentList.forEach(publicUserComment -> {
|
|
|
relateIdMap.put(publicUserComment.getRelateId(), publicUserComment.getId());
|
|
|
});
|
|
|
|
|
|
buildAndSaveChildrenCommentBySelf(publicArticleIdAndGrassIdMap, nextSync, relateIdMap);
|
|
|
}
|
|
|
|
|
|
private List<PublicUserComment> buildChildrenComments(Map<Integer, Integer> publicArticleIdAndGrassIdMap,List<GrassArticleComment> needSync,
|
|
|
Map<Integer, Integer> commentRootIdAndPublicId,
|
|
|
Map<Integer, Integer> rootIdMap){
|
|
|
List<PublicUserComment> syncPublicUserCommentList = Lists.newArrayList();
|
|
|
needSync.forEach(grassArticleComment -> {
|
|
|
Integer articleId = publicArticleIdAndGrassIdMap.get(grassArticleComment.getDestId());
|
|
|
if(articleId == null){
|
|
|
return;
|
|
|
}
|
|
|
PublicUserComment publicUserComment = new PublicUserComment();
|
|
|
publicUserComment.setUid(grassArticleComment.getUid());
|
|
|
publicUserComment.setSrcChannel(1);
|
|
|
publicUserComment.setCreateTime((long)(grassArticleComment.getCreateTime()) * 1000);
|
|
|
publicUserComment.setUpdateTime((long)(Optional.ofNullable(grassArticleComment.getUpdateTime()).orElse(0)) *1000);
|
|
|
publicUserComment.setArticleId(articleId);
|
|
|
publicUserComment.setToCommentId(commentRootIdAndPublicId.get(grassArticleComment.getParentId()));
|
|
|
publicUserComment.setContentData(convertCommentContent(grassArticleComment.getContent()));
|
|
|
publicUserComment.setRootId(rootIdMap.get(grassArticleComment.getRootId()));
|
|
|
publicUserComment.setRelateId(grassArticleComment.getId());
|
|
|
publicUserComment.setRelateParentId(grassArticleComment.getParentId());
|
|
|
publicUserComment.setPraiseNum(grassArticleComment.getPraiseTotal());
|
|
|
publicUserComment.setAuditStatus(convertCommentStatus(grassArticleComment.getStatus()));
|
|
|
publicUserComment.setAuditAccount(grassArticleComment.getReviewer());
|
|
|
publicUserComment.setAuditTime((long)(Optional.ofNullable(grassArticleComment.getUpdateTime()).orElse(0)) *1000);
|
|
|
syncPublicUserCommentList.add(publicUserComment);
|
|
|
});
|
|
|
return syncPublicUserCommentList;
|
|
|
}
|
|
|
|
|
|
private String convertCommentContent(String text){
|
|
|
JSONObject jsonObject = new JSONObject();
|
|
|
jsonObject.put("type","text");
|
|
|
jsonObject.put("content",text);
|
|
|
return jsonObject.toString();
|
|
|
}
|
|
|
|
|
|
private int convertCommentStatus(int grassCommentStatus){
|
|
|
switch (grassCommentStatus){
|
|
|
case UserCommentConstant.GRASS_NOT_AUDIT:
|
|
|
return UserCommentConstant.PUBLIC_NOT_AUDIT;
|
|
|
case UserCommentConstant.GRASS_HAS_AUDIT:
|
|
|
return UserCommentConstant.PUBLIC_HAS_AUDIT;
|
|
|
case UserCommentConstant.GRASS_AUDIT_REFUSED:
|
|
|
return UserCommentConstant.PUBLIC_AUDIT_REFUSED;
|
|
|
default:
|
|
|
return UserCommentConstant.PUBLIC_NOT_AUDIT;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
} |
...
|
...
|
|