提交 7df1b8a2 作者: 沈振路

刷公众号用户标签的测试类接口改为多线程

上级 f430e24b
...@@ -51,6 +51,7 @@ import java.util.Objects; ...@@ -51,6 +51,7 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
...@@ -355,6 +356,11 @@ public class TestController { ...@@ -355,6 +356,11 @@ public class TestController {
private static final String TAG_NAME_ALL_USER = "全量用户"; private static final String TAG_NAME_ALL_USER = "全量用户";
private static final String TAG_NAME_MINI_PROGRAM_USER = "小程序用户"; private static final String TAG_NAME_MINI_PROGRAM_USER = "小程序用户";
private static final int BATCH_TAGGING_SIZE = 50; private static final int BATCH_TAGGING_SIZE = 50;
private static final int ACCOUNT_BATCH_SIZE = 100;
private static final ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(
3, 3, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
@Resource @Resource
private WeChatTagCleanMapper weChatTagCleanMapper; private WeChatTagCleanMapper weChatTagCleanMapper;
...@@ -365,18 +371,14 @@ public class TestController { ...@@ -365,18 +371,14 @@ public class TestController {
public void refreshFullUserTag() { public void refreshFullUserTag() {
log.info("开始为公众号用户打全量用户标签..."); log.info("开始为公众号用户打全量用户标签...");
int startIdx = 0;
Object val = redisTemplate.opsForValue().get("operate:customer:refreshWechatTag:startIdx");
if (val != null) {
startIdx = Integer.parseInt(String.valueOf(val));
}
List<PendingAuthAccountVO> accounts = weChatTagCleanMapper.getPendingAuthAccounts(); List<PendingAuthAccountVO> accounts = weChatTagCleanMapper.getPendingAuthAccounts();
accounts = accounts.subList(Math.min(startIdx, accounts.size()), accounts.size());
log.info("待处理公众号数量: {}", accounts.size()); log.info("待处理公众号数量: {}", accounts.size());
Map<Long, Integer> fullUserWechatTagIdByAuthorizer = Collections.emptyMap(); if (accounts.isEmpty()) {
Map<Long, Integer> miniProgramUserWechatTagIdByAuthorizer = Collections.emptyMap(); log.info("没有待处理的公众号");
return;
}
List<Long> authorizerIdsToQuery = accounts.stream() List<Long> authorizerIdsToQuery = accounts.stream()
.map(PendingAuthAccountVO::getAuthId) .map(PendingAuthAccountVO::getAuthId)
...@@ -384,106 +386,144 @@ public class TestController { ...@@ -384,106 +386,144 @@ public class TestController {
.distinct() .distinct()
.collect(Collectors.toList()); .collect(Collectors.toList());
if (!authorizerIdsToQuery.isEmpty()) { if (authorizerIdsToQuery.isEmpty()) {
List<AuthorizerFullUserTagVO> dbTagRows = return;
weChatTagCleanMapper.listFullUserWechatTagByAuthorizers(
authorizerIdsToQuery,
Arrays.asList(TAG_NAME_ALL_USER, TAG_NAME_MINI_PROGRAM_USER));
Map<String, Map<Long, Integer>> tagMapByTagName = dbTagRows.stream()
.filter(row -> row.getAuthorizerId() != null
&& row.getWechatTagId() != null
&& row.getTagName() != null)
.collect(Collectors.groupingBy(
AuthorizerFullUserTagVO::getTagName,
Collectors.toMap(
AuthorizerFullUserTagVO::getAuthorizerId,
AuthorizerFullUserTagVO::getWechatTagId,
(left, right) -> left)));
fullUserWechatTagIdByAuthorizer =
tagMapByTagName.getOrDefault(TAG_NAME_ALL_USER, Collections.emptyMap());
miniProgramUserWechatTagIdByAuthorizer =
tagMapByTagName.getOrDefault(TAG_NAME_MINI_PROGRAM_USER, Collections.emptyMap());
log.info("从库中载入标签映射: 「{}」{} 条, 「{}」{} 条 (授权方候选数: {})",
TAG_NAME_ALL_USER,
fullUserWechatTagIdByAuthorizer.size(),
TAG_NAME_MINI_PROGRAM_USER,
miniProgramUserWechatTagIdByAuthorizer.size(),
authorizerIdsToQuery.size());
} }
List<AuthorizerFullUserTagVO> dbTagRows =
weChatTagCleanMapper.listFullUserWechatTagByAuthorizers(
authorizerIdsToQuery,
Arrays.asList(TAG_NAME_ALL_USER, TAG_NAME_MINI_PROGRAM_USER));
Map<String, Map<Long, Integer>> tagMapByTagName = dbTagRows.stream()
.filter(row -> row.getAuthorizerId() != null
&& row.getWechatTagId() != null
&& row.getTagName() != null)
.collect(Collectors.groupingBy(
AuthorizerFullUserTagVO::getTagName,
Collectors.toMap(
AuthorizerFullUserTagVO::getAuthorizerId,
AuthorizerFullUserTagVO::getWechatTagId,
(left, right) -> left)));
Map<Long, Integer> fullUserWechatTagIdByAuthorizer =
tagMapByTagName.getOrDefault(TAG_NAME_ALL_USER, Collections.emptyMap());
Map<Long, Integer> miniProgramUserWechatTagIdByAuthorizer =
tagMapByTagName.getOrDefault(TAG_NAME_MINI_PROGRAM_USER, Collections.emptyMap());
log.info("从库中载入标签映射: 「{}」{} 条, 「{}」{} 条 (授权方候选数: {})",
TAG_NAME_ALL_USER,
fullUserWechatTagIdByAuthorizer.size(),
TAG_NAME_MINI_PROGRAM_USER,
miniProgramUserWechatTagIdByAuthorizer.size(),
authorizerIdsToQuery.size());
List<List<PendingAuthAccountVO>> accountBatches = Lists.partition(accounts, ACCOUNT_BATCH_SIZE);
int batchIdx = 0;
for (List<PendingAuthAccountVO> batch : accountBatches) {
batchIdx++;
log.info("开始处理第 {} / {} 组, 本组公众号数量: {}", batchIdx, accountBatches.size(), batch.size());
List<String> batchAppids = batch.stream()
.map(PendingAuthAccountVO::getAppid)
.filter(Objects::nonNull)
.collect(Collectors.toList());
Map<String, String> tokenMap;
if (!batchAppids.isEmpty()) {
List<PendingAuthAccountVO> tokenResults = weChatTagCleanMapper.getTokensByAppids(batchAppids);
tokenMap = tokenResults.stream()
.filter(t -> t.getAppid() != null && t.getAuthorizerAccessToken() != null)
.collect(Collectors.toMap(
PendingAuthAccountVO::getAppid,
PendingAuthAccountVO::getAuthorizerAccessToken,
(a, b) -> a));
} else {
tokenMap = Collections.emptyMap();
}
log.info("第 {} 组实时查询 token 结果: {} 条", batchIdx, tokenMap.size());
List<Future<?>> futures = new ArrayList<>();
for (PendingAuthAccountVO account : batch) {
futures.add(THREAD_POOL.submit(() -> {
String appid = account.getAppid();
String nickName = account.getNickName();
String accessToken = tokenMap.get(appid);
Long authId = account.getAuthId();
if (StringUtils.isAnyBlank(appid, accessToken)) {
log.warn("公众号信息缺失, 跳过: appid={}, nickName={}", appid, nickName);
return;
}
if (authId == null) {
log.warn("公众号缺少 authId(authorizer_id), 跳过: appid={}, nickName={}", appid, nickName);
return;
}
int idx = 1; try {
for (PendingAuthAccountVO account : accounts) { Integer fullUserTagId = fullUserWechatTagIdByAuthorizer.get(authId);
String appid = account.getAppid(); if (fullUserTagId == null) {
String nickName = account.getNickName(); log.warn("公众号 {}({}) 在库中未找到 [{}] 标签记录, 跳过", nickName, appid,
String accessToken = account.getAuthorizerAccessToken(); TAG_NAME_ALL_USER);
Long authId = account.getAuthId(); return;
}
if (StringUtils.isAnyBlank(appid, accessToken)) { Integer miniProgramUserTagId = miniProgramUserWechatTagIdByAuthorizer.get(authId);
log.warn("公众号信息缺失, 跳过: appid={}, nickName={}", appid, nickName);
continue;
}
if (authId == null) { Set<String> fullUserTagUserIds = fetchAllOpenidsByTag(accessToken, fullUserTagId, nickName);
log.warn("公众号缺少 authId(authorizer_id), 跳过: appid={}, nickName={}", appid, nickName); Set<String> miniProgramUserTagUserIds = miniProgramUserTagId != null
continue; ? fetchAllOpenidsByTag(accessToken, miniProgramUserTagId, nickName)
} : Collections.emptySet();
log.info("刷新公众号用户标签进度:{} / {},当前公众号:{}", idx++, accounts.size(), account.getNickName()); Set<String> taggedUnionOpenids = new HashSet<>(fullUserTagUserIds);
try { taggedUnionOpenids.addAll(miniProgramUserTagUserIds);
Integer fullUserTagId = fullUserWechatTagIdByAuthorizer.get(authId);
if (fullUserTagId == null) {
log.warn("公众号 {}({}) 在库中未找到 [{}] 标签记录, 跳过", nickName, appid,
TAG_NAME_ALL_USER);
continue;
}
Integer miniProgramUserTagId = miniProgramUserWechatTagIdByAuthorizer.get(authId); List<String> allUserIds = fetchAllOpenids(accessToken, nickName);
Set<String> fullUserTagUserIds = fetchAllOpenidsByTag(accessToken, fullUserTagId, nickName); List<String> needTagOpenidList = allUserIds.stream()
Set<String> miniProgramUserTagUserIds = miniProgramUserTagId != null .filter(openid -> !taggedUnionOpenids.contains(openid))
? fetchAllOpenidsByTag(accessToken, miniProgramUserTagId, nickName) .collect(Collectors.toList());
: Collections.emptySet();
Set<String> taggedUnionOpenids = new HashSet<>(fullUserTagUserIds); if (needTagOpenidList.isEmpty()) {
taggedUnionOpenids.addAll(miniProgramUserTagUserIds); return;
}
List<String> allUserIds = fetchAllOpenids(accessToken, nickName); List<List<String>> partitions = Lists.partition(needTagOpenidList, BATCH_TAGGING_SIZE);
int updatedCount = 0;
for (List<String> partition : partitions) {
BatchTaggingDTO body = new BatchTaggingDTO();
body.setOpenid_list(partition);
body.setTagid(fullUserTagId);
List<String> needTagOpenidList = allUserIds.stream() WeChatResponseEntity resp = weChatTagRest.batchTagging(accessToken, body);
.filter(openid -> !taggedUnionOpenids.contains(openid)) if (resp != null && (resp.getErrcode() == null || resp.getErrcode() == 0)) {
.collect(Collectors.toList()); updatedCount += partition.size();
} else {
log.warn("公众号 {}({}) 批量打标签失败: errcode={}, errmsg={}",
nickName, appid,
resp == null ? null : resp.getErrcode(),
resp == null ? null : resp.getErrmsg());
}
}
if (needTagOpenidList.isEmpty()) { log.info("公众号 appid={}, 名称={}, 此次更新标签的用户数={}, 目标标签id={}",
continue; appid, nickName, updatedCount, fullUserTagId);
}
List<List<String>> partitions = Lists.partition(needTagOpenidList, BATCH_TAGGING_SIZE); } catch (Exception e) {
int updatedCount = 0; log.error("处理公众号 {}({}) 时发生错误: {}", nickName, appid, e.getMessage(), e);
for (List<String> partition : partitions) {
BatchTaggingDTO body = new BatchTaggingDTO();
body.setOpenid_list(partition);
body.setTagid(fullUserTagId);
WeChatResponseEntity resp = weChatTagRest.batchTagging(accessToken, body);
if (resp != null && (resp.getErrcode() == null || resp.getErrcode() == 0)) {
updatedCount += partition.size();
} else {
log.warn("公众号 {}({}) 批量打标签失败: errcode={}, errmsg={}",
nickName, appid,
resp == null ? null : resp.getErrcode(),
resp == null ? null : resp.getErrmsg());
} }
} }));
}
log.info("公众号 appid={}, 名称={}, 此次更新标签的用户数={}, 目标标签id={}", for (Future<?> future : futures) {
appid, nickName, updatedCount, fullUserTagId); try {
TimeUnit.MILLISECONDS.sleep(500); future.get();
} catch (Exception e) { } catch (Exception e) {
log.error("处理公众号 {}({}) 时发生错误: {}", nickName, appid, e.getMessage(), e); log.error("等待线程执行完成时发生错误: {}", e.getMessage(), e);
}
} }
log.info("第 {} / {} 组处理完成", batchIdx, accountBatches.size());
} }
log.info("公众号用户全量用户标签清洗完成"); log.info("公众号用户全量用户标签清洗完成");
......
...@@ -32,5 +32,9 @@ public interface WeChatTagCleanMapper { ...@@ -32,5 +32,9 @@ public interface WeChatTagCleanMapper {
@Param("authorizerIds") List<Long> authorizerIds, @Param("authorizerIds") List<Long> authorizerIds,
@Param("tagNames") List<String> tagNames); @Param("tagNames") List<String> tagNames);
/**
* 根据 appid 列表实时查询 access_token
*/
List<PendingAuthAccountVO> getTokensByAppids(@Param("appids") List<String> appids);
} }
...@@ -4,17 +4,25 @@ ...@@ -4,17 +4,25 @@
<select id="getPendingAuthAccounts" resultType="com.yaoyaozw.customer.vo.wechat.PendingAuthAccountVO"> <select id="getPendingAuthAccounts" resultType="com.yaoyaozw.customer.vo.wechat.PendingAuthAccountVO">
SELECT SELECT
ai.id AS authId, ai.id AS authId,
ai.appid AS appid, ai.appid AS appid,
ai.nick_name AS nickName, ai.nick_name AS nickName
atk.authorizer_access_token AS authorizerAccessToken
FROM authorizer_info ai FROM authorizer_info ai
LEFT JOIN authorizer_token atk ON atk.authorizer_appid = ai.appid
WHERE ai.store_type IN ( WHERE ai.store_type IN (
'YANG_GUANG','TOMATO','YUE_WEN_1' 'YANG_GUANG','TOMATO','YUE_WEN_1'
) )
AND atk.create_time >= DATE_ADD(CURRENT_DATE,INTERVAL -1 DAY) </select>
<select id="getTokensByAppids" resultType="com.yaoyaozw.customer.vo.wechat.PendingAuthAccountVO">
SELECT
ai.appid AS appid,
atk.authorizer_access_token AS authorizerAccessToken
FROM authorizer_info ai
LEFT JOIN authorizer_token atk ON atk.authorizer_appid = ai.appid
WHERE ai.appid IN
<foreach collection="appids" item="appid" open="(" separator="," close=")">
#{appid}
</foreach>
</select> </select>
<select id="listFullUserWechatTagByAuthorizers" <select id="listFullUserWechatTagByAuthorizers"
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论