提交 9068bf4c 作者: 沈振路

刷新既不在“全量用户”也不在“小程序用户”标签下的用户标签刷为“全量用户”的测试类接口

上级 01c33dd5
......@@ -3,3 +3,5 @@
/customer-service/
/test-customer-service/
/.idea/
/.cursor
/.claude
\ No newline at end of file
......@@ -8,17 +8,25 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yaoyaozw.customer.components.CustomerServiceCommonAsyncComponent;
import com.yaoyaozw.customer.constants.CustomerMaterialConstant;
import com.yaoyaozw.customer.dto.integration.IntegrationRequestDTO;
import com.yaoyaozw.customer.dto.wechat.BatchTaggingDTO;
import com.yaoyaozw.customer.entity.CustomerFollowReply;
import com.yaoyaozw.customer.entity.CustomerGraphics;
import com.yaoyaozw.customer.entity.CrowdPackage;
import com.yaoyaozw.customer.entity.RegisterUserEntity;
import com.yaoyaozw.customer.mapper.KanbanCommonMapper;
import com.yaoyaozw.customer.mapper.WeChatTagCleanMapper;
import com.yaoyaozw.customer.publisher.AcquisitionExceptionEventPublisher;
import com.yaoyaozw.customer.service.*;
import com.yaoyaozw.customer.service.wechat.entity.WeChatResponseEntity;
import com.yaoyaozw.customer.service.wechat.service.WeChatService;
import com.yaoyaozw.customer.utils.WeChatTagRest;
import com.yaoyaozw.customer.vo.AuthInfoVO;
import com.yaoyaozw.customer.vo.customer.CrowdPackageUserVO;
import com.yaoyaozw.customer.vo.referral.ReferralEntityVo;
import com.yaoyaozw.customer.vo.wechat.AllUserListVO;
import com.yaoyaozw.customer.vo.wechat.PendingAuthAccountVO;
import com.yaoyaozw.customer.vo.wechat.TagUserListVO;
import com.yaoyaozw.customer.vo.wechat.AuthorizerFullUserTagVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
......@@ -30,7 +38,17 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
......@@ -334,4 +352,174 @@ public class TestController {
}
}
private static final String TAG_NAME_ALL_USER = "全量用户";
private static final String TAG_NAME_MINI_PROGRAM_USER = "小程序用户";
private static final int BATCH_TAGGING_SIZE = 50;
@Resource
private WeChatTagCleanMapper weChatTagCleanMapper;
@Resource
private WeChatTagRest weChatTagRest;
@GetMapping("/refreshFullUserTag")
public void refreshFullUserTag() {
log.info("开始为公众号用户打全量用户标签...");
List<PendingAuthAccountVO> accounts = weChatTagCleanMapper.getPendingAuthAccounts();
log.info("待处理公众号数量: {}", accounts.size());
Map<Long, Integer> fullUserWechatTagIdByAuthorizer = Collections.emptyMap();
Map<Long, Integer> miniProgramUserWechatTagIdByAuthorizer = Collections.emptyMap();
List<Long> authorizerIdsToQuery = accounts.stream()
.map(PendingAuthAccountVO::getAuthId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
if (!authorizerIdsToQuery.isEmpty()) {
List<AuthorizerFullUserTagVO> dbTagRows =
weChatTagCleanMapper.listFullUserWechatTagByAuthorizers(
authorizerIdsToQuery,
Arrays.asList(TAG_NAME_ALL_USER, TAG_NAME_MINI_PROGRAM_USER));
Map<String, Map<Long, Integer>> tagMapByTagName = dbTagRows.stream()
.filter(row -> row.getAuthorizerId() != null
&& row.getWechatTagId() != null
&& row.getTagName() != null)
.collect(Collectors.groupingBy(
AuthorizerFullUserTagVO::getTagName,
Collectors.toMap(
AuthorizerFullUserTagVO::getAuthorizerId,
AuthorizerFullUserTagVO::getWechatTagId,
(left, right) -> left)));
fullUserWechatTagIdByAuthorizer =
tagMapByTagName.getOrDefault(TAG_NAME_ALL_USER, Collections.emptyMap());
miniProgramUserWechatTagIdByAuthorizer =
tagMapByTagName.getOrDefault(TAG_NAME_MINI_PROGRAM_USER, Collections.emptyMap());
log.info("从库中载入标签映射: 「{}」{} 条, 「{}」{} 条 (授权方候选数: {})",
TAG_NAME_ALL_USER,
fullUserWechatTagIdByAuthorizer.size(),
TAG_NAME_MINI_PROGRAM_USER,
miniProgramUserWechatTagIdByAuthorizer.size(),
authorizerIdsToQuery.size());
}
int idx = 1;
for (PendingAuthAccountVO account : accounts) {
String appid = account.getAppid();
String nickName = account.getNickName();
String accessToken = account.getAuthorizerAccessToken();
Long authId = account.getAuthId();
if (StringUtils.isAnyBlank(appid, accessToken)) {
log.warn("公众号信息缺失, 跳过: appid={}, nickName={}", appid, nickName);
continue;
}
if (authId == null) {
log.warn("公众号缺少 authId(authorizer_id), 跳过: appid={}, nickName={}", appid, nickName);
continue;
}
log.info("刷新公众号用户标签进度:{} / {},当前公众号:{}", idx++, accounts.size(), account.getNickName());
try {
Integer fullUserTagId = fullUserWechatTagIdByAuthorizer.get(authId);
if (fullUserTagId == null) {
log.warn("公众号 {}({}) 在库中未找到 [{}] 标签记录, 跳过", nickName, appid,
TAG_NAME_ALL_USER);
continue;
}
Integer miniProgramUserTagId = miniProgramUserWechatTagIdByAuthorizer.get(authId);
Set<String> fullUserTagUserIds = fetchAllOpenidsByTag(accessToken, fullUserTagId);
Set<String> miniProgramUserTagUserIds = miniProgramUserTagId != null
? fetchAllOpenidsByTag(accessToken, miniProgramUserTagId)
: Collections.emptySet();
Set<String> taggedUnionOpenids = new HashSet<>(fullUserTagUserIds);
taggedUnionOpenids.addAll(miniProgramUserTagUserIds);
List<String> allUserIds = fetchAllOpenids(accessToken);
List<String> needTagOpenidList = allUserIds.stream()
.filter(openid -> !taggedUnionOpenids.contains(openid))
.collect(Collectors.toList());
if (needTagOpenidList.isEmpty()) {
continue;
}
List<List<String>> partitions = Lists.partition(needTagOpenidList, BATCH_TAGGING_SIZE);
int updatedCount = 0;
for (List<String> partition : partitions) {
BatchTaggingDTO body = new BatchTaggingDTO();
body.setOpenid_list(partition);
body.setTagid(fullUserTagId);
WeChatResponseEntity resp = weChatTagRest.batchTagging(accessToken, body);
if (resp != null && (resp.getErrcode() == null || resp.getErrcode() == 0)) {
updatedCount += partition.size();
} else {
log.warn("公众号 {}({}) 批量打标签失败: errcode={}, errmsg={}",
nickName, appid,
resp == null ? null : resp.getErrcode(),
resp == null ? null : resp.getErrmsg());
}
}
log.info("公众号 appid={}, 名称={}, 此次更新标签的用户数={}, 目标标签id={}",
appid, nickName, updatedCount, fullUserTagId);
} catch (Exception e) {
log.error("处理公众号 {}({}) 时发生错误: {}", nickName, appid, e.getMessage(), e);
}
}
log.info("公众号用户全量用户标签清洗完成");
}
private Set<String> fetchAllOpenidsByTag(String accessToken, Integer tagid) {
Set<String> result = new HashSet<>();
String nextOpenid = null;
while (true) {
TagUserListVO vo = weChatTagRest.getTagUserList(accessToken, tagid, nextOpenid);
if (vo == null || vo.getData() == null || vo.getData().getOpenid() == null
|| vo.getData().getOpenid().isEmpty()) {
break;
}
result.addAll(vo.getData().getOpenid());
String returnedNext = vo.getNext_openid();
if (StrUtil.isBlank(returnedNext)) {
break;
}
nextOpenid = returnedNext;
}
return result;
}
private List<String> fetchAllOpenids(String accessToken) {
List<String> result = new ArrayList<>();
String nextOpenid = null;
while (true) {
AllUserListVO vo = weChatTagRest.getAllUserList(accessToken, nextOpenid);
if (vo == null || vo.getData() == null || vo.getData().getOpenid() == null
|| vo.getData().getOpenid().isEmpty()) {
break;
}
result.addAll(vo.getData().getOpenid());
String returnedNext = vo.getNext_openid();
if (StrUtil.isBlank(returnedNext)) {
break;
}
nextOpenid = returnedNext;
}
return result;
}
}
package com.yaoyaozw.customer.dto.wechat;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 批量更新用户标签请求体
*/
@Data
public class BatchTaggingDTO implements Serializable {
private static final long serialVersionUID = 1L;
private List<String> openid_list;
private Integer tagid;
}
package com.yaoyaozw.customer.dto.wechat;
import lombok.Data;
import java.io.Serializable;
/**
* 查询标签下用户列表请求体
*/
@Data
public class TagUserQueryDTO implements Serializable {
private static final long serialVersionUID = 1L;
private Integer tagid;
private String next_openid;
}
package com.yaoyaozw.customer.mapper;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.yaoyaozw.customer.vo.wechat.AuthorizerFullUserTagVO;
import com.yaoyaozw.customer.vo.wechat.PendingAuthAccountVO;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* 公众号用户标签清洗
*/
@Repository
@DS("material")
public interface WeChatTagCleanMapper {
/**
* 查询待处理的公众号列表
*/
List<PendingAuthAccountVO> getPendingAuthAccounts();
/**
* 批量查询指定授权方下多个标签在微信侧的 id(循环外一次查询)
*
* @param authorizerIds 授权方 id 列表(与 PendingAuthAccountVO#getAuthId 一致)
* @param tagNames 标签名称列表
* @return 每个授权方每条标签对应一条可用的 wechat_tag 记录
*/
List<AuthorizerFullUserTagVO> listFullUserWechatTagByAuthorizers(
@Param("authorizerIds") List<Long> authorizerIds,
@Param("tagNames") List<String> tagNames);
}
package com.yaoyaozw.customer.utils;
import cn.hutool.json.JSONUtil;
import com.yaoyaozw.customer.dto.wechat.BatchTaggingDTO;
import com.yaoyaozw.customer.dto.wechat.TagUserQueryDTO;
import com.yaoyaozw.customer.service.wechat.entity.WeChatResponseEntity;
import com.yaoyaozw.customer.vo.wechat.AllUserListVO;
import com.yaoyaozw.customer.vo.wechat.TagUserListVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 公众号标签相关接口封装
*/
@Slf4j
@Component
public class WeChatTagRest {
private static final String TAG_USER_LIST_URL = "https://api.weixin.qq.com/cgi-bin/user/tag/get";
private static final String ALL_USER_LIST_URL = "https://api.weixin.qq.com/cgi-bin/user/get";
private static final String BATCH_TAGGING_URL = "https://api.weixin.qq.com/cgi-bin/tags/members/batchtagging";
/**
* 查询标签下用户列表
*/
public TagUserListVO getTagUserList(String accessToken, Integer tagid, String nextOpenid) {
Map<String, String> params = new HashMap<>(2);
params.put("access_token", accessToken);
TagUserQueryDTO body = new TagUserQueryDTO();
body.setTagid(tagid);
body.setNext_openid(nextOpenid == null ? "" : nextOpenid);
String result = HttpClientUtil.doPostJson(TAG_USER_LIST_URL, JSONUtil.toJsonStr(body), params);
return JSONUtil.toBean(result, TagUserListVO.class);
}
/**
* 查询公众号下所有用户列表
*/
public AllUserListVO getAllUserList(String accessToken, String nextOpenid) {
Map<String, String> params = new HashMap<>(2);
params.put("access_token", accessToken);
if (nextOpenid != null && !nextOpenid.isEmpty()) {
params.put("next_openid", nextOpenid);
}
String result = HttpClientUtil.doGet(ALL_USER_LIST_URL, params);
return JSONUtil.toBean(result, AllUserListVO.class);
}
/**
* 批量更新用户标签
*/
public WeChatResponseEntity batchTagging(String accessToken, BatchTaggingDTO body) {
Map<String, String> params = new HashMap<>(2);
params.put("access_token", accessToken);
String result = HttpClientUtil.doPostJson(BATCH_TAGGING_URL, JSONUtil.toJsonStr(body), params);
return JSONUtil.toBean(result, WeChatResponseEntity.class);
}
}
package com.yaoyaozw.customer.vo.wechat;
import lombok.Data;
import java.io.Serializable;
/**
* 查询公众号下所有用户列表响应
*/
@Data
public class AllUserListVO implements Serializable {
private static final long serialVersionUID = 1L;
private Integer total;
private Integer count;
private OpenidDataVO data;
private String next_openid;
private Integer errcode;
private String errmsg;
}
package com.yaoyaozw.customer.vo.wechat;
import lombok.Data;
import java.io.Serializable;
/**
* 公众号「全量用户」标签在库中的映射,用于批量打标签时获取微信标签 id
*
* @author wgh
* @since 2026-05-18
*/
@Data
public class AuthorizerFullUserTagVO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 授权方主体 id(与 pending 账号列表中的 authId 一致)
*/
private Long authorizerId;
/**
* 微信侧标签 id,对应接口 batch_tagging 的 tagid
*/
private Integer wechatTagId;
/**
* 标签名称
*/
private String tagName;
}
package com.yaoyaozw.customer.vo.wechat;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* openid数据
*/
@Data
public class OpenidDataVO implements Serializable {
private static final long serialVersionUID = 1L;
private List<String> openid;
}
package com.yaoyaozw.customer.vo.wechat;
import lombok.Data;
import java.io.Serializable;
/**
* 待处理公众号信息
*/
@Data
public class PendingAuthAccountVO implements Serializable {
private static final long serialVersionUID = 1L;
private Long authId;
private String appid;
private String nickName;
private String authorizerAccessToken;
}
package com.yaoyaozw.customer.vo.wechat;
import lombok.Data;
import java.io.Serializable;
/**
* 查询标签下用户列表响应
*/
@Data
public class TagUserListVO implements Serializable {
private static final long serialVersionUID = 1L;
private Integer count;
private OpenidDataVO data;
private String next_openid;
private Integer errcode;
private String errmsg;
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.yaoyaozw.customer.mapper.WeChatTagCleanMapper">
<select id="getPendingAuthAccounts" resultType="com.yaoyaozw.customer.vo.wechat.PendingAuthAccountVO">
SELECT
ai.id AS authId,
ai.appid AS appid,
ai.nick_name AS nickName,
atk.authorizer_access_token AS authorizerAccessToken
FROM authorizer_info ai
LEFT JOIN authorizer_token atk ON atk.authorizer_appid = ai.appid
WHERE ai.store_type IN (
'YANG_GUANG','TOMATO','YUE_WEN_1'
)
AND atk.create_time >= DATE_ADD(CURRENT_DATE,INTERVAL -1 DAY)
</select>
<select id="listFullUserWechatTagByAuthorizers"
resultType="com.yaoyaozw.customer.vo.wechat.AuthorizerFullUserTagVO">
SELECT
authorizer_id AS authorizerId,
wechat_tag_id AS wechatTagId,
tag_name AS tagName
FROM
`wechat_tag`
WHERE
tag_name IN
<foreach collection="tagNames" item="name" open="(" separator="," close=")">
#{name}
</foreach>
AND is_deleted = 0
AND authorizer_id IN
<foreach collection="authorizerIds" item="id" open="(" separator="," close=")">
#{id}
</foreach>
</select>
</mapper>
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论