1.自定义事务管理器
@Component
public class SelfTransactionManager {
private TransactionStatus transactionStatus;
//获取事务源
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
* 手动开启事务
*/
public TransactionStatus begin() {
transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
return transactionStatus;
}
/**
* 提交事务
*/
public void commit(TransactionStatus transactionStatus) {
platformTransactionManager.commit(transactionStatus);
}
/**
* 回滚事务
*/
public void rollBack() {
platformTransactionManager.rollback(transactionStatus);
}
}
2.业务中实现
@Service
@AllArgsConstructor
public class BookingInfoServiceImpl implements IBookingInfoService {
private final BookingInfoMapper bookingInfoMapper;
private final SelfTransactionManager selfTransactionManager;
private final TaskExecutor taskExecutor;//自定义的线程池
@Override
@Transactional(rollbackFor=Exception.class)
public boolean test() {
// 主线程决策状态(决策最终是commit还是rollback)
AtomicBoolean IS_OK = new AtomicBoolean(true);
// 假设子线程数量为1个(根据自己的业务需求)
final int size = 1;
// 子线程计数器
CountDownLatch childMonitor = new CountDownLatch(size);
// 子线程结果集
CopyOnWriteArrayList<Boolean> childResponse = new CopyOnWriteArrayList<>();
// 主线程计数器
CountDownLatch mainMonitor = new CountDownLatch(1);
/**
* ================================子线程1=======================================
*/
CompletableFuture.runAsync(() -> {
TransactionStatus transactionStatus = selfTransactionManager.begin();
boolean result = false;
try {
// 当前子线程开始插入数据
// ---------业务代码----------
BookingInfo bookingInfo = new BookingInfo();
bookingInfo.setCreateTime(DateUtil.date());
bookingInfo.setPhoneNumber("13215592666");
bookingInfo.setNickName("ceshi0");
result = bookingInfoMapper.insertBookingInfo(bookingInfo) < 1 ? false : true;
// ---------业务代码----------
/ /插入结果 true or false
childResponse.add(result);
// 执行到这里,等待主线程执行完,因为当前子线程要拿到 IS_OK 的结果,IS_OK的结果由主线程决策
mainMonitor.await();
// 根据主线程决策的IS_OK,判断是回滚还是提交
if (IS_OK.get()) {
// 提交
selfTransactionManager.commit(transactionStatus);
}else {
// 回滚
selfTransactionManager.rollBack();
}
} catch (Exception e) {
e.printStackTrace();
// 因为会发生异常 而主线程一直在 while (true) 循环 childResponse的size,这里一定要保证进行add,否则可能会出现死循环
childResponse.add(result);
// 异常回滚
selfTransactionManager.rollBack();
} finally {
// 子线程执行完计数器-1
childMonitor.countDown();
}
}, taskExecutor);
/**
* ================================主线程========================================
*/
try {
// ---------业务代码----------
BookingInfo bookingInfo2 = new BookingInfo();
bookingInfo2.setPhoneNumber("13215592000");
bookingInfo2.setNickName("ceshi2");
bookingInfo2.setCreateTime(new Date());
boolean result = bookingInfoMapper.insertBookingInfo(bookingInfo2) < 1 ? false : true;
// ---------业务代码----------
// 主线程插入不成功,不成功让 IS_OK = false;
if (!result) {
throw new RuntimeException();
}
// 循环获取子线程结果
while (true) {
if (childResponse.size() == size) {
break;
}
}
// 子线程中存在不成功让 IS_OK = false;
if (childResponse.contains(Boolean.FALSE)) {
throw new RuntimeException();
}
} catch (Exception e) {
e.printStackTrace();
// IS_OK = false; 等待主线程走完子线程就能得知 IS_OK 为false
IS_OK.set(Boolean.FALSE);
// 这里抛出异常 是为了当前主线程事务自动进行回滚
throw e;
} finally {
// 到这个方法执行完就意味着mainMonitor中主线程走完了,接着子线程会开始执行
mainMonitor.countDown();
}
return true;
}
}