Authored by unknown

微信客服消息发送

@@ -8,4 +8,6 @@ public class RedisCacheKeyConstant { @@ -8,4 +8,6 @@ public class RedisCacheKeyConstant {
8 8
9 //小程序发送数统计key 9 //小程序发送数统计key
10 public static final String MINI_SEND_NUM_KEY = "yh:messageSender:MINI_SEND_NUM_"; 10 public static final String MINI_SEND_NUM_KEY = "yh:messageSender:MINI_SEND_NUM_";
  11 +
  12 + public static final String WECAHT_CUSTOM_SEND_KEY = "yh:messageSender:WECAHT_CUSTOM_SEND_";
11 } 13 }
@@ -3,11 +3,15 @@ package com.yoho.yhmessage.redis; @@ -3,11 +3,15 @@ package com.yoho.yhmessage.redis;
3 import com.yoho.core.redis.cluster.annotation.Redis; 3 import com.yoho.core.redis.cluster.annotation.Redis;
4 import com.yoho.core.redis.cluster.operations.nosync.*; 4 import com.yoho.core.redis.cluster.operations.nosync.*;
5 import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder; 5 import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
  6 +import org.apache.commons.collections.CollectionUtils;
6 import org.springframework.data.redis.serializer.RedisSerializer; 7 import org.springframework.data.redis.serializer.RedisSerializer;
7 import org.springframework.data.redis.serializer.StringRedisSerializer; 8 import org.springframework.data.redis.serializer.StringRedisSerializer;
8 import org.springframework.stereotype.Service; 9 import org.springframework.stereotype.Service;
9 10
  11 +import java.util.ArrayList;
  12 +import java.util.HashSet;
10 import java.util.List; 13 import java.util.List;
  14 +import java.util.Set;
