一. 需求:
统计近200万商家数据, 每日通过查询计费系统更新其余额
二. 技术栈:
抛开分布式定时任务系统Elastic-Job之外, 我们先优先把单系统极限优化挖掘出来, 由于博主接手的是14年的老项目, 本地甚至都无法启动,所以不尝试用高级玩法(数据分片, 消息中间件).
1. Java1.7
2. Mysql数据库5.7
三. 思路分析:
流程如下
1: 扫表
几百万数据表的扫表面临的问题很简单, 当我们分页查询到百万级别以上(甚至几十万)的时候limit 800000, 10 都需要几十秒.测试数据如下
select * from product limit 10, 20 0.016秒
select * from product limit 100, 20 0.016秒
select * from product limit 1000, 20 0.047秒
select * from product limit 10000, 20 0.094秒
//我们已经看出随着起始记录的增加,时间也随着增大, 这说明分页语句limit跟起始页码是有很大关系的,那么我们把起始记录改为40w看下(也就是记录的一般左右)
select * from product limit 400000, 20 3.229秒
//再看我们取最后一页记录的时间
select * from product limit 866613, 20 37.44秒
解决方案: 采用覆盖索引的方式快速查询, 并且查询结果只需要自己所需的字段,每次查询100个
SELECT id, user_id,user_name
FROM product
WHERE ID > =(select id from product ORDER BY id ASC limit 866613, 1)
limit 100
大家对大数据量的分页查询可以看下博主这篇文章: https://rourou.blog.csdn.net/article/details/110040284
2. 多线程数据处理
我们数据获取到了, 每次扫表就会获得100条数据, 我们如果串行处理效率比较低下,所以这边博主选择的是多线程处理;
多线程处理必须用到线程池(一些runnable, 和new Thread() 不是很好, 因为频繁创建线程销毁线程也是很浪费时间和资源的),
关于线程池的文章, 博主推荐自己写的工具类,非常好用: https://rourou.blog.csdn.net/article/details/81979295
难点一: 线程池参数配置
楼主创建的线程池如下:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2000), new XXXThreadFactory("countAllSeller"), new ThreadPoolExecutor.CallerRunsPolicy());
线程数其实可以高点,毕竟是IO密集型运算, 但是考虑到计费系统的压力, 所以这边是10; 线程队列为2000, 防止过多创建线程导致OOM(其实不可能的看后续);
难点二: 不能一次性创建过多的对象, 由于是多线程,如果没有收口, 一次性创建太多对象会导致OOM
这边博主用到的是CountDownLauch, 计数就是扫出来的个数, 每一次统计完毕计数一次. 这一轮的100个数据处理完后释放await;下面是楼主代码,也是核心代码
for (int i = 0;i < sellerPageInfo.getPages() ; i++) {
List<XXXSellerSimpleDO > xxxSellerDOS= xxxxSellerConfigDAO.findAllSimpleOptimize( 100,i * 100 );
// 批次结束完才能进行下一波
final CountDownLatch countDownLatch = new CountDownLatch(xxxSellerConfigDOS.size());
for (final XXXSellerSimpleDO xxxxSellerConfigDO : xxxSellerDOS) {
ThreadPoolUtil.execute(new Runnable() {
@Override
public void run() {
try {
// 单个处理
countSellerBalance(userId, userName);
} catch (ServiceException e) {
// 异常直接吞掉
serviceLogger.error(String.format("统计指定商家余额发生异常, userId{%s},userName{%s}", userId, userName), e);
}
// 计数
countDownLatch.countDown();
}
});
}
if (i % 50 == 0){
serviceLogger.info(String.format("统计指定商家余额进度{%s/%s}", i * 100, totalCount));
}
countDownLatch.await();
}
至此: 代码结束,如有其它问题, 可以给博主留言, 博主几乎每天都在线