Authored by unknown

Merge branch 'dev-2683微信服务号客服消息'

... ... @@ -8,4 +8,6 @@ public class RedisCacheKeyConstant {
//小程序发送数统计key
public static final String MINI_SEND_NUM_KEY = "yh:messageSender:MINI_SEND_NUM_";
public static final String WECAHT_CUSTOM_SEND_KEY = "yh:messageSender:WECAHT_CUSTOM_SEND_";
}
... ...
... ... @@ -3,11 +3,15 @@ package com.yoho.yhmessage.redis;
import com.yoho.core.redis.cluster.annotation.Redis;
import com.yoho.core.redis.cluster.operations.nosync.*;
import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
... ... @@ -103,4 +107,42 @@ public class PushRedisService {
return count;
}
public void getSetFilter(Set<String> orginSet, String redisKey){
if(CollectionUtils.isEmpty(orginSet)){
return;
}
List<String> orginList = new ArrayList<>(orginSet);
Set<String> invalidSet = new HashSet<>();
List<Object> filteredList = pushRedisTemplate.executePipelined(connection -> {
connection.openPipeline();
byte[] key = stringSerializer.serialize(redisKey);
for (String str : orginList) {
connection.sIsMember(key, stringSerializer.serialize(str));
}
return null;
});
for (int j = 0; j < orginSet.size(); j++) {
//如果存在 则过滤
if ("true".equals(filteredList.get(j).toString())) {
invalidSet.add(orginList.get(j));
}
}
orginSet.removeAll(invalidSet);
}
public void setListValue(String redisKey,List<String> valueList,int hours){
//写入redis中的当天发送uid set集合
pushRedisTemplate.executePipelined(connection -> {
connection.openPipeline();
byte[] key = stringSerializer.serialize(redisKey);
for (String value : valueList) {
connection.sAdd(key, stringSerializer.serialize(value));
}
return null;
});
//设置超时时间
RedisKeyBuilder redisTodayKey = RedisKeyBuilder.newInstance().appendFixed(redisKey);
pushRedisTemplate.expire(redisTodayKey,hours,TimeUnit.HOURS);
}
}
... ...
... ... @@ -4,7 +4,7 @@
<parent>
<groupId>com.yoho</groupId>
<artifactId>yoho-starter</artifactId>
<version>1.4.5-SNAPSHOT</version>
<version>1.4.7-SNAPSHOT</version>
</parent>
<groupId>com.yoho.dsf</groupId>
... ...
... ... @@ -16,4 +16,19 @@ consumer:
topic: msgcenter.inner.wechat
- class: com.yoho.yhmessage.wechat.consumer.MiniMsgSendConsumer
topic: msgcenter.inner.mini
\ No newline at end of file
topic: msgcenter.inner.mini
- class: com.yoho.yhmessage.wechat.consumer.WechatCustomMsgConsumer
topic: msgcenter.inner.wechatCustom
producer:
- address: 172.16.6.54:5672
username: yoho
password: yoho
producers:
- bean: yhProducer
async: true
trace: false
confirm: true
persistent: false
\ No newline at end of file
... ...
... ... @@ -16,4 +16,19 @@ consumer:
topic: msgcenter.inner.wechat
- class: com.yoho.yhmessage.wechat.consumer.MiniMsgSendConsumer
topic: msgcenter.inner.mini
\ No newline at end of file
topic: msgcenter.inner.mini
- class: com.yoho.yhmessage.wechat.consumer.WechatCustomMsgConsumer
topic: msgcenter.inner.wechatCustom
producer:
- address: ${rabbit_common}
username: ${rabbit_common_user}
password: ${rabbit_common_password}
producers:
- bean: yhProducer
async: true
trace: false
confirm: true
persistent: false
\ No newline at end of file
... ...
package com.test;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yoho.message.dal.MessageFilterInfoMapper;
import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
import com.yoho.yhmessage.wechat.service.IWechatService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.ArrayList;
import java.util.List;
/**
* Created by min.ling on 2018/10/8.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:META-INF/spring/spring*.xml","classpath*:META-INF/spring/mybatis-datasource.xml",})
public class WechatSendTest {
@Autowired
MessageFilterInfoMapper filterInfoMapper;
@Autowired
private IWechatService wechatService;
@Test
public void sendCustomTest(){
List<String> openIdList = new ArrayList<>();
openIdList.add("oemqmjl0M38zwP-7wnmhNcQqVMEA");
JSONArray contentArray = new JSONArray();
JSONObject part = new JSONObject();
part.put("title","Happy Day");
part.put("description","Is Really A Happy Day");
part.put("url","https://m.yohobuy.com/");
part.put("picurl","http://img11.static.yhbimg.com/yhb-img01/2018/10/09/11/01212cad454186ddd6e9bab47d76dde7f3.png");
contentArray.add(part);
JSONObject content = new JSONObject();
content.put("articles",contentArray);
WechatCustomMsgBO customMsgBO = new WechatCustomMsgBO();
customMsgBO.setPublicNumCode(2);
customMsgBO.setOpenIdList(openIdList);
customMsgBO.setSendKey("123");
customMsgBO.setFilterDays(0);
customMsgBO.setMsgType("news");
customMsgBO.setSendContent(content.toJSONString());
wechatService.sendWechatCustomMsg(customMsgBO);
}
}
... ...
... ... @@ -16,4 +16,8 @@ public class Consts {
public static final String WEIXIN_ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=APPID&secret=APPSECRET";
//微信查询用户信息地址
public static final String QUERY_USER_INFO_URL = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=ACCESS_TOKEN&openid=OPENID&lang=zh_CN";
//客服消息发送地址
public static final String CUSTOM_MSG_SEND_URL = "https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token=";
public static final String CUSTOM_MSG_SEND_NUM_TOPIC = "crm.customMsgSendNum";
}
... ...
package com.yoho.yhmessage.wechat.consumer;
import com.alibaba.fastjson.JSONObject;
import com.yoho.core.rabbitmq.YhConsumer;
import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
import com.yoho.yhmessage.wechat.service.IWechatService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 客服消息发送
* Created by min.ling on 2018/9/29.
*/
@Component
public class WechatCustomMsgConsumer implements YhConsumer {
private static final Logger log = LoggerFactory.getLogger(WechatCustomMsgConsumer.class);
@Autowired
IWechatService wechatService;
@Override
public void handleMessage(Object message) throws Exception {
log.info("WechatCustomMsgConsumer.handleMessage with message is {}", message);
try {
WechatCustomMsgBO customMsgBO = JSONObject.parseObject(message.toString(), WechatCustomMsgBO.class);
// 默认值 有货服务号
customMsgBO.setPublicNumCode(customMsgBO.getPublicNumCode() == 0 ? 2 : customMsgBO.getPublicNumCode());
wechatService.sendWechatCustomMsg(customMsgBO);
} catch (Exception e) {
log.error("WechatCustomMsgConsumer.handleMessage error with e is {}", e);
}
}
}
... ...
... ... @@ -2,6 +2,7 @@ package com.yoho.yhmessage.wechat.service;
import com.yoho.error.exception.ServiceException;
import com.yoho.service.model.msgcenter.wechat.McWechatBO;
import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
/**
*
... ... @@ -24,4 +25,11 @@ public interface IWechatService {
* @throws ServiceException <br>
*/
void sendWechatMsg(McWechatBO wechatBO) throws ServiceException;
/**
* 发送微信客服消息
* @param customMsgBO
* @throws ServiceException
*/
void sendWechatCustomMsg(WechatCustomMsgBO customMsgBO) throws ServiceException;
}
... ...
package com.yoho.yhmessage.wechat.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import com.yoho.core.common.utils.DateUtil;
import com.yoho.core.rabbitmq.YhProducer;
import com.yoho.error.exception.ServiceException;
import com.yoho.service.model.msgcenter.wechat.McWechatBO;
import com.yoho.yhmessage.filter.IFilterSensitiveWordService;
import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
import com.yoho.yhmessage.constants.RedisCacheKeyConstant;
import com.yoho.yhmessage.redis.PushRedisService;
import com.yoho.yhmessage.wechat.common.Consts;
import com.yoho.yhmessage.wechat.service.IWechatService;
import com.yoho.yhmessage.wechat.service.IWeixinAccessTokenService;
import com.yoho.yhmessage.wechat.service.model.AccessToken;
import com.yoho.yhmessage.wechat.util.SendWechatRequest;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
/**
*
* <Description> 微信<br>
... ... @@ -40,8 +54,22 @@ public class WechatServiceImpl implements IWechatService {
private SendWechatRequest sendWechatRequest;
@Autowired
private IFilterSensitiveWordService filterSensitiveWordService;
private PushRedisService pushRedisService;
private static final String DATE_ID_FORMAT = "yyyyMMdd";
//1s放10个令牌 100ms频率
private static final RateLimiter rateLimiter = RateLimiter.create(10);
private static final int SEVEN_DAY_HOURS = 168;
@Resource(name = "yhProducer")
private YhProducer yhProducer;
@Override
public void sendWechatMsg(McWechatBO wechatBO) throws ServiceException {
logger.info("begin sendTemplateMsg param {}", wechatBO);
... ... @@ -67,7 +95,7 @@ public class WechatServiceImpl implements IWechatService {
logger.warn("sendWechatMsg with accessToken is null, publicCode is {},wechatBO is {}",wechatBO.getPublicNumberCode(),wechatBO);
return;
}
String templateUrl = this.getTemplateUrl(accessToken);
String templateUrl = this.getTemplateUrl(accessToken,Consts.SEND_TEMPLATE_URL);
//针对需要展示微信用户昵称的消息,在这里调用微信api查询一把并替换,避免在其他服务调用微信api的复杂行,微信用户昵称统一用${wechatNickname}表示
String data = this.addUserNickname(wechatBO, accessToken);
// 获取请求参数
... ... @@ -82,7 +110,7 @@ public class WechatServiceImpl implements IWechatService {
logger.warn("send wechat template message error, e is {}", JSONObject.toJSONString(result) );
// 重新调接口获取access token
AccessToken accessTokenNew = weixinAccessTokenService.getAccessTokenFromUrl(wechatBO.getPublicNumberCode());
templateUrl = this.getTemplateUrl(accessTokenNew);
templateUrl = this.getTemplateUrl(accessTokenNew,Consts.SEND_TEMPLATE_URL);
result = sendWechatRequest.sendWechatRequest(wechatBO.getPublicNumberCode(), templateUrl, "POST", sendMsg.toJSONString());
}
sendWechatMsgLog.info("send wechat template message {},result {}, templateUrl is {}", sendMsg.toJSONString(), result, templateUrl);
... ... @@ -116,12 +144,12 @@ public class WechatServiceImpl implements IWechatService {
* @taskId <br>
* @return <br>
*/
private String getTemplateUrl(AccessToken accessToken) {
private String getTemplateUrl(AccessToken accessToken,String url) {
if (null == accessToken) {
return null;
}
logger.info("accessToken is {}", accessToken.getToken());
StringBuffer templateUrl = new StringBuffer(Consts.SEND_TEMPLATE_URL);
StringBuffer templateUrl = new StringBuffer(url);
templateUrl.append(accessToken.getToken());
return templateUrl.toString();
}
... ... @@ -129,4 +157,100 @@ public class WechatServiceImpl implements IWechatService {
private String getQueryUserUrl(String openId, AccessToken accessToken) {
return Consts.QUERY_USER_INFO_URL.replace("ACCESS_TOKEN", accessToken.getToken()).replace("OPENID", openId);
}
@Override
public void sendWechatCustomMsg(WechatCustomMsgBO customMsgBO) throws ServiceException{
logger.info("enter sendWechatCustomMsg with sceneKey is {},msgBO is {}",customMsgBO.getSendKey(),customMsgBO.getSendContent());
if(CollectionUtils.isEmpty(customMsgBO.getOpenIdList())){
logger.warn("sendWechatCustomMsg with sendOpenIds null, sceneKey is {}", customMsgBO.getSendKey());
return;
}
String sceneKey = customMsgBO.getSendKey();
Set<String> openIdSet = new HashSet<>(customMsgBO.getOpenIdList());
logger.info("sendWechatCustomMsg with sceneKey is {},send origin size is {}",sceneKey,openIdSet.size());
//根据过滤条件进行过滤
if(customMsgBO.getFilterDays() != 0){
getDaysFilter(customMsgBO, openIdSet);
}
//组装发送
List<String> sendSuccessList = customMsgSend(customMsgBO, openIdSet);
if(CollectionUtils.isEmpty(sendSuccessList)){
logger.warn("sendWechatCustomMsg with sendSuccess size is 0,sceneKey is {}",customMsgBO.getSendKey());
return;
}
logger.info("end sendWechatCustomMsg with sceneKey is {},successSize is {}",customMsgBO.getSendKey(),sendSuccessList.size());
//成功记录存redis
String nowDate = DateUtil.getToday(DATE_ID_FORMAT);
String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumCode() + '_' + nowDate;
//成功记录存7天
pushRedisService.setListValue(redisKeyStr,sendSuccessList,SEVEN_DAY_HOURS);
logger.info("sendWechatCustomMsg with setRedis end, sceneKey is {},successSize is {}",customMsgBO.getSendKey(),sendSuccessList.size());
//发送成功数给到crm
sendMqToCrm(customMsgBO, sendSuccessList);
}
private void sendMqToCrm(WechatCustomMsgBO customMsgBO, List<String> sendSuccessList) {
JSONObject object = new JSONObject();
object.put("activityId",customMsgBO.getSendKey());
object.put("sendNum",sendSuccessList.size());
yhProducer.send(Consts.CUSTOM_MSG_SEND_NUM_TOPIC,object);
}
private void getDaysFilter(WechatCustomMsgBO customMsgBO, Set<String> openIdSet) {
int filterDayReal = customMsgBO.getFilterDays() - 1;
//根据间隔天数计算出需要过滤到的最早的一天
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DATE, -filterDayReal);
Calendar c = Calendar.getInstance();
c.setTime(calendar.getTime());
int day = c.get(Calendar.DATE);
for (int i = 0; i <= filterDayReal; i++){
//从最早日期开始往后累加到每个日期,每天过滤
c.set(Calendar.DATE, day + i);
String nowDate = new SimpleDateFormat(DATE_ID_FORMAT).format(c.getTime());
//根据日期获取当天发送客服消息的记录
String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumCode() + '_' + nowDate;
pushRedisService.getSetFilter(openIdSet,redisKeyStr);
logger.info("sendWechatCustomMsg with day filter,sceneKey is {},redisKeyStr is {},nowSize is {}",customMsgBO.getSendKey(),redisKeyStr,openIdSet.size());
if(openIdSet.size() == 0){
break;
}
}
}
private List<String> customMsgSend(WechatCustomMsgBO customMsgBO,Set<String> openIdSet){
logger.info("begin customMsgSend with sceneKey is {},send openId size is {}",customMsgBO.getSendKey(),openIdSet.size());
if(CollectionUtils.isEmpty(openIdSet)){
logger.warn("customMsgSend with openIdSet is null,sceneKey is {}",customMsgBO.getSendKey());
return new ArrayList<>();
}
// 获取token 和接口url
AccessToken accessToken = weixinAccessTokenService.queryWeixinAccessToken(customMsgBO.getPublicNumCode());
//若仍未获取到 token 则返回
if(accessToken == null){
logger.warn("customMsgSend with accessToken is null,sendKey is {}",customMsgBO.getSendKey());
return new ArrayList<>();
}
List<String> successList = new ArrayList<>();
String templateUrl = this.getTemplateUrl(accessToken, Consts.CUSTOM_MSG_SEND_URL);
//组装消息内容
JSONObject sendDataObject = new JSONObject();
sendDataObject.put("msgtype",customMsgBO.getMsgType());
sendDataObject.put(customMsgBO.getMsgType(),JSONObject.parseObject(customMsgBO.getSendContent()));
openIdSet.forEach(openId -> {
//限制100ms调一次
rateLimiter.acquire();
sendDataObject.put("touser", openId);
JSONObject result = sendWechatRequest.sendWechatRequest(customMsgBO.getPublicNumCode(), templateUrl, "POST", sendDataObject.toJSONString());
if (result != null && result.getIntValue("errcode") == 0) {
successList.add(openId);
}
sendWechatMsgLog.info("customMsgSend with sceneKey {},openId is {},result {}, content is {}", customMsgBO.getSendKey(), openId, result, sendDataObject.toJSONString());
});
return successList;
}
}
... ...