11 import java.util.concurrent.TimeUnit; 15 import java.util.concurrent.TimeUnit;
12 16
13 /** 17 /**
@@ -103,4 +107,42 @@ public class PushRedisService { @@ -103,4 +107,42 @@ public class PushRedisService {
103 return count; 107 return count;
104 } 108 }
105 109
  110 + public void getSetFilter(Set<String> orginSet, String redisKey){
  111 + if(CollectionUtils.isEmpty(orginSet)){
  112 + return;
  113 + }
  114 + List<String> orginList = new ArrayList<>(orginSet);
  115 + Set<String> invalidSet = new HashSet<>();
  116 + List<Object> filteredList = pushRedisTemplate.executePipelined(connection -> {
  117 + connection.openPipeline();
  118 + byte[] key = stringSerializer.serialize(redisKey);
  119 + for (String str : orginList) {
  120 + connection.sIsMember(key, stringSerializer.serialize(str));
  121 + }
  122 + return null;
  123 + });
  124 + for (int j = 0; j < orginSet.size(); j++) {
  125 + //如果存在 则过滤
  126 + if ("true".equals(filteredList.get(j).toString())) {
  127 + invalidSet.add(orginList.get(j));
  128 + }
  129 + }
  130 + orginSet.removeAll(invalidSet);
  131 + }
  132 +
  133 + public void setListValue(String redisKey,List<String> valueList,int hours){
  134 + //写入redis中的当天发送uid set集合
  135 + pushRedisTemplate.executePipelined(connection -> {
  136 + connection.openPipeline();
  137 + byte[] key = stringSerializer.serialize(redisKey);
  138 + for (String value : valueList) {
  139 + connection.sAdd(key, stringSerializer.serialize(value));
  140 + }
  141 + return null;
  142 + });
  143 + //设置超时时间
  144 + RedisKeyBuilder redisTodayKey = RedisKeyBuilder.newInstance().appendFixed(redisKey);
  145 + pushRedisTemplate.expire(redisTodayKey,hours,TimeUnit.HOURS);
  146 + }
  147 +
106 } 148 }
@@ -4,7 +4,7 @@ @@ -4,7 +4,7 @@
4 <parent> 4 <parent>
5 <groupId>com.yoho</groupId> 5 <groupId>com.yoho</groupId>
6 <artifactId>yoho-starter</artifactId> 6 <artifactId>yoho-starter</artifactId>
7 - <version>1.4.5-SNAPSHOT</version> 7 + <version>1.4.6-SNAPSHOT</version>
8 </parent> 8 </parent>
9 9
10 <groupId>com.yoho.dsf</groupId> 10 <groupId>com.yoho.dsf</groupId>
@@ -16,4 +16,7 @@ consumer: @@ -16,4 +16,7 @@ consumer:
16 topic: msgcenter.inner.wechat 16 topic: msgcenter.inner.wechat
17 17
18 - class: com.yoho.yhmessage.wechat.consumer.MiniMsgSendConsumer 18 - class: com.yoho.yhmessage.wechat.consumer.MiniMsgSendConsumer
19 - topic: msgcenter.inner.mini  
  19 + topic: msgcenter.inner.mini
  20 +
  21 + - class: com.yoho.yhmessage.wechat.consumer.WechatCustomMsgConsumer
  22 + topic: msgcenter.inner.wechatCustom
@@ -17,4 +17,7 @@ consumer: @@ -17,4 +17,7 @@ consumer:
17 topic: msgcenter.inner.wechat 17 topic: msgcenter.inner.wechat
18 18
19 - class: com.yoho.yhmessage.wechat.consumer.MiniMsgSendConsumer 19 - class: com.yoho.yhmessage.wechat.consumer.MiniMsgSendConsumer
20 - topic: msgcenter.inner.mini  
  20 + topic: msgcenter.inner.mini
  21 +
  22 + - class: com.yoho.yhmessage.wechat.consumer.WechatCustomMsgConsumer
  23 + topic: msgcenter.inner.wechatCustom
@@ -16,4 +16,6 @@ public class Consts { @@ -16,4 +16,6 @@ public class Consts {
16 public static final String WEIXIN_ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=APPID&secret=APPSECRET"; 16 public static final String WEIXIN_ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=APPID&secret=APPSECRET";
17 //微信查询用户信息地址 17 //微信查询用户信息地址
18 public static final String QUERY_USER_INFO_URL = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=ACCESS_TOKEN&openid=OPENID&lang=zh_CN"; 18 public static final String QUERY_USER_INFO_URL = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=ACCESS_TOKEN&openid=OPENID&lang=zh_CN";
  19 + //客服消息发送地址
  20 + public static final String CUSTOM_MSG_SEND_URL = "https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token=";
19 } 21 }
  1 +package com.yoho.yhmessage.wechat.consumer;
  2 +
  3 +import com.alibaba.fastjson.JSONObject;
  4 +import com.yoho.core.rabbitmq.YhConsumer;
  5 +import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
  6 +import org.slf4j.Logger;
  7 +import org.slf4j.LoggerFactory;
  8 +import org.springframework.stereotype.Component;
  9 +
  10 +/**
  11 + * 客服消息发送
  12 + * Created by min.ling on 2018/9/29.
  13 + */
  14 +@Component
  15 +public class WechatCustomMsgConsumer implements YhConsumer {
  16 +
  17 + private static final Logger log = LoggerFactory.getLogger(WechatCustomMsgConsumer.class);
  18 +
  19 + @Override
  20 + public void handleMessage(Object message) throws Exception {
  21 + log.info("WechatCustomMsgConsumer.handleMessage with message is {}",message);
  22 +
  23 + WechatCustomMsgBO customMsgBO = JSONObject.parseObject(message.toString(), WechatCustomMsgBO.class);
  24 +
  25 + }
  26 +}
@@ -2,6 +2,7 @@ package com.yoho.yhmessage.wechat.service; @@ -2,6 +2,7 @@ package com.yoho.yhmessage.wechat.service;
2 2
3 import com.yoho.error.exception.ServiceException; 3 import com.yoho.error.exception.ServiceException;
4 import com.yoho.service.model.msgcenter.wechat.McWechatBO; 4 import com.yoho.service.model.msgcenter.wechat.McWechatBO;
  5 +import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
5 6
6 /** 7 /**
7 * 8 *
@@ -24,4 +25,11 @@ public interface IWechatService { @@ -24,4 +25,11 @@ public interface IWechatService {
24 * @throws ServiceException <br> 25 * @throws ServiceException <br>
25 */ 26 */
26 void sendWechatMsg(McWechatBO wechatBO) throws ServiceException; 27 void sendWechatMsg(McWechatBO wechatBO) throws ServiceException;
  28 +
  29 + /**
  30 + * 发送微信客服消息
  31 + * @param customMsgBO
  32 + * @throws ServiceException
  33 + */
  34 + void sendWechatCustomMsg(WechatCustomMsgBO customMsgBO) throws ServiceException;
