提交 01d603b2 作者: gh

定时客服消息使用MQ

上级 e46b30b8
......@@ -63,6 +63,8 @@ public class CustomerCommonConstant {
public final static Integer SEND_STATUS_ACTIVE = 9;
public final static Integer SEND_STATUS_FINISHED = 10;
public final static Integer SEND_STATUS_LINK_ERROR = 3;
public final static Integer SEND_STATUS_SENT_FAIL = 4;
......
......@@ -17,13 +17,22 @@ public class RabbitCommonNameConstant {
public static final String YG_LIMIT_TTL_QUEUE = "YG_LIMIT_TTL_QUEUE";
public static final String YG_LIMIT_TTL_ROUTE_KEY = "YG_LIMIT";
/**
* 2、运营系统交换机
*/
public static final String CUSTOMER_SERVICE_EXCHANGE = "CUSTOMER_SERVICE_EXCHANGE";
/**
* 客服消息处理队列
*/
public static final String CUSTOMER_MESSAGE_ROUTE_KEY="CUSTOMER_MESSAGE_HANDLE";
public static final String CUSTOMER_MESSAGE_QUEUE="CUSTOMER_MESSAGE_HANDLE_QUEUE";
/**
* 2、死信交换机 - 死信队列 - routeKey
* 3、死信交换机 - 死信队列 - routeKey
*/
public static final String DEATH_EXCHANGE = "DEATH_EXCHANGE";
......@@ -33,6 +42,11 @@ public class RabbitCommonNameConstant {
public static final String YG_LIMIT_DEATH_QUEUE = "YG_LIMIT_DEATH_QUEUE";
public static final String YG_LIMIT_DEATH_ROUTE_KEY = "YG_LIMIT_DEATH";
/**
* 客服消息死信队列
*/
public static final String CUSTOMER_MESSAGE_DEATH_QUEUE="CUSTOMER_MESSAGE_DEATH_QUEUE";
public static final String CUSTOMER_MESSAGE_DEATH_ROUTE_KEY="CUSTOMER_MESSAGE_DEATH";
}
package com.yaoyaozw.customer.dto.customer;
import com.yaoyaozw.customer.service.wechat.entity.customerRequest.WeChatCustomerRequestEntity;
import lombok.Data;
import java.io.Serializable;
/**
* @author wgh
* @date 2022/11/10 14:52
*/
@Data
public class CustomerMessageTransferDTO implements Serializable {
/**
* 客服发送请求体
*/
private WeChatCustomerRequestEntity customerRequestEntity;
/**
* 素材id
*/
private Long customerGraphicsId;
/**
* appid
*/
private String appid;
/**
* openid
*/
private String openid;
}
package com.yaoyaozw.customer.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yaoyaozw.customer.dto.integration.IntegrationRequestDTO;
import com.yaoyaozw.customer.entity.AuthorizerToken;
import com.yaoyaozw.customer.service.AuthorizerTokenService;
import com.yaoyaozw.customer.service.CustomerGraphicsDelayService;
import com.yaoyaozw.customer.service.wechat.service.WeChatService;
import com.yaoyaozw.customer.utils.YYZWDateUtil;
import com.yaoyaozw.customer.vo.customer.CustomerDelayItemVO;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
......@@ -31,13 +29,15 @@ import com.yaoyaozw.customer.service.CustomerDelayPublishService;
@Service
public class CustomerDelayPublishServiceImpl extends ServiceImpl<CustomerDelayPublishMapper, CustomerDelayPublish> implements CustomerDelayPublishService{
private final static String ACCESS_TOKEN_REDIS_KEY = "AUTH_ACCESS_TOKEN";
private final static Logger localLog = LoggerFactory.getLogger(CustomerDelayPublishServiceImpl.class);
@Autowired
private CustomerGraphicsDelayService customerGraphicsDelayService;
private RedisTemplate<String,Object> redisTemplate;
@Autowired
private AuthorizerTokenService authorizerTokenService;
private CustomerGraphicsDelayService customerGraphicsDelayService;
@Autowired
private WeChatService weChatService;
......@@ -71,11 +71,9 @@ public class CustomerDelayPublishServiceImpl extends ServiceImpl<CustomerDelayPu
String appid = userEntry.getKey();
//获取该号token
AuthorizerToken authorizerToken = authorizerTokenService.findTokenByAppid(appid);
String token = authorizerToken.getAuthorizerAppid();
Object tokenObject = redisTemplate.opsForHash().get(ACCESS_TOKEN_REDIS_KEY, appid);
//没token过滤
if (token==null||"".equals(token)){ continue; }
if (tokenObject==null|| StringUtils.isBlank(tokenObject.toString())){ continue; }
//该号下延时客服
Map<Integer,List<CustomerDelayItemVO>> delaySortMap = customerMap.get(appid);
//所有的用户
......@@ -87,14 +85,19 @@ public class CustomerDelayPublishServiceImpl extends ServiceImpl<CustomerDelayPu
//将所有待用延时客服收集
for (CustomerDelayPublish userPublish : userList) {
//发送延时客服
futureList.add(weChatService.sendCustomerDelayMessage(authorizerToken, userPublish, delaySortMap,needUpdateCustomerDelay)) ;
futureList.add(weChatService.sendCustomerDelayMessage(appid,tokenObject.toString(), userPublish, delaySortMap,needUpdateCustomerDelay)) ;
}
}
}
List<CustomerDelayPublish> registerUserEntities = new ArrayList<>();
//更新下次延时排期
for (Future<CustomerDelayPublish> delayPublishFuture : futureList) {
try {
registerUserEntities.add(delayPublishFuture.get()) ;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
if (!registerUserEntities.isEmpty()){
......
......@@ -30,6 +30,7 @@ import com.yaoyaozw.customer.vo.referral.ReferralEntityVo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
......@@ -57,6 +58,7 @@ public class CustomerGraphicsServiceImpl extends ServiceImpl<CustomerGraphicsMap
private final static Logger LOCAL_LOG = LoggerFactory.getLogger(CustomerGraphicsServiceImpl.class);
@Autowired
private CrowdPackageConditionMatchService matchService;
@Autowired
......@@ -69,8 +71,6 @@ public class CustomerGraphicsServiceImpl extends ServiceImpl<CustomerGraphicsMap
private WeChatService weChatService;
@Autowired
private RegisterUserEntityService registerUserEntityService;
@Autowired
private AuthorizerTokenService authorizerTokenService;
@Autowired
private CrowdPackageConditionService conditionService;
......@@ -82,6 +82,7 @@ public class CustomerGraphicsServiceImpl extends ServiceImpl<CustomerGraphicsMap
@Autowired
private CrowdPackageService crowdPackageService;
@Override
public BaseResult insertCustomerMessage(CustomerMessageSaveDTO saveDto) {
......@@ -213,28 +214,32 @@ public class CustomerGraphicsServiceImpl extends ServiceImpl<CustomerGraphicsMap
return new BaseResult().error("更新失败");
}
/**
* 发送客服消息
* @param integrationRequestDTO 请求DTO
*/
@Override
public void sendCustomerMessage(IntegrationRequestDTO integrationRequestDTO) {
Date requestDate = integrationRequestDTO.getRequestDate();
long currentTimestamp = requestDate.getTime();
//获取待发送的开启客服
List<CustomerGraphics> customerGraphicsList = list(new QueryWrapper<CustomerGraphics>().eq(CustomerGraphics.COL_POST_TIME, requestDate).eq(CustomerGraphics.COL_SEND_STATUS,CustomerCommonConstant.SEND_STATUS_ACTIVE));
List<CustomerGraphics> customerGraphicsList = list(new QueryWrapper<CustomerGraphics>().eq(CustomerGraphics.COL_POST_TIME, requestDate).eq(CustomerGraphics.COL_SEND_STATUS,9));
if(!customerGraphicsList.isEmpty()){
List<CrowdPackage> crowdPackageList = crowdPackageService.list();
Map<Long, CrowdPackage> crowdPackageMap = crowdPackageList.stream().collect(Collectors.toMap(CrowdPackage::getId, a -> a));
if(!customerGraphicsList.isEmpty()){
LOCAL_LOG.info("start sendCustomerMessage");
for (CustomerGraphics customerGraphics : customerGraphicsList) {
//人群包id
Long packId = customerGraphics.getPackId();
CrowdPackage crowdPackage = crowdPackageMap.get(packId);
//活跃时间限制窗口
Long activeTimeMax = crowdPackage.getActiveTimeMax();
Long activeTimeMin = crowdPackage.getActiveTimeMin();
......@@ -243,8 +248,9 @@ public class CustomerGraphicsServiceImpl extends ServiceImpl<CustomerGraphicsMap
//1.活跃时间判断,2.用户去重
Map<String, List<CrowdPackageUserVO>> appidUserMap = userList.stream()
.filter(a->a.getLastActive()!=null&&(activeTimeMin==null||(currentTimestamp-a.getLastActive().getTime())>=activeTimeMin)&&(activeTimeMax==null||(currentTimestamp-a.getLastActive().getTime()<activeTimeMax)))
.filter(a->a.getLastActive()!=null&&(activeTimeMin==null||((currentTimestamp-a.getLastActive().getTime())>=activeTimeMin))&&(activeTimeMax==null||((currentTimestamp-a.getLastActive().getTime())<activeTimeMax)))
.collect(Collectors.groupingBy(CrowdPackageUserVO::getAppId, Collectors.collectingAndThen(toCollection(() -> new TreeSet<>(Comparator.comparing(CrowdPackageUserVO::getOpenId))), ArrayList::new)));
//根据客服id找不同公众号的链接,并按appId分组
List<ReferralEntityVo> referralList = referralEntityService.findReferralByCustomerGraphicsId(customerGraphics.getId());
......@@ -253,21 +259,18 @@ public class CustomerGraphicsServiceImpl extends ServiceImpl<CustomerGraphicsMap
for (Map.Entry<String, List<ReferralEntityVo>> referralEntry : referralMap.entrySet()) {
String appid = referralEntry.getKey();
//获取token
AuthorizerToken authorizerToken = authorizerTokenService.findTokenByAppid(appid);
if (authorizerToken!=null&&authorizerToken.getAuthorizerAccessToken()!=null){
//获取该号的链接实体
List<ReferralEntityVo> referralEntityVo = referralMap.get(appid);
//获取该号的openid
List<CrowdPackageUserVO> packageUserVo = appidUserMap.get(appid);
if (referralEntityVo!=null&&!referralEntityVo.isEmpty()){
if (referralEntityVo!=null&&!referralEntityVo.isEmpty()&&packageUserVo!=null&&!packageUserVo.isEmpty()){
weChatService.sendCustomerMessage(appid,authorizerToken,customerGraphics,packageUserVo,referralEntityVo);
}
weChatService.sendCustomerMessage(appid,customerGraphics,packageUserVo,referralEntityVo);
}
}
//客服状态修改
customerGraphics.setSendStatus(CustomerCommonConstant.SEND_STATUS_FINISHED);
}
updateBatchById(customerGraphicsList);
LOCAL_LOG.info("end sendCustomerMessage");
......
......@@ -15,7 +15,7 @@ public interface WeChatService {
/**
* 发送延时客服消息
*/
Future<CustomerDelayPublish> sendCustomerDelayMessage( AuthorizerToken authorizerToken, CustomerDelayPublish user,
Future<CustomerDelayPublish> sendCustomerDelayMessage( String appid,String token, CustomerDelayPublish user,
Map<Integer,List<CustomerDelayItemVO>>delaySortMap,
Set<CustomerDelayItemVO>needUpdateVoList);
......@@ -23,7 +23,7 @@ public interface WeChatService {
/**
* 定时发送客服消息
*/
void sendCustomerMessage(String appid, AuthorizerToken authorizerToken, CustomerGraphics customerGraphics,
void sendCustomerMessage(String appid, CustomerGraphics customerGraphics,
List<CrowdPackageUserVO>openidList, List<ReferralEntityVo> referralEntityVo);
}
package com.yaoyaozw.customer.service.wechat.service;
import com.alibaba.fastjson.JSONObject;
import com.yaoyaozw.customer.constants.CustomerCommonConstant;
import com.yaoyaozw.customer.constants.RabbitCommonNameConstant;
import com.yaoyaozw.customer.dto.customer.CustomerMessageTransferDTO;
import com.yaoyaozw.customer.entity.AuthorizerToken;
import com.yaoyaozw.customer.entity.CrowdPackage;
import com.yaoyaozw.customer.entity.CustomerDelayPublish;
import com.yaoyaozw.customer.entity.CustomerGraphics;
import com.yaoyaozw.customer.service.AuthorizerTokenService;
import com.yaoyaozw.customer.service.ReferralEntityService;
import com.yaoyaozw.customer.service.impl.CustomerDelayPublishServiceImpl;
import com.yaoyaozw.customer.service.wechat.entity.WeChatResponseEntity;
import com.yaoyaozw.customer.service.wechat.entity.customerRequest.CustomerNewsArticleItem;
import com.yaoyaozw.customer.service.wechat.entity.customerRequest.CustomerNewsItem;
import com.yaoyaozw.customer.service.wechat.entity.customerRequest.WeChatCustomerRequestEntity;
import com.yaoyaozw.customer.vo.customer.CrowdPackageUserVO;
import com.yaoyaozw.customer.vo.customer.CustomerDelayItemVO;
import com.yaoyaozw.customer.vo.referral.ReferralEntityVo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.web.util.UriComponentsBuilder;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
......@@ -33,8 +28,6 @@ import java.util.stream.Collectors;
@Service
public class WeChatServiceImpl implements WeChatService{
private final static Logger localLog = LoggerFactory.getLogger(WeChatServiceImpl.class);
private String customerPath="https://api.weixin.qq.com/cgi-bin/message/custom/send";
private static final String CUSTOMER_TEXT="text";
......@@ -49,20 +42,25 @@ public class WeChatServiceImpl implements WeChatService{
private static final Integer FORBID_TIME=6;
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.CallerRunsPolicy());
private static final String ACCESS_TOKEN="access_token";
private final static Object CUSTOMER_LOCK=new Object();
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),new ThreadPoolExecutor.CallerRunsPolicy());
private final static String ACCESS_TOKEN_REDIS_KEY = "AUTH_ACCESS_TOKEN";
@Autowired
private AuthorizerTokenService authorizerTokenService;
private RedisTemplate<String,Object> redisTemplate;
@Autowired
private WeChatRestService weChatRestService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Async("delayCustomerExecutor")
@Override
public Future<CustomerDelayPublish> sendCustomerDelayMessage( AuthorizerToken authorizerToken, CustomerDelayPublish user,
public Future<CustomerDelayPublish> sendCustomerDelayMessage( String appid,String token , CustomerDelayPublish user,
Map<Integer,List<CustomerDelayItemVO>> delaySortMap,
Set<CustomerDelayItemVO> needUpdateVoList) {
//获取当前小时数
......@@ -71,7 +69,7 @@ public class WeChatServiceImpl implements WeChatService{
//早上0-6点不发
if (i>=FORBID_TIME){
//token
UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromHttpUrl(customerPath).queryParam("access_token",authorizerToken.getAuthorizerAccessToken());
UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromHttpUrl(customerPath).queryParam(ACCESS_TOKEN,token);
//找到延时序列
List<CustomerDelayItemVO> sendCustomerDelays = delaySortMap.get(user.getCustomerSort());
......@@ -90,22 +88,27 @@ public class WeChatServiceImpl implements WeChatService{
if (customerRequest!=null){
customerRequest.setTouser(user.getOpenId());
//System.err.println(user.getOpenId());
try {
WeChatResponseEntity response=(WeChatResponseEntity)weChatRestService.httpPostRequest(uriComponentsBuilder, customerRequest,WeChatResponseEntity.class);
//System.err.println(response);
if(SUCCESS_CODE.equals(response.getErrcode())){
//发送成功计数
sendCustomerDelay.updateSendNum();
}else if(EXPIRED_CODE.equals(response.getErrcode()) ){
//token过期重新取一下
AuthorizerToken updateToken = authorizerTokenService.findTokenByAppid(authorizerToken.getAuthorizerAppid());
return sendCustomerDelayMessage(updateToken,user,delaySortMap,needUpdateVoList);
Object tokenObject = redisTemplate.opsForHash().get(ACCESS_TOKEN_REDIS_KEY, appid);
if (tokenObject!=null){
return sendCustomerDelayMessage(appid,tokenObject.toString(),user,delaySortMap,needUpdateVoList);
}
//token过期重新取一下
//AuthorizerToken updateToken = authorizerTokenService.findTokenByAppid(authorizerToken.getAuthorizerAppid());
}
} catch (Exception e) {
localLog.error("发生异常: {}, 位置: {}", e.getMessage(), e.getStackTrace()[0]);
e.printStackTrace();
}
}
}
......@@ -131,10 +134,8 @@ public class WeChatServiceImpl implements WeChatService{
@Override
public void sendCustomerMessage(String appid, AuthorizerToken authorizerToken, CustomerGraphics customerGraphics, List<CrowdPackageUserVO> openidList,
public void sendCustomerMessage(String appid, CustomerGraphics customerGraphics, List<CrowdPackageUserVO> openidList,
List<ReferralEntityVo> referralEntityVo) {
//token
UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromHttpUrl(customerPath).queryParam("access_token",authorizerToken.getAuthorizerAccessToken());
//构建请求参数(文本/图文)
WeChatCustomerRequestEntity customerRequest = buildCustomerRequest(customerGraphics,referralEntityVo);
......@@ -145,41 +146,26 @@ public class WeChatServiceImpl implements WeChatService{
for (int i = 0; i < openidList.size(); i+=THREAD_SIZE) {
List<CrowdPackageUserVO> subOpenidList = openidList.subList(i, Math.min(i + THREAD_SIZE, openidList.size()));
//TODO:配置多线程+测试+记录发送量
EXECUTOR.execute(()->{
for (CrowdPackageUserVO crowdPackageUserVO : subOpenidList) {
//遍历人群包,每个用户一发
customerRequest.setTouser(crowdPackageUserVO.getOpenId());
try {
WeChatResponseEntity response=(WeChatResponseEntity)weChatRestService.httpPostRequest(uriComponentsBuilder, customerRequest,WeChatResponseEntity.class);
CustomerMessageTransferDTO customerMessageTransferDTO = new CustomerMessageTransferDTO();
customerMessageTransferDTO.setAppid(appid);
customerMessageTransferDTO.setCustomerGraphicsId(customerGraphics.getId());
customerMessageTransferDTO.setOpenid(crowdPackageUserVO.getOpenId());
customerMessageTransferDTO.setCustomerRequestEntity(customerRequest);
rabbitTemplate.convertAndSend(RabbitCommonNameConstant.CUSTOMER_SERVICE_EXCHANGE, RabbitCommonNameConstant.CUSTOMER_MESSAGE_ROUTE_KEY,JSONObject.toJSONString(customerMessageTransferDTO));
if(SUCCESS_CODE.equals(response.getErrcode())){
//线程锁
synchronized (CUSTOMER_LOCK){
//发送成功计数
customerGraphics.updateCount();
}
}else if(EXPIRED_CODE.equals(response.getErrcode()) ){
Thread.sleep(500);
//token过期重新取一下
synchronized (uriComponentsBuilder){
AuthorizerToken updateToken = authorizerTokenService.findTokenByAppid(appid);
uriComponentsBuilder.replaceQueryParam("access_token",updateToken.getAuthorizerAccessToken());
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
......@@ -244,5 +230,4 @@ public class WeChatServiceImpl implements WeChatService{
return null;
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论