- 使用编程式事务手动管理多线程事务的生命周期,通过原子类 + CountDown去控制多线程事务的全局提交或回滚
public class ThreadTransactionUtil {
// 事务管理
private DataSourceTransactionManager transactionManager;
// 多线程执行任务列表
private final List<Handle> handleList = new ArrayList<>();
// 隔离级别
private int isolationLevel = TransactionDefinition.ISOLATION_DEFAULT;
// 传播行为
private int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED;
/**
* 示例
*/
public static void main(String[] args) {
Object o = new Object();
boolean isCommit = ThreadTransactionUtil.builder().add(o::toString).add(o::toString).begin();
}
/**
* 构建
*/
public static ThreadTransactionUtil builder() {
ThreadTransactionUtil util = new ThreadTransactionUtil();
util.transactionManager = SpringUtils.getBean(DataSourceTransactionManager.class);
// TODO 多数据源
// util.transactionManager = new DataSourceTransactionManager(ds);
return util;
}
/**
* 添加事务
*/
public ThreadTransactionUtil add(Handle s) {
handleList.add(s);
return this;
}
/**
* 开始执行
*/
public boolean begin() {
// 线程共享状态
AtomicBoolean isCommit = new AtomicBoolean(true);
// 计数器
CountDownLatch cd = new CountDownLatch(handleList.size());
// 分配线程执行
handleList.forEach(l -> CompletableFuture.runAsync(() -> {
// 开启事务
TransactionStatus ts = beginTransaction();
// 执行自定义的逻辑
try {
l.handle();
} catch (Exception e) {
isCommit.set(false);
log.error("线程事务出现异常",e);
}
// 等待计数器归零,不用await方法是因为如果出现回滚通知就算计数器没有归零也无需占用线程资源了,直接回滚
cd.countDown();
while (cd.getCount() > 0 && isCommit.get()) {
sleep(50);
}
// 判断回滚还是提交
if (isCommit.get()) {
transactionManager.commit(ts);
} else {
transactionManager.rollback(ts);
}
}));
// 等待所有线程执行结束 TODO 超时等待抛出异常测试
try {
cd.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("多线程事务线程等待出现异常");
}
return isCommit.get();
}
/* ----------------------------------- setter --------------------------------- */
public ThreadTransactionUtil isolationLevel(int isolationLevel) {
this.isolationLevel = isolationLevel;
return this;
}
public ThreadTransactionUtil propagationBehavior(int propagationBehavior) {
this.propagationBehavior = propagationBehavior;
return this;
}
/* ----------------------------------- private --------------------------------- */
/**
* 手动开启一个事务
*/
private TransactionStatus beginTransaction() {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 事物隔离级别
def.setIsolationLevel(isolationLevel);
// 事务传播行为
def.setPropagationBehavior(propagationBehavior);
// 将拿到的事务加入到事务池并返回
return transactionManager.getTransaction(def);
}
/**
* sleep
*/
private void sleep(long mill) {
try {
Thread.sleep(mill);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 自定义的lambda
*/
public interface Handle {
void handle();
}
}