27 } 35 }
1 package com.yoho.yhmessage.wechat.service.impl; 1 package com.yoho.yhmessage.wechat.service.impl;
2 2
3 import com.alibaba.fastjson.JSONObject; 3 import com.alibaba.fastjson.JSONObject;
  4 +import com.google.common.util.concurrent.RateLimiter;
  5 +import com.yoho.core.common.utils.DateUtil;
  6 +import com.yoho.core.redis.cluster.operations.serializer.RedisKeyBuilder;
4 import com.yoho.error.exception.ServiceException; 7 import com.yoho.error.exception.ServiceException;
5 import com.yoho.service.model.msgcenter.wechat.McWechatBO; 8 import com.yoho.service.model.msgcenter.wechat.McWechatBO;
  9 +import com.yoho.service.model.msgcenter.wechat.WechatCustomMsgBO;
  10 +import com.yoho.yhmessage.constants.RedisCacheKeyConstant;
  11 +import com.yoho.yhmessage.redis.PushRedisService;
6 import com.yoho.yhmessage.wechat.common.Consts; 12 import com.yoho.yhmessage.wechat.common.Consts;
7 import com.yoho.yhmessage.wechat.service.IWechatService; 13 import com.yoho.yhmessage.wechat.service.IWechatService;
8 import com.yoho.yhmessage.wechat.service.IWeixinAccessTokenService; 14 import com.yoho.yhmessage.wechat.service.IWeixinAccessTokenService;
@@ -13,6 +19,12 @@ import org.slf4j.Logger; @@ -13,6 +19,12 @@ import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory; 19 import org.slf4j.LoggerFactory;
14 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
  22 +import org.apache.commons.collections.CollectionUtils;
  23 +import retrofit2.http.PUT;
  24 +
  25 +import javax.print.DocFlavor;
  26 +import java.text.SimpleDateFormat;
  27 +import java.util.*;
