Authored by unknown

发送成功数mq给crm

... ... @@ -4,7 +4,7 @@
<parent>
<groupId>com.yoho</groupId>
<artifactId>yoho-starter</artifactId>
<version>1.4.6-SNAPSHOT</version>
<version>1.4.7-SNAPSHOT</version>
</parent>
<groupId>com.yoho.dsf</groupId>
... ...
... ... @@ -19,4 +19,16 @@ consumer:
topic: msgcenter.inner.mini
- class: com.yoho.yhmessage.wechat.consumer.WechatCustomMsgConsumer
topic: msgcenter.inner.wechatCustom
\ No newline at end of file
topic: msgcenter.inner.wechatCustom
producer:
- address: ${rabbit_common}
username: ${rabbit_common_user}
password: ${rabbit_common_password}
producers:
- bean: yhProducer
async: true
trace: false
confirm: true
persistent: false
\ No newline at end of file
... ...
... ... @@ -20,4 +20,16 @@ consumer:
topic: msgcenter.inner.mini
- class: com.yoho.yhmessage.wechat.consumer.WechatCustomMsgConsumer
topic: msgcenter.inner.wechatCustom
\ No newline at end of file
topic: msgcenter.inner.wechatCustom
producer:
- address: ${rabbit_crm_host}
username: ${rabbit_crm_user}
password: ${rabbit_crm_password}
producers:
- bean: yhProducer
async: true
trace: false
confirm: true
persistent: false
\ No newline at end of file
... ...
... ... @@ -18,4 +18,6 @@ public class Consts {
public static final String QUERY_USER_INFO_URL = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=ACCESS_TOKEN&openid=OPENID&lang=zh_CN";
//客服消息发送地址
public static final String CUSTOM_MSG_SEND_URL = "https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token=";
public static final String CUSTOM_MSG_SEND_NUM_TOPIC = "crm.customMsgSendNum";
}
... ...
... ... @@ -3,8 +3,10 @@ package com.yoho.yhmessage.wechat.consumer;
import com.alibaba.fastjson.JSONObject;
import com.yoho.core.rabbitmq.YhConsumer;
import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
import com.yoho.yhmessage.wechat.service.IWechatService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
... ... @@ -16,11 +18,21 @@ public class WechatCustomMsgConsumer implements YhConsumer {
private static final Logger log = LoggerFactory.getLogger(WechatCustomMsgConsumer.class);
@Autowired
IWechatService wechatService;
@Override
public void handleMessage(Object message) throws Exception {
log.info("WechatCustomMsgConsumer.handleMessage with message is {}",message);
WechatCustomMsgBO customMsgBO = JSONObject.parseObject(message.toString(), WechatCustomMsgBO.class);
log.info("WechatCustomMsgConsumer.handleMessage with message is {}", message);
try {
WechatCustomMsgBO customMsgBO = JSONObject.parseObject(message.toString(), WechatCustomMsgBO.class);
// 默认值 有货服务号
customMsgBO.setPublicNumCode(customMsgBO.getPublicNumCode() == 0 ? 2 : customMsgBO.getPublicNumCode());
wechatService.sendWechatCustomMsg(customMsgBO);
} catch (Exception e) {
log.error("WechatCustomMsgConsumer.handleMessage error with e is {}", e);
}
}
}
... ...
... ... @@ -3,7 +3,7 @@ package com.yoho.yhmessage.wechat.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import com.yoho.core.common.utils.DateUtil;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import com.yoho.core.rabbitmq.YhProducer;
import com.yoho.error.exception.ServiceException;
import com.yoho.service.model.msgcenter.wechat.McWechatBO;
import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
... ... @@ -14,15 +14,14 @@ import com.yoho.yhmessage.wechat.service.IWechatService;
import com.yoho.yhmessage.wechat.service.IWeixinAccessTokenService;
import com.yoho.yhmessage.wechat.service.model.AccessToken;
import com.yoho.yhmessage.wechat.util.SendWechatRequest;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.apache.commons.collections.CollectionUtils;
import retrofit2.http.PUT;
import javax.print.DocFlavor;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
... ... @@ -60,6 +59,9 @@ public class WechatServiceImpl implements IWechatService {
private static final int SEVEN_DAY_HOURS = 168;
@Resource(name = "yhProducer")
private YhProducer yhProducer;
@Override
public void sendWechatMsg(McWechatBO wechatBO) throws ServiceException {
logger.info("begin sendTemplateMsg param {}", wechatBO);
... ... @@ -144,12 +146,12 @@ public class WechatServiceImpl implements IWechatService {
public void sendWechatCustomMsg(WechatCustomMsgBO customMsgBO) throws ServiceException{
logger.info("enter sendWechatCustomMsg with sceneKey is {},msgBO is {}",customMsgBO.getSendKey(),customMsgBO.getSendContent());
if(CollectionUtils.isEmpty(customMsgBO.getOpenIdSet())){
if(CollectionUtils.isEmpty(customMsgBO.getOpenIdList())){
logger.warn("sendWechatCustomMsg with sendOpenIds null, sceneKey is {}",customMsgBO.getSendKey());
return;
}
String sceneKey = customMsgBO.getSendKey();
Set<String> openIdSet = new HashSet<>(customMsgBO.getOpenIdSet());
Set<String> openIdSet = new HashSet<>(customMsgBO.getOpenIdList());
logger.info("sendWechatCustomMsg with sceneKey is {},send origin size is {}",sceneKey,openIdSet.size());
//根据过滤条件进行过滤
if(customMsgBO.getFilterDays() != 0){
... ... @@ -159,9 +161,20 @@ public class WechatServiceImpl implements IWechatService {
List<String> sendSuccessList = customMsgSend(customMsgBO,openIdSet);
logger.info("end sendWechatCustomMsg with sceneKey is {},successSize is {}",customMsgBO.getSendKey(),sendSuccessList.size());
//成功记录存redis
String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumberCode() + '_' + nowDate;
String nowDate = DateUtil.getToday(DATE_ID_FORMAT);
String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumCode() + '_' + nowDate;
//成功记录存7天
pushRedisService.setListValue(redisKeyStr,sendSuccessList,SEVEN_DAY_HOURS);
logger.info("sendWechatCustomMsg with setRedis end, sceneKey is {},successSize is {}",customMsgBO.getSendKey(),sendSuccessList.size());
//发送成功数给到crm
sendMqToCrm(customMsgBO, sendSuccessList);
}
private void sendMqToCrm(WechatCustomMsgBO customMsgBO, List<String> sendSuccessList) {
JSONObject object = new JSONObject();
object.put("activityId",customMsgBO.getSendKey());
object.put("sendNum",sendSuccessList.size());
yhProducer.send(Consts.CUSTOM_MSG_SEND_NUM_TOPIC,object.toJSONString());
}
private void getDaysFilter(WechatCustomMsgBO customMsgBO, Set<String> openIdSet) {
... ... @@ -179,7 +192,7 @@ public class WechatServiceImpl implements IWechatService {
c.set(Calendar.DATE, day + i);
String nowDate = new SimpleDateFormat(DATE_ID_FORMAT).format(c.getTime());
//根据日期获取当天发送客服消息的记录
String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumberCode() + '_' + nowDate;
String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumCode() + '_' + nowDate;
pushRedisService.getSetFilter(openIdSet,redisKeyStr);
logger.info("sendWechatCustomMsg with day filter,sceneKey is {},redisKeyStr is {},nowSize is {}",customMsgBO.getSendKey(),redisKeyStr,openIdSet.size());
}
... ... @@ -189,7 +202,7 @@ public class WechatServiceImpl implements IWechatService {
private List<String> customMsgSend(WechatCustomMsgBO customMsgBO,Set<String> openIdSet){
logger.info("begin customMsgSend with sceneKey is {},send openId size is {}",customMsgBO.getSendKey(),openIdSet.size());
// 获取token 和接口url
AccessToken accessToken = weixinAccessTokenService.queryWeixinAccessToken(customMsgBO.getPublicNumberCode());
AccessToken accessToken = weixinAccessTokenService.queryWeixinAccessToken(customMsgBO.getPublicNumCode());
//若仍未获取到 token 则返回
if(accessToken == null){
logger.warn("customMsgSend with accessToken is null,sendKey is {}",customMsgBO.getSendKey());
... ... @@ -200,12 +213,12 @@ public class WechatServiceImpl implements IWechatService {
//组装消息内容
JSONObject sendDataObject = new JSONObject();
sendDataObject.put("msgtype",customMsgBO.getMsgType());
sendDataObject.put("news",JSONObject.parseObject(customMsgBO.getSendContent()));
sendDataObject.put(customMsgBO.getMsgType(),JSONObject.parseObject(customMsgBO.getSendContent()));
openIdSet.forEach(openId -> {
//限制100ms调一次
rateLimiter.acquire();
sendDataObject.put("touser", openId);
JSONObject result = sendWechatRequest.sendWechatRequest(customMsgBO.getPublicNumberCode(), templateUrl, "POST", sendDataObject.toJSONString());
JSONObject result = sendWechatRequest.sendWechatRequest(customMsgBO.getPublicNumCode(), templateUrl, "POST", sendDataObject.toJSONString());
if (result != null && result.getIntValue("errcode") == 0) {
successList.add(openId);
}
... ...