避坑指南
1 @Async+@Transactional不能在同一个方法上注解使用,原因Spring实现AOP的方法则就是利用了动态代理机制,正因如此,才会导致某些情况下@Async
和@Transactional
不生效
比如下面的将事务事务控制
或者
正确使用
2 事务中不要过多的做耗时操作,实际情况通过业务并发要求+事务分离粒度大小决定
3 一般批量执行要么全部成功要么全部失败,因为每次new事务+提交会影响性能,当前是场景需要(业务并发量大的情况,请通过手动集合装载事务,一期提交),如下效率会高点
@Autowired
private IPeFinalCheckService finalCheckService; //另外一个sercice
@Autowired
public DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private DefaultTransactionDefinition def;
@Async("releaseBatchOrderExecutor")
@Override
public void testTranslation() {
List<HashMap<TransactionStatus, Boolean>> maps = Collections.synchronizedList(new ArrayList<HashMap<TransactionStatus, Boolean>>());
for (int i = 1; i <= 3; i++) {
try {
finalCheckService.testTranslation(i, maps,dataSourceTransactionManager,def);
} catch (Exception e) {
log.info("异常了:" + e.getMessage());
continue;
}
}
for (HashMap<TransactionStatus, Boolean> item : maps) {
for (Map.Entry<TransactionStatus, Boolean> entry : item.entrySet()) {
if (entry.getValue()) {//有异常
dataSourceTransactionManager.rollback(entry.getKey());
}else{
dataSourceTransactionManager.commit(entry.getKey());
}
}
}
}
@Override
public void testTranslation(int i, List<HashMap<TransactionStatus, Boolean>> maps, DataSourceTransactionManager dataSourceTransactionManager, DefaultTransactionDefinition def) {
HashMap<TransactionStatus,Boolean> map=new HashMap<>();
//事物隔离级别,开启新事务,这样会比较安全些。
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
//获得事务状态
TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
try {
map.put(status,false);
log.info("当前编号:"+i);
PeFinalCheck finalCheck=new PeFinalCheck();
finalCheck.setPeNumber(""+i);
finalCheck.setRemarks("十五测试");
this.save(finalCheck);
if(i==1){
int s=4/0;
}
} catch (Exception e) {
try {
Thread.sleep(5000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
map.put(status,true);
throw e;
}finally {
maps.add(map);
}
}
4 性能测试
循环1000遍执行
4.1 手动事务集合共耗时:
手动事务测试之前注意mysql设置的连接超时时间:我的是600s
不然不报错:com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 60000,因为多个事务集合执行完成登记一起提交
由于事务阻塞过多超时+死锁,手动多线程事务应用场景一定是在已知的不多的执行条件下配合同步计数器countdown使用,而循环基数动态多的情况下强烈禁止使用
4.2 @Transactional(propagation = Propagation.REQUIRES_NEW,rollbackFor = Exception.class)
耗时:41197ms
4.3 手动事务一个个提交
结论
#查看mysql的最大连接数
SHOW VARIABLES LIKE 'max_connections';
#查看服务器响应的最大连接数
SHOW GLOBAL STATUS LIKE 'Max_used_connections';
#推荐比例在15%,至少在10%以上,当前我的配置30/300=10%
Max_used_connections / max_connections * 100%=15
1 循环28次自动事务一个个提交耗时1230,而事务集合需要1524,手动事务更加耗时
2 duild配置maxactive大小由Max_used_connections来决定,默认配置20
3 如果使用手动同步计数器事务集合整体提交连接数不超过10个,超时时间不超过50秒
4 使用事务集合最大连接数不能超过maxactive-1否则死锁
源码粘贴:
/**
* 1 循环28次自动事务一个个提交耗时1230,而事务集合需要1524,手动事务更加耗时
* 2 duild配置maxactive大小由Max_used_connections来决定,默认配置20
* 3 如果使用手动同步计数器事务集合整体提交连接数不超过10个,超时时间不超过50秒
* 4 使用事务集合最大连接数不能超过maxactive-1否则死锁
*/
@Async("releaseBatchOrderExecutor")
@Override
public void testTranslation() {
StopWatch sw = new StopWatch();
sw.start();
List<HashMap<TransactionStatus, Boolean>> maps = Collections.synchronizedList(new ArrayList<HashMap<TransactionStatus, Boolean>>());
CountDownLatch countDownLatch = new CountDownLatch(28);
for (int i = 1; i <= 28; i++) {
try {
finalCheckService.testTranslation(i, maps, dataSourceTransactionManager, def, countDownLatch);
} catch (Exception e) {
log.info("异常了:" + e.getMessage());
continue;
}
}
// try {
// countDownLatch.await();
// for (HashMap<TransactionStatus, Boolean> item : maps) {
// for (Map.Entry<TransactionStatus, Boolean> entry : item.entrySet()) {
// if (entry.getValue()) {//有异常
// dataSourceTransactionManager.rollback(entry.getKey());
// } else {
// dataSourceTransactionManager.commit(entry.getKey());
// }
// }
// }
// } catch (Exception e) {
// for (HashMap<TransactionStatus, Boolean> item : maps) {
// for (Map.Entry<TransactionStatus, Boolean> entry : item.entrySet()) {
// dataSourceTransactionManager.rollback(entry.getKey());
// }
// }
// }
sw.stop();
System.out.println("耗时:" + sw.getTotalTimeMillis());
}
@Transactional(propagation = Propagation.REQUIRES_NEW,rollbackFor = Exception.class)
@Override
public void testTranslation(int i, List<HashMap<TransactionStatus, Boolean>> maps, DataSourceTransactionManager dataSourceTransactionManager, DefaultTransactionDefinition def
, CountDownLatch countDownLatch) {
// HashMap<TransactionStatus,Boolean> map=new HashMap<>();
// //事物隔离级别,开启新事务,这样会比较安全些。
// def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// //获得事务状态
// TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
try {
// map.put(status,false);
log.info("当前编号:"+i);
PeFinalCheck finalCheck=new PeFinalCheck();
finalCheck.setPeNumber(""+i);
finalCheck.setRemarks("十五测试");
finalCheck.setDelFlag(1);
this.save(finalCheck);
if(i==1){
int s=4/0;
}
// dataSourceTransactionManager.commit(status);
} catch (Exception e) {
// dataSourceTransactionManager.rollback(status);
// map.put(status,true);
throw e;
}finally {
// countDownLatch.countDown();
// maps.add(map);
}
}
package org.jeecg.modules.pUser.excutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @version 1.0
* @Author zhaozhiqiang
* @Date 2022/8/18 14:56
* @Description //TODO 职业病团队批量导入线程池配置
*/
@Slf4j
@Configuration
@EnableAsync// 启用异步任务 151-500
public class PeUserThreadPoolConfig implements AsyncConfigurer {
public static void main(String[] args) {
System.out.println(Runtime.getRuntime().availableProcessors());
}
/**
* @version 1.0
* @Author zhaozhiqiang
* @Date 2022/8/19 10:59
* @Description //TODO 团队批量预登记
*/
@Bean("teamImportExecutor")
public ThreadPoolTaskExecutor teamImportTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
// 设置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
// 设置队列容量,当没有足够的线程去处理任务时,可以将任务放进队列中,以队列先进先出的特性来执行工作任务
threadPoolTaskExecutor.setQueueCapacity(5000);
// 设置线程活跃时间(秒)非核心线程数 还回去的时间 如果空闲超过10秒就被回收
threadPoolTaskExecutor.setKeepAliveSeconds(1);
// 设置默认线程名称
threadPoolTaskExecutor.setThreadNamePrefix("teamImport_");
//用来设置线程池关闭的时候等待所有任务都完成(可以设置时间)
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//设置上面的时间(秒)
threadPoolTaskExecutor.setAwaitTerminationSeconds(1);
//拒绝策略:线程池都忙不过来的时候,可以适当选择拒绝
/**
* ThreadPoolExecutor.AbortPolicy();//默认,队列满了丢任务抛出异常
* ThreadPoolExecutor.DiscardPolicy();//队列满了丢任务不异常
* ThreadPoolExecutor.DiscardOldestPolicy();//将最早进入队列的任务删,之后再尝试加入队列
* ThreadPoolExecutor.CallerRunsPolicy();//如果添加到线程池失败,那么主线程会自己去执行该任务
*/
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
/**
* @version 1.0
* @Author zhaozhiqiang
* @Date 2022/8/19 11:00
* @Description //TODO 批量发布申请单
*/
@Bean("releaseBatchOrderExecutor")
public ThreadPoolTaskExecutor releaseBatchOrderTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
threadPoolTaskExecutor.setCorePoolSize(2);
// 设置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(4);
// 设置队列容量,当没有足够的线程去处理任务时,可以将任务放进队列中,以队列先进先出的特性来执行工作任务
threadPoolTaskExecutor.setQueueCapacity(30);
// 设置线程活跃时间(秒)非核心线程数 还回去的时间 如果空闲超过10秒就被回收
threadPoolTaskExecutor.setKeepAliveSeconds(1);
// 设置默认线程名称
threadPoolTaskExecutor.setThreadNamePrefix("releaseBatch_");
//用来设置线程池关闭的时候等待所有任务都完成(可以设置时间)
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//设置上面的时间(秒)
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
//拒绝策略:线程池都忙不过来的时候,可以适当选择拒绝
/**
* ThreadPoolExecutor.AbortPolicy();//默认,队列满了丢任务抛出异常
* ThreadPoolExecutor.DiscardPolicy();//队列满了丢任务不异常
* ThreadPoolExecutor.DiscardOldestPolicy();//将最早进入队列的任务删,之后再尝试加入队列
* ThreadPoolExecutor.CallerRunsPolicy();//如果添加到线程池失败,那么主线程会自己去执行该任务
*/
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}