16 28
17 /** 29 /**
18 * 30 *
@@ -38,6 +50,16 @@ public class WechatServiceImpl implements IWechatService { @@ -38,6 +50,16 @@ public class WechatServiceImpl implements IWechatService {
38 @Autowired 50 @Autowired
39 private SendWechatRequest sendWechatRequest; 51 private SendWechatRequest sendWechatRequest;
40 52
  53 + @Autowired
  54 + private PushRedisService pushRedisService;
  55 +
  56 + private static final String DATE_ID_FORMAT = "yyyyMMdd";
  57 +
  58 + //1s放10个令牌 100ms频率
  59 + private static final RateLimiter rateLimiter = RateLimiter.create(10);
  60 +
  61 + private static final int SEVEN_DAY_HOURS = 168;
  62 +
41 @Override 63 @Override
42 public void sendWechatMsg(McWechatBO wechatBO) throws ServiceException { 64 public void sendWechatMsg(McWechatBO wechatBO) throws ServiceException {
43 logger.info("begin sendTemplateMsg param {}", wechatBO); 65 logger.info("begin sendTemplateMsg param {}", wechatBO);
@@ -54,7 +76,7 @@ public class WechatServiceImpl implements IWechatService { @@ -54,7 +76,7 @@ public class WechatServiceImpl implements IWechatService {
54 logger.warn("sendWechatMsg with accessToken is null, publicCode is {},wechatBO is {}",wechatBO.getPublicNumberCode(),wechatBO); 76 logger.warn("sendWechatMsg with accessToken is null, publicCode is {},wechatBO is {}",wechatBO.getPublicNumberCode(),wechatBO);
55 return; 77 return;
56 } 78 }
57 - String templateUrl = this.getTemplateUrl(accessToken); 79 + String templateUrl = this.getTemplateUrl(accessToken,Consts.SEND_TEMPLATE_URL);
58 //针对需要展示微信用户昵称的消息,在这里调用微信api查询一把并替换,避免在其他服务调用微信api的复杂行,微信用户昵称统一用${wechatNickname}表示 80 //针对需要展示微信用户昵称的消息,在这里调用微信api查询一把并替换,避免在其他服务调用微信api的复杂行,微信用户昵称统一用${wechatNickname}表示
59 String data = this.addUserNickname(wechatBO, accessToken); 81 String data = this.addUserNickname(wechatBO, accessToken);
60 // 获取请求参数 82 // 获取请求参数
@@ -69,7 +91,7 @@ public class WechatServiceImpl implements IWechatService { @@ -69,7 +91,7 @@ public class WechatServiceImpl implements IWechatService {
69 logger.warn("send wechat template message error, e is {}", JSONObject.toJSONString(result) ); 91 logger.warn("send wechat template message error, e is {}", JSONObject.toJSONString(result) );
70 // 重新调接口获取access token 92 // 重新调接口获取access token
71 AccessToken accessTokenNew = weixinAccessTokenService.getAccessTokenFromUrl(wechatBO.getPublicNumberCode()); 93 AccessToken accessTokenNew = weixinAccessTokenService.getAccessTokenFromUrl(wechatBO.getPublicNumberCode());
72 - templateUrl = this.getTemplateUrl(accessTokenNew); 94 + templateUrl = this.getTemplateUrl(accessTokenNew,Consts.SEND_TEMPLATE_URL);
73 result = sendWechatRequest.sendWechatRequest(wechatBO.getPublicNumberCode(), templateUrl, "POST", sendMsg.toJSONString()); 95 result = sendWechatRequest.sendWechatRequest(wechatBO.getPublicNumberCode(), templateUrl, "POST", sendMsg.toJSONString());
74 } 96 }
75 sendWechatMsgLog.info("send wechat template message {},result {}, templateUrl is {}", sendMsg.toJSONString(), result, templateUrl); 97 sendWechatMsgLog.info("send wechat template message {},result {}, templateUrl is {}", sendMsg.toJSONString(), result, templateUrl);
@@ -103,12 +125,12 @@ public class WechatServiceImpl implements IWechatService { @@ -103,12 +125,12 @@ public class WechatServiceImpl implements IWechatService {
103 * @taskId <br> 125 * @taskId <br>
104 * @return <br> 126 * @return <br>
105 */ 127 */
106 - private String getTemplateUrl(AccessToken accessToken) { 128 + private String getTemplateUrl(AccessToken accessToken,String url) {
107 if (null == accessToken) { 129 if (null == accessToken) {
108 return null; 130 return null;
109 } 131 }
110 logger.info("accessToken is {}", accessToken.getToken()); 132 logger.info("accessToken is {}", accessToken.getToken());
111 - StringBuffer templateUrl = new StringBuffer(Consts.SEND_TEMPLATE_URL); 133 + StringBuffer templateUrl = new StringBuffer(url);
112 templateUrl.append(accessToken.getToken()); 134 templateUrl.append(accessToken.getToken());
113 return templateUrl.toString(); 135 return templateUrl.toString();
114 } 136 }
@@ -116,4 +138,80 @@ public class WechatServiceImpl implements IWechatService { @@ -116,4 +138,80 @@ public class WechatServiceImpl implements IWechatService {
116 private String getQueryUserUrl(String openId, AccessToken accessToken) { 138 private String getQueryUserUrl(String openId, AccessToken accessToken) {
117 return Consts.QUERY_USER_INFO_URL.replace("ACCESS_TOKEN", accessToken.getToken()).replace("OPENID", openId); 139 return Consts.QUERY_USER_INFO_URL.replace("ACCESS_TOKEN", accessToken.getToken()).replace("OPENID", openId);
118 } 140 }
  141 +
  142 +
  143 + @Override
  144 + public void sendWechatCustomMsg(WechatCustomMsgBO customMsgBO) throws ServiceException{
  145 + logger.info("enter sendWechatCustomMsg with sceneKey is {},msgBO is {}",customMsgBO.getSendKey(),customMsgBO.getSendContent());
  146 +
  147 + if(CollectionUtils.isEmpty(customMsgBO.getOpenIdSet())){
  148 + logger.warn("sendWechatCustomMsg with sendOpenIds null, sceneKey is {}",customMsgBO.getSendKey());
  149 + return;
  150 + }
  151 + String sceneKey = customMsgBO.getSendKey();
  152 + Set<String> openIdSet = new HashSet<>(customMsgBO.getOpenIdSet());
  153 + logger.info("sendWechatCustomMsg with sceneKey is {},send origin size is {}",sceneKey,openIdSet.size());
  154 + //根据过滤条件进行过滤
  155 + if(customMsgBO.getFilterDays() != 0){
  156 + getDaysFilter(customMsgBO,openIdSet);
  157 + }
  158 + //组装发送
  159 + List<String> sendSuccessList = customMsgSend(customMsgBO,openIdSet);
  160 + logger.info("end sendWechatCustomMsg with sceneKey is {},successSize is {}",customMsgBO.getSendKey(),sendSuccessList.size());
  161 + //成功记录存redis
  162 + String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumberCode() + '_' + nowDate;
  163 + //成功记录存7天
  164 + pushRedisService.setListValue(redisKeyStr,sendSuccessList,SEVEN_DAY_HOURS);
  165 + }
  166 +
  167 + private void getDaysFilter(WechatCustomMsgBO customMsgBO, Set<String> openIdSet) {
  168 + int filterDayReal = customMsgBO.getFilterDays() - 1;
  169 + //根据间隔天数计算出需要过滤到的最早的一天
  170 + Calendar calendar = Calendar.getInstance();
  171 + calendar.add(Calendar.DATE, -filterDayReal);
  172 + String startDate = DateUtil.dateToString(calendar.getTime(), DATE_ID_FORMAT);
  173 + Date date = DateUtil.stringToDate(startDate,"DATE_ID_FORMAT");
  174 + Calendar c = Calendar.getInstance();
  175 + c.setTime(date);
  176 + int day = c.get(Calendar.DATE);
  177 + for (int i = 0; i <= filterDayReal; i++){
  178 + //从最早日期开始往后累加到每个日期,每天过滤
  179 + c.set(Calendar.DATE, day + i);
  180 + String nowDate = new SimpleDateFormat(DATE_ID_FORMAT).format(c.getTime());
  181 + //根据日期获取当天发送客服消息的记录
  182 + String redisKeyStr = RedisCacheKeyConstant.WECAHT_CUSTOM_SEND_KEY + customMsgBO.getPublicNumberCode() + '_' + nowDate;
  183 + pushRedisService.getSetFilter(openIdSet,redisKeyStr);
  184 + logger.info("sendWechatCustomMsg with day filter,sceneKey is {},redisKeyStr is {},nowSize is {}",customMsgBO.getSendKey(),redisKeyStr,openIdSet.size());
  185 + }
  186 + }
  187 +
  188 +
  189 + private List<String> customMsgSend(WechatCustomMsgBO customMsgBO,Set<String> openIdSet){
  190 + logger.info("begin customMsgSend with sceneKey is {},send openId size is {}",customMsgBO.getSendKey(),openIdSet.size());
  191 + // 获取token 和接口url
  192 + AccessToken accessToken = weixinAccessTokenService.queryWeixinAccessToken(customMsgBO.getPublicNumberCode());
  193 + //若仍未获取到 token 则返回
  194 + if(accessToken == null){
  195 + logger.warn("customMsgSend with accessToken is null,sendKey is {}",customMsgBO.getSendKey());
  196 + return new ArrayList<>();
  197 + }
  198 + List<String> successList = new ArrayList<>();
  199 + String templateUrl = this.getTemplateUrl(accessToken, Consts.CUSTOM_MSG_SEND_URL);
  200 + //组装消息内容
  201 + JSONObject sendDataObject = new JSONObject();
  202 + sendDataObject.put("msgtype",customMsgBO.getMsgType());
  203 + sendDataObject.put("news",JSONObject.parseObject(customMsgBO.getSendContent()));
  204 + openIdSet.forEach(openId -> {
  205 + //限制100ms调一次
  206 + rateLimiter.acquire();
  207 + sendDataObject.put("touser", openId);
  208 + JSONObject result = sendWechatRequest.sendWechatRequest(customMsgBO.getPublicNumberCode(), templateUrl, "POST", sendDataObject.toJSONString());
  209 + if (result != null && result.getIntValue("errcode") == 0) {
  210 + successList.add(openId);
  211 + }
  212 + sendWechatMsgLog.info("customMsgSend with sceneKey {},openId is {},result {}, content is {}", customMsgBO.getSendKey(), openId, result, sendDataObject.toJSONString());
  213 + });
  214 + return successList;
  215 + }
  216 +
119 } 217 }