提交 b7825d76 作者: 沈振路

刷新用户人群包的测试类接口

上级 4b5be35b
package com.yaoyaozw.customer.controller; package com.yaoyaozw.customer.controller;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.google.common.collect.Lists;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yaoyaozw.customer.components.CustomerServiceCommonAsyncComponent; import com.yaoyaozw.customer.components.CustomerServiceCommonAsyncComponent;
import com.yaoyaozw.customer.constants.CustomerMaterialConstant; import com.yaoyaozw.customer.constants.CustomerMaterialConstant;
import com.yaoyaozw.customer.dto.integration.IntegrationRequestDTO; import com.yaoyaozw.customer.dto.integration.IntegrationRequestDTO;
import com.yaoyaozw.customer.entity.CustomerFollowReply; import com.yaoyaozw.customer.entity.CustomerFollowReply;
import com.yaoyaozw.customer.entity.CustomerGraphics; import com.yaoyaozw.customer.entity.CustomerGraphics;
import com.yaoyaozw.customer.entity.CrowdPackage;
import com.yaoyaozw.customer.entity.RegisterUserEntity;
import com.yaoyaozw.customer.mapper.KanbanCommonMapper; import com.yaoyaozw.customer.mapper.KanbanCommonMapper;
import com.yaoyaozw.customer.publisher.AcquisitionExceptionEventPublisher; import com.yaoyaozw.customer.publisher.AcquisitionExceptionEventPublisher;
import com.yaoyaozw.customer.service.*; import com.yaoyaozw.customer.service.*;
...@@ -16,6 +20,7 @@ import com.yaoyaozw.customer.vo.AuthInfoVO; ...@@ -16,6 +20,7 @@ import com.yaoyaozw.customer.vo.AuthInfoVO;
import com.yaoyaozw.customer.vo.customer.CrowdPackageUserVO; import com.yaoyaozw.customer.vo.customer.CrowdPackageUserVO;
import com.yaoyaozw.customer.vo.referral.ReferralEntityVo; import com.yaoyaozw.customer.vo.referral.ReferralEntityVo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
...@@ -26,7 +31,12 @@ import org.springframework.web.bind.annotation.RestController; ...@@ -26,7 +31,12 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* @author wgh * @author wgh
...@@ -180,4 +190,148 @@ public class TestController { ...@@ -180,4 +190,148 @@ public class TestController {
log.info("刷新客服消息送达人数完成..."); log.info("刷新客服消息送达人数完成...");
} }
@Autowired
private CrowdPackageService crowdPackageService;
@GetMapping("/cleanupUserPackages")
public void cleanupUserPackages() {
log.info("开始清理用户人群包数据...");
try {
// 1、查询当前数据库中的人群包数据
List<CrowdPackage> allPackages = crowdPackageService.list();
Set<Long> validPackageIds = allPackages.stream().map(CrowdPackage::getId).collect(Collectors.toSet());
log.info("当前有效人群包数量:{}", validPackageIds.size());
// 2、查询当前数据库中微信用户列表:is_deleted = 0、in_package非空
QueryWrapper<RegisterUserEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("is_deleted", 0)
.isNotNull("in_package")
.ne("in_package", "");
List<RegisterUserEntity> userList = registerUserEntityService.list(queryWrapper);
log.info("待处理用户数量:{}", userList.size());
if (userList.isEmpty()) {
log.info("没有需要处理的用户数据");
return;
}
// 创建线程池,最多5个线程
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("PackageCleanup-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
// 将用户列表分组,使用Lists.partition方法分组
int batchSize = Math.max(1, userList.size() / 5);
List<List<RegisterUserEntity>> userBatches = Lists.partition(userList, batchSize);
// 用于收集所有处理结果的并发安全Map
ConcurrentHashMap<String, List<Long>> groupedResults = new ConcurrentHashMap<>();
// 提交任务到线程池
List<Future<Void>> futures = new ArrayList<>();
for (List<RegisterUserEntity> batch : userBatches) {
Future<Void> future = executor.submit(() -> {
log.info("线程 {} 开始处理 {} 个用户", Thread.currentThread().getName(), batch.size());
for (RegisterUserEntity user : batch) {
try {
// 3、转换每个用户的in_package值为Long列表
String inPackage = user.getInPackage();
if (StrUtil.isBlank(inPackage)) {
continue;
}
List<Long> packageIds = Arrays.stream(inPackage.split(","))
.filter(StrUtil::isNotEmpty)
.map(String::trim)
.filter(s -> s.matches("\\d+"))
.map(Long::valueOf)
.collect(Collectors.toList());
// 4、移除不在有效人群包列表中的值
List<Long> validIds = packageIds.stream()
.filter(validPackageIds::contains)
.sorted()
.collect(Collectors.toList());
// 5、生成新的in_package字符串
String newInPackage;
if (validIds.isEmpty()) {
newInPackage = "";
} else {
newInPackage = validIds.stream()
.map(String::valueOf)
.collect(Collectors.joining(","));
}
// 6、按照in_package分组
synchronized (groupedResults) {
groupedResults.computeIfAbsent(newInPackage, k -> new ArrayList<>()).add(user.getId());
}
} catch (Exception e) {
log.error("处理用户 {} 时发生错误: {}", user.getId(), e.getMessage(), e);
}
}
log.info("线程 {} 处理完成", Thread.currentThread().getName());
return null;
});
futures.add(future);
}
// 等待所有任务完成
for (Future<Void> future : futures) {
try {
future.get();
} catch (Exception e) {
log.error("等待线程执行完成时发生错误: {}", e.getMessage(), e);
}
}
executor.shutdown();
// 7、遍历Map,批量更新user_register_entity的in_package字段
log.info("开始批量更新数据库,共 {} 个分组", groupedResults.size());
for (Map.Entry<String, List<Long>> entry : groupedResults.entrySet()) {
String newInPackage = entry.getKey();
List<Long> userIds = entry.getValue();
try {
// 分批更新,避免单次更新数据过多
int updateBatchSize = 1000;
List<List<Long>> batchIdsList = Lists.partition(userIds, updateBatchSize);
for (List<Long> batchIds : batchIdsList) {
// 构造更新实体
RegisterUserEntity updateEntity = new RegisterUserEntity();
updateEntity.setInPackage(newInPackage);
// 构造查询条件:id in (batchIds)
QueryWrapper<RegisterUserEntity> updateWrapper = new QueryWrapper<>();
updateWrapper.in("id", batchIds);
registerUserEntityService.update(updateEntity, updateWrapper);
log.info("更新了 {} 个用户的in_package为: '{}'", batchIds.size(), newInPackage);
TimeUnit.MILLISECONDS.sleep(500);
}
} catch (Exception e) {
log.error("批量更新in_package为 '{}' 的用户时发生错误: {}", newInPackage, e.getMessage(), e);
}
}
log.info("用户人群包数据清理完成!");
} catch (Exception e) {
log.error("清理用户人群包数据时发生错误: {}", e.getMessage(), e);
}
}
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论