Authored by csgyoho

syn -now-public

... ... @@ -13,4 +13,6 @@ public interface PublicLabelRepository extends JpaRepository<PublicLabel, Intege
List<PublicLabel> findAllByLabelNameIn(List<String> labelNames);
@Query("select p from PublicLabel p where p.id in ?1")
List<PublicLabel> selectByIds(List<Integer> ids);
@Query("select p from PublicLabel p where p.relateId in ?1")
List<PublicLabel> selectByRelateIds(List<Integer> relateIds);
}
... ...
... ... @@ -73,7 +73,7 @@ public class YohoNowDataSynService {
int total = postsRepository.selectTotal(begTime,endTime);
for(int offset=0;offset<total;offset+=size){
List<Posts> postsList = postsRepository.selectByTime(begTime,endTime,offset,size);
if(postsList.isEmpty()){//TODO postsList可能为null吗?
if(postsList.isEmpty()){
return "success";
}
List<Integer> postIds = postsList.stream().map(Posts::getId).collect(Collectors.toList());
... ... @@ -146,48 +146,85 @@ public class YohoNowDataSynService {
//同步标签
List<Integer> topicIds = topicPosts.stream().map(TopicPost::getTopicId).distinct().collect(Collectors.toList());
List<PublicLabel> needSyncLabelList = new ArrayList<>();
if(!CollectionUtils.isEmpty(topicIds)){
if(!CollectionUtils.isEmpty(topicIds)){
List<Topic> topicList = topicRepository.selectByIds(topicIds);
if(!CollectionUtils.isEmpty(topicList)){
long now = System.currentTimeMillis();
topicList.forEach(x -> {
PublicLabel grassLabel = new PublicLabel();
grassLabel.setLabelName(x.getTopicName().replace("#",""));
grassLabel.setStatus(1);
grassLabel.setCreateTime(now);
grassLabel.setIsVisible(2); //用户不可见
needSyncLabelList.add(grassLabel);
});
}
}
}
if(!CollectionUtils.isEmpty(needSyncLabelList)){
publicLabelRepository.saveAll(needSyncLabelList);
if(CollectionUtils.isEmpty(topicIds)){
return;
}
//需要同步过来的文章和话题关联关系
List<PublicArticleLabel> needSyncList = new ArrayList<>();
if(!CollectionUtils.isEmpty(topicPosts)){
long now = System.currentTimeMillis();
topicPosts.forEach(x -> {
PublicArticleLabel articleLabel = new PublicArticleLabel();
articleLabel.setLabelId(x.getTopicId());
articleLabel.setArticleId(articleIdAndPostIdMap.get(x.getPostId()));
articleLabel.setCreateTime(now);
articleLabel.setSrcChannel(3); // now社区
needSyncList.add(articleLabel);
});
List<Topic> topicList = topicRepository.selectByIds(topicIds);
if(CollectionUtils.isEmpty(topicList)){
return;
}
logger.info("before synArticleLabel.size is {}",needSyncList.size());
if(!CollectionUtils.isEmpty(needSyncList)){
publicArticleLabelRepository.saveAll(needSyncList);
long now = System.currentTimeMillis();
topicList.forEach(x -> {
PublicLabel grassLabel = new PublicLabel();
grassLabel.setLabelName(x.getTopicName().replace("#",""));
grassLabel.setStatus(1);
grassLabel.setCreateTime(now);
grassLabel.setIsVisible(2); //用户不可见
grassLabel.setRelateId(x.getId());
needSyncLabelList.add(grassLabel);
});
if(CollectionUtils.isEmpty(needSyncLabelList)){
return;
}
savePublicLabels(needSyncLabelList);
savePublicArticleLabels(topicPosts,articleIdAndPostIdMap,topicIds);
}catch (Exception e){
e.printStackTrace();
}
}
private void savePublicArticleLabels(List<TopicPost> topicPosts, Map<Integer, Integer> articleIdAndPostIdMap, List<Integer> topicIds){
List<PublicLabel> publicLabels = publicLabelRepository.selectByRelateIds(topicIds);
Map<Integer,Integer> publicLabelMap = publicLabels.stream().
collect(Collectors.toMap(PublicLabel::getRelateId, PublicLabel::getId,(k1, k2)->k1));
//需要同步过来的文章和话题关联关系
List<PublicArticleLabel> needSyncList = new ArrayList<>();
if(!CollectionUtils.isEmpty(topicPosts)){
long now = System.currentTimeMillis();
topicPosts.forEach(x -> {
PublicArticleLabel articleLabel = new PublicArticleLabel();
articleLabel.setLabelId(publicLabelMap.get(x.getTopicId()));
articleLabel.setArticleId(articleIdAndPostIdMap.get(x.getPostId()));
articleLabel.setCreateTime(now);
articleLabel.setSrcChannel(3); // now社区
needSyncList.add(articleLabel);
});
}
publicEntityManager.getTransaction().begin();
StringBuilder sb = null;
for(PublicArticleLabel attention : needSyncList){
if(null == sb ){
sb = new StringBuilder("insert into public_article_label(label_id,article_id,src_channel,create_time) values ");
}else {
sb.append(",");
}
sb.append("(").append(attention.getLabelId()).append(",").append(attention.getArticleId()).append(",").append(attention.getSrcChannel()).append(",")
.append(attention.getCreateTime()).append(")");
}
sb.append("ON DUPLICATE KEY UPDATE label_id=values(label_id),article_id=values(article_id),src_channel=values(src_channel),create_time=values(create_time)");
publicEntityManager.createNativeQuery(sb.toString()).executeUpdate();
publicEntityManager.getTransaction().commit();
}
private void savePublicLabels(List<PublicLabel> needSyncLabelList){
publicEntityManager.getTransaction().begin();
StringBuilder sb = null;
for(PublicLabel attention : needSyncLabelList){
if(null == sb ){
sb = new StringBuilder("insert into public_label(label_name,group_id,status,is_visible,relate_id,src_channel,create_time,update_time) values ");
}else {
sb.append(",");
}
String labelName = attention.getLabelName() == null ? null : "'"+attention.getLabelName()+"'";
sb.append("(").append(labelName).append(",").append(attention.getGroupId()).append(",").append(attention.getStatus()).append(",")
.append(attention.getIsVisible()).append(",").append(attention.getRelateId()).append(",").
append(attention.getSrcChannel()).append(",").append(attention.getCreateTime()).append(",").append(attention.getUpdateTime()).append(")");
}
sb.append("ON DUPLICATE KEY UPDATE label_name=values(label_name),group_id=values(group_id),status=values(status),is_visible=values(is_visible),relate_id=values(relate_id),src_channel=values(src_channel),create_time=values(create_time),update_time=values(update_time)");
publicEntityManager.createNativeQuery(sb.toString()).executeUpdate();
publicEntityManager.getTransaction().commit();
}
private void synArticleBlock(List<PublicArticle> pArticles, Map<Integer, List<PostsBlock>> postsBlockMap) {
List<PublicArticleBlock> pBlocks = new ArrayList<>();
for(PublicArticle article : pArticles){
... ... @@ -345,28 +382,7 @@ public class YohoNowDataSynService {
List<PublicUserAttention> userAttentions = buildSynAttentionAuthor(fanUsersMap,filtedFanUids);
logger.info("before synAttentionAuthorByArticle.size is {}",userAttentions.size());
if(!CollectionUtils.isEmpty(userAttentions)){
publicEntityManager.getTransaction().begin();
StringBuilder sb = null;
for(PublicUserAttention attention : userAttentions){
if(null == sb ){
sb = new StringBuilder("insert into public_user_attention(uid,target_id,attention_type,author_type,status,src_channel,create_time,update_time) values ");
}else {
sb.append(",");
}
sb.append("(").append(attention.getUid()).append(",").append(attention.getTargetId()).append(",").append(attention.getAttentionType()).append(",")
.append(attention.getAuthorType()).append(",").append(attention.getStatus()).append(",").
append(attention.getSrcChannel()).append(",").append(attention.getCreateTime()).append(",").append(attention.getUpdateTime()).append(")");
}
sb.append("ON DUPLICATE KEY UPDATE uid=values(uid),target_id=values(target_id),attention_type=values(attention_type),author_type=values(author_type),status=values(status),src_channel=values(src_channel),create_time=values(create_time),update_time=values(update_time)");
publicEntityManager.createNativeQuery(sb.toString()).executeUpdate();
publicEntityManager.flush();
publicEntityManager.getTransaction().commit();
try{
Thread.sleep(200);
}catch (Exception exp){
exp.printStackTrace();
}
savePublicUserAttentions(userAttentions);
}
}
}
... ... @@ -385,17 +401,35 @@ public class YohoNowDataSynService {
List<PublicUserAttention> userAttentions = buildSynAttentionAuthor(fanUsersMap,filtedFanUids);
logger.info("before synAttentionAuthorByTime.size is {}",userAttentions.size());
if(!CollectionUtils.isEmpty(userAttentions)){
publicUserAttentionRepository.saveAll(userAttentions);
try{
Thread.sleep(200);
}catch (Exception exp){
exp.printStackTrace();
}
savePublicUserAttentions(userAttentions);
}
}
}
}
private void savePublicUserAttentions(List<PublicUserAttention> userAttentions){
publicEntityManager.getTransaction().begin();
StringBuilder sb = null;
for(PublicUserAttention attention : userAttentions){
if(null == sb ){
sb = new StringBuilder("insert into public_user_attention(uid,target_id,attention_type,author_type,status,src_channel,create_time,update_time) values ");
}else {
sb.append(",");
}
sb.append("(").append(attention.getUid()).append(",").append(attention.getTargetId()).append(",").append(attention.getAttentionType()).append(",")
.append(attention.getAuthorType()).append(",").append(attention.getStatus()).append(",").
append(attention.getSrcChannel()).append(",").append(attention.getCreateTime()).append(",").append(attention.getUpdateTime()).append(")");
}
sb.append("ON DUPLICATE KEY UPDATE uid=values(uid),target_id=values(target_id),attention_type=values(attention_type),author_type=values(author_type),status=values(status),src_channel=values(src_channel),create_time=values(create_time),update_time=values(update_time)");
publicEntityManager.createNativeQuery(sb.toString()).executeUpdate();
publicEntityManager.getTransaction().commit();
try{
Thread.sleep(200);
}catch (Exception exp){
exp.printStackTrace();
}
}
private List<PublicUserAttention> buildSynAttentionAuthor(Map<Integer, List<TblAttentionUser>> fanUsersMap, List<Integer> filtedFanUids) {
List<PublicUserAttention> userAttentions = new ArrayList<>();
for(Integer fanUid : filtedFanUids){
... ... @@ -444,12 +478,7 @@ public class YohoNowDataSynService {
List<PublicUserPraise> articlePraises = buildSynArticlePraises(articleMap,praiseList);
logger.info("before synPraiseDataByArticle.size is {}",articlePraises.size());
if(!articlePraises.isEmpty()){
publicUserPraiseRepository.saveAll(articlePraises);
try{
Thread.sleep(500);
}catch (Exception exp){
exp.printStackTrace();
}
savePublicUserPraises(articlePraises);
}
}
}
... ... @@ -462,25 +491,42 @@ public class YohoNowDataSynService {
if(CollectionUtils.isEmpty(praiseList)){
return;
}
List<Integer> postIds = praiseList.stream().map(PostsPraise::getPostsId).collect(Collectors.toList());
List<PublicArticle> articleList = publicArticleRepository.selectByRelateIds(postIds);
Map<Integer,PublicArticle> articleMap = BeanConvertUtils.listToMap(articleList,Integer.class,"relateId");
//将点赞数据同步到表grass_article_praise
List<PublicUserPraise> articlePraises = buildSynArticlePraises(articleMap,praiseList);
logger.info("before synPraiseDataByTime.size is {}",articlePraises.size());
if(!articlePraises.isEmpty()){
publicUserPraiseRepository.saveAll(articlePraises);
try{
Thread.sleep(500);
}catch (Exception exp){
exp.printStackTrace();
}
savePublicUserPraises(articlePraises);
}
}
}
private void savePublicUserPraises(List<PublicUserPraise> articlePraises){
publicEntityManager.getTransaction().begin();
StringBuilder sb = null;
for(PublicUserPraise attention : articlePraises){
if(null == sb ){
sb = new StringBuilder("insert into public_user_praise(target_id,uid,praise_type,status,src_channel,create_time,update_time) values ");
}else {
sb.append(",");
}
sb.append("(").append(attention.getTargetId()).append(",").append(attention.getUid()).append(",").append(attention.getPraiseType()).append(",")
.append(attention.getStatus()).append(",").append(attention.getSrcChannel()).append(",").
append(attention.getCreateTime()).append(",").append(attention.getUpdateTime()).append(")");
}
sb.append("ON DUPLICATE KEY UPDATE target_id=values(target_id),uid=values(uid),praise_type=values(praise_type),status=values(status)," +
"src_channel=values(src_channel),create_time=values(create_time),update_time=values(update_time)");
publicEntityManager.createNativeQuery(sb.toString()).executeUpdate();
publicEntityManager.getTransaction().commit();
try{
Thread.sleep(500);
}catch (Exception exp){
exp.printStackTrace();
}
}
private List<PublicUserPraise> buildSynArticlePraises(Map<Integer, PublicArticle> articleMap, List<PostsPraise> praiseList) {
List<PublicUserPraise> articlePraises = new ArrayList<>();
for(PostsPraise praise : praiseList){
... ...
... ... @@ -32,8 +32,9 @@ public class SecondaryDataSourceConfig {
private Properties jpaProperties;
@Bean(name = "entityManagerSecondary")
public EntityManager entityManager(EntityManagerFactoryBuilder builder) throws Exception {
return entityManagerFactorySecondary(builder).getObject().createEntityManager();
public EntityManager entityManager(@Qualifier("transactionManagerSecondary") PlatformTransactionManager builder) throws Exception {
JpaTransactionManager manager = (JpaTransactionManager)builder;
return manager.getEntityManagerFactory().createEntityManager();
}
@Bean(name = "entityManagerFactorySecondary")
public LocalContainerEntityManagerFactoryBean entityManagerFactorySecondary (EntityManagerFactoryBuilder builder) throws Exception {
... ... @@ -48,8 +49,8 @@ public class SecondaryDataSourceConfig {
}
@Bean(name = "transactionManagerSecondary")
PlatformTransactionManager transactionManagerSecondary(EntityManagerFactoryBuilder builder) throws Exception {
return new JpaTransactionManager(entityManagerFactorySecondary(builder).getObject());
PlatformTransactionManager transactionManagerSecondary(@Qualifier("entityManagerFactorySecondary") LocalContainerEntityManagerFactoryBean builder) throws Exception {
return new JpaTransactionManager(builder.getObject());
}
}
... ...