Authored by 张帅

自动点赞发送站内信

... ... @@ -7,7 +7,7 @@ import com.yoho.service.model.sns.model.enums.GrassInboxBusinessTypeEnum;
import com.yoho.service.model.sns.request.GrassInBoxAddReq;
import com.yohobuy.platform.common.enums.GrassUserTypeEnum;
import com.yohobuy.platform.common.service.redis.PlatformRedis;
import com.yohobuy.platform.common.util.StringUtil;
import com.yohobuy.platform.common.util.DateUtil;
import com.yohobuy.platform.dal.grass.*;
import com.yohobuy.platform.dal.grass.model.*;
import com.yohobuy.platform.grass.service.IGrassVirtualService;
... ... @@ -18,7 +18,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.yoho.tools.common.utils.RandomUtil.getRandom;
... ... @@ -64,6 +68,61 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
private static final int recPraise = 2;
private static ExecutorService executorService = Executors.newSingleThreadExecutor();
private static final String MESSAGE_KEY = "yh:pfcms:grass:auto_praise:list";
private static final String MESSAGE_SENDING_FLAG = "yh:pfcms:grass:auto_praise:sending";
@PostConstruct
public void startSendMessageTask(){
/**
*
*/
executorService.submit(() -> {
while(true){
List<GrassInBoxAddReq> messageList = platformRedis.range(MESSAGE_KEY, "all",
GrassInBoxAddReq.class, 0, -1);
logger.info("startSendMessageTask Begin messageList size is {}",messageList.size());
if(!messageList.isEmpty()) {
//在当前线程开始发站内信之前,确保没有别的地方也在跑
//在redis 放一个正在处理的标记位, 正常数据量 2w 30s之内可以处理完, 标记位置有效时间10分钟
Integer sendTime = platformRedis.getValue(MESSAGE_SENDING_FLAG);
//没有标记位,直接处理
if(sendTime == null){
platformRedis.setValue(MESSAGE_SENDING_FLAG, DateUtil.getCurrentTimeSeconds(), 600);
for (GrassInBoxAddReq message : messageList) {
List<GrassInBoxAddReq> l = new ArrayList<>();
l.add(message);
if (l.size() % 1000 == 0) {
sendMessage(l);
l.clear();
Thread.sleep(1000);
}
if (CollectionUtils.isNotEmpty(l)) {
sendMessage(l);
}
}
platformRedis.delete(MESSAGE_SENDING_FLAG);
platformRedis.delete(MESSAGE_KEY);
logger.info("sendMessageTask end , message list clear success");
}else if(DateUtil.getCurrentTimeSeconds() - sendTime < 600){
logger.info("current message list is sending");
}else if(DateUtil.getCurrentTimeSeconds() - sendTime >= 600){ //标记位超过10分钟,认为失效,清除
logger.info("current message list is sending, and time over 600s");
platformRedis.delete(MESSAGE_SENDING_FLAG);
}
}
//加个随机数的时间, 避免多台机器同时执行
Thread.sleep(5 * 60 * 1000 + new Random().nextInt(180) * 1000);
}
});
}
/**
* 点赞规则
... ... @@ -145,6 +204,7 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
return new ArrayList<>();
}
//入库操作
logger.info("addPraiseForPublish updateData begin");
updatePraiseData(newPariseDetail, updateDetailList,updateArticleList,userAchieveList);
return newPariseDetail;
}
... ... @@ -193,6 +253,7 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
logger.info("addOrUpdateVirtualPraise list is empty");
return new ArrayList<>();
}
logger.info("addPraiseForPublish updateData begin");
updatePraiseData(newPariseDetail, updateDetailList,updateArticleList,userAchieveList);
return newPariseDetail;
}
... ... @@ -206,10 +267,10 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
*/
private void updatePraiseData(List<GrassArticlePraise> newPariseDetail,List<GrassArticlePraise> updateDetailList,
List<GrassArticle> articleList,List<GrassUserAchieve> userAchieveList) {
logger.info("start to addVirtualPraise,praiseDetailList={}",newPariseDetail);
logger.info("start to updateVirtualPraise,updateDetailList={}",updateDetailList);
logger.info("start to addVirtualPraise,articleList={}",articleList);
logger.info("start to addOrUpdateUserAchieve,userAchieveList={}",userAchieveList);
logger.info("start to addVirtualPraise,praiseDetailList size is {}",newPariseDetail.size());
logger.info("start to updateVirtualPraise,updateDetailList size is {}",updateDetailList.size());
logger.info("start to addVirtualPraise,articleList size is {}",articleList.size());
logger.info("start to addOrUpdateUserAchieve,userAchieveList size is {}",userAchieveList.size());
//1)新增点赞明细表
if(CollectionUtils.isNotEmpty(newPariseDetail)){
grassArticlePraiseDao.batchInsert(newPariseDetail);
... ... @@ -534,6 +595,7 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
}
private void sendAddPariseMessage(List<GrassArticlePraise> pariseDetail) {
logger.info("sendAddPariseMessage start, pariseDetail size is {}", pariseDetail.size());
if(CollectionUtils.isEmpty(pariseDetail)){
return;
}
... ... @@ -568,7 +630,9 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
reqList.add(req);
}
sendMessage(reqList);
// sendMessage(reqList);
addSendMessage(reqList);
}
private void sendAddPariseMessage(List<GrassArticlePraise> pariseDetail,Map<Integer,GrassArticle> articleInfoMap) {
... ... @@ -604,7 +668,7 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
reqList.add(req);
}
sendMessage(reqList);
addSendMessage(reqList);
}
private void sendMessage(List<GrassInBoxAddReq> reqList) {
... ... @@ -646,7 +710,7 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
req.setParams("");
reqList.add(req);
}
sendMessage(reqList);
addSendMessage(reqList);
}
... ... @@ -939,4 +1003,31 @@ public class GrassUserVirtualImpl implements IGrassVirtualService{
}
return set;
}
private void addSendMessage(List<GrassInBoxAddReq> list){
//往队列里扔之前,先确保队列此时没有被消费, 不然会丢失
//如果正在被处理,则等待处理完再push, 最多等10分钟
int count = 0;
while (true){
Integer sendTime = platformRedis.getValue(MESSAGE_SENDING_FLAG);
if(sendTime != null && DateUtil.getCurrentTimeSeconds() - sendTime < 600){
logger.info(" message list is sending, wait a minute");
count ++;
try {
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
platformRedis.rightPushAll(MESSAGE_KEY, "all",list, 4, TimeUnit.HOURS);
break;
}
if( count >= 10 ){
platformRedis.rightPushAll(MESSAGE_KEY, "all",list, 4, TimeUnit.HOURS);
break;
}
}
}
}
... ...