多线程事务的实现

2023-11-18

为了提高效率,在批量执行SQL时,可以采用多线程并发执行的方式。每个线程在执行完SQL后,暂时不提交事务,而是等待所有线程的SQL执行成功后,一起进行提交。如果其中任何一个线程执行失败,则所有线程都会回滚。

一、springboot多线程(声明式)的使用方法

1、springboot提供了注解@Async来使用线程池,具体使用方法如下:

(1) 在启动类(配置类)添加@EnableAsync来开启线程池

(2) 在需要开启子线程的方法上添加注解@Async

所以使用时需要配置自定义线程池,如下:

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
 
    @Bean("threadPoolTaskExecutor")//自定义线程池名称
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 
        //线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        executor.setCorePoolSize(16);
 
        //如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
        //executor.setAllowCoreThreadTimeOut(true);
 
        //阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
        executor.setQueueCapacity(124);
 
        //最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
        //任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(64);
 
        //当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        //允许线程空闲时间30秒,当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);
 
        //spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
        //jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
        executor.setThreadNamePrefix("自定义线程池-");
 
        // rejection-policy:拒绝策略:当线程数已经达到maxSize的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行, (个人推荐)
        // AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 
        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁
  executor.setWaitForTasksToCompleteOnShutdown(true);
  //设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
  executor.setAwaitTerminationSeconds(60);
 
        executor.initialize();
        return executor;
    }
}

开启子线程方法: 在需要开启线程的方法上添加 注解@Async("threadPoolTaskExecutor")即可,其中注解中的参数为自定义线程池的名称。

二、自定义注解实现多线程事务控制

自定义注解

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
 
/**
 * 多线程事务注解: 主事务
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
 int value();//子线程数量
}
package com.example.anno;
 
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
 
/**
 * 多线程事务注解: 子事务
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
    String value() default "";
}

解释:

两个注解都是用在方法上的,须配合@Transactional(rollbackFor = Exception.class)一起使用

@MainTransaction注解 用在调用方,其参数为必填,参数值为本方法中调用的方法开启的线程数,如:在这个方法中调用的方法中有2个方法用@Async注解开启了子线程,则参数为@MainTransaction(2),另外如果未使用@MainTransaction注解,则直接已无多线程事务执行(不影响方法的单线程事务)

@SonTransaction注解 用在被调用方(开启线程的方法),无需传入参数

AOP内容

代码如下:

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
 
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * 多线程事务
 */
@Aspect
@Component
public class TransactionAop {
 
    //用来存储各线程计数器数据(每次执行后会从map中删除)
 private static final Map<String, Object> map = new HashMap<>();
 
 @Resource
 private PlatformTransactionManager transactionManager;
 
 @Around("@annotation(mainTransaction)")
 public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
  //当前线程名称
  Thread thread = Thread.currentThread();
  String threadName = thread.getName();
  //初始化计数器
  CountDownLatch mainDownLatch = new CountDownLatch(1);
  CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的参数, 为子线程的数量
  // 用来记录子线程的运行状态,只要有一个失败就变为true
  AtomicBoolean rollBackFlag = new AtomicBoolean(false);
  // 用来存每个子线程的异常,把每个线程的自定义异常向vector的首位置插入,其余异常向末位置插入,避免线程不安全,所以使用vector代替list
  Vector<Throwable> exceptionVector = new Vector<>();
 
  map.put(threadName + "mainDownLatch", mainDownLatch);
  map.put(threadName + "sonDownLatch", sonDownLatch);
  map.put(threadName + "rollBackFlag", rollBackFlag);
  map.put(threadName + "exceptionVector", exceptionVector);
 
  try {
   joinPoint.proceed();//执行方法
  } catch (Throwable e) {
   exceptionVector.add(0, e);
   rollBackFlag.set(true);//子线程回滚
   mainDownLatch.countDown();//放行所有子线程
  }
 
  if (!rollBackFlag.get()) {
   try {
    // sonDownLatch等待,直到所有子线程执行完插入操作,但此时还没有提交事务
    sonDownLatch.await();
    mainDownLatch.countDown();// 根据rollBackFlag状态放行子线程的await处,告知是回滚还是提交
   } catch (Exception e) {
    rollBackFlag.set(true);
    exceptionVector.add(0, e);
   }
  }
  if (CollectionUtils.isNotEmpty(exceptionVector)) {
   map.remove(threadName + "mainDownLatch");
   map.remove(threadName + "sonDownLatch");
   map.remove(threadName + "rollBackFlag");
   map.remove(threadName + "exceptionVector");
   if(exceptionVector.size()>0){
   		throw exceptionVector.get(0);
   }
  }
 }
 
 @Around("@annotation(com.huigu.common.anno.SonTransaction)")
 public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
  Object[] args = joinPoint.getArgs();
  Thread thread = (Thread) args[args.length - 1];
  String threadName = thread.getName();
  CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
  if (mainDownLatch == null) {
   //主事务未加注解时, 直接执行子事务
   joinPoint.proceed();//这里最好的方式是:交由上面的thread来调用此方法,但我没有找寻到对应api,只能直接放弃事务, 欢迎大神来优化, 留言分享
   return;
  }
  CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
  AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
  Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
 
  //如果这时有一个子线程已经出错,那当前线程不需要执行
  if (rollBackFlag.get()) {
   sonDownLatch.countDown();
   return;
  }
 
  DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 开启事务
  def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 设置事务隔离级别
  TransactionStatus status = transactionManager.getTransaction(def);
 
  try {
   joinPoint.proceed();//执行方法
 
   sonDownLatch.countDown();// 对sonDownLatch-1
   mainDownLatch.await();// 如果mainDownLatch不是0,线程会在此阻塞,直到mainDownLatch变为0
   // 如果能执行到这一步说明所有子线程都已经执行完毕判断如果atomicBoolean是true就回滚false就提交
   if (rollBackFlag.get()) {
    transactionManager.rollback(status);
   } else {
    transactionManager.commit(status);
   }
  } catch (Throwable e) {
   exceptionVector.add(0, e);
   // 回滚
   transactionManager.rollback(status);
   // 并把状态设置为true
   rollBackFlag.set(true);
   mainDownLatch.countDown();
   sonDownLatch.countDown();
  }
 }
}

CountDownLatch是什么?

一个同步辅助类

  • 创建对象时: 用给定的数字初始化 CountDownLatch
  • countDown() 方法: 使计数减1
  • await() 方法: 阻塞当前线程, 直至当前计数到达零。

本文中:

用 计数 1 初始化的 mainDownLatch 当作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。

用 子线程数量 初始化的 sonDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

注解使用Demo

import com.example.demo.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
/**
 * 测试子service
 */
@Service
public class SonService {
 
    /**
     * 参数说明:  以下4个方法参数和此相同
     *
     * @param args   业务中需要传递的参数
     * @param thread 调用者的线程, 用于aop获取参数, 不建议以方法重写的方式简略此参数,
     *               在调用者方法中可以以此参数为标识计算子线程的个数作为注解参数,避免线程参数计算错误导致锁表
     *               传参时参数固定为: Thread.currentThread()
     */
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod1(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }
 
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod2(String args1, String args2, Thread thread) {
        System.out.println(args1 + "和" + args2 + "开启了线程");
    }
 
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod3(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }
 
    //sonMethod4方法没有使用线程池
    @Transactional(rollbackFor = Exception.class)
    public void sonMethod4(String args) {
        System.out.println(args + "没有开启线程");
    }
}

调用方:

 
import com.example.demo.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import javax.annotation.Resource;
 
/**
 * 测试主service
 */
@Service
public class MainService {
 
    @Resource
    private SonService sonService;
 
    @MainTransaction(3)//调用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async开启了线程, 所以参数为: 3
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }
 
    /*
     * 有的业务中存在if的多种可能, 每一种走向调用的方法(开启线程的方法)数量如果不同, 这时可以选择放弃使用@MainTransaction注解避免锁表
     * 这时候如果发生异常会导致多线程不能同时回滚, 可根据业务自己权衡是否使用
     */
    @Transactional(rollbackFor = Exception.class)
    public void test2() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }
 
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多线程事务的实现 的相关文章

随机推荐

  • JDK的命令行工具——修改中

    目录 一 jps 虚拟机进程状况工具 二 jstat 虚拟机统计信息监视工具 三 jinfo java配置信息工具 四 jmap java内存映像工具 五 jhat 虚拟机堆转储快照分析工具 仅做了解即可 六 jstack java堆栈跟踪
  • 以太坊私有网络的设置与体验

    记录一下搭建一个以太坊私有网络环境的过程 方便以后的开发 我这里采用的是Geth客户端 在geth ethereum org网站上有详细的文档介绍 这里主要是按照官网的教程来操作 安装 我是Ubuntu的环境 执行以下命令来安装 sudo
  • 使用hydra进行FTP认证破解

    hydra入门 hydra是什么 hydra的安装 hydra的基本使用 熟悉常见协议 HTTP协议 FTP协议 SSH协议 Telnet协议 熟悉hydra的参数 基本参数 高级参数 使用方法 使用hydra进行HTTP认证破解 HTTP
  • 搭建机器人电控系统——通信协议——串口通信USART/UART、RS232、RS485及其实例

    通信协议 串口通信详解 IIC通信详解 SPI通信详解 CAN通信详解 文章目录 通信协议 什么是串口 串口分类 USART UART RS232 RS485的区别 串口协议原理 传输协议 需要定义的参数 发送函数USART SendDat
  • Java是一门什么语言?

    个人理解 Java代码需要先编译成class 然后交给JVM执行 而JVM在执行class代码时是解释执行的 所以Java不是一门单纯的编译型或解释型语言 它是一门混合型语言 它是集编译型语言和解释型语言的优势于一身 即执行速度较快 只需编
  • 微调(fine-tuning)

    微调 fine tuing 是一种迁移学习 transfer learning 方法 在迁移学习过程中 预训练的模型的权重会根据新数据进行训练和调整
  • Python工程师面试必备25条知识点

    1 到底什么是Python 你可以在回答中与其他技术进行对比 Python是一种解释型语言 与C语言和C的衍生语言不同 Python代码在运行之前不需要编译 其他解释型语言还包括PHP和Ruby Python是动态类型语言 指的是你在声明变
  • Vue实现高德地图信息窗

  • 操作系统——读者写者问题(写者优先)

    阅读前提醒 本文代码为伪代码 仅供理解 马上就要被关得精神失常了 也许这是我的最后一条博客了吧 啊哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈水文来咯 1 什么是读者写者问题 一个数据文件或记录可被多个进程共享
  • 当pytest遇上poium会擦出什么火花 ?

    首先 创建一个test sample test demo py 文件 写入下面三行代码 def test bing page page get https www bing com assert page get title 必应 不要问题
  • 栈和队列-P79-9

    队列的最大容量为MaxSize 这句话并不是说该队列存满时的元素个数为MaxSize 这一种情况是最大容量为MaxSize 没有申请其他数据成员 判断队列满的条件是Q front Q rear 1 MaxSize 解释 通俗的解释 Q re
  • 强化学习:带起始探索的每次访问同策回合更新算法求解机器人找金币问题

    1 问题描述 2 环境建模 3 游戏环境类roadenv 设计 class roadenv def init self epsilon 0 5 gamma 0 8 状态空间 动作空间 self states 1 2 3 4 5 6 7 8
  • Python 线程池 ThreadPoolExecutor

    线程池 以前我们定义多线程任务的时候都是通过循环来控制线程数量 很不优雅 import threading class MyThread threading Thread def init self threadID name counte
  • 如何比较PixelCNN与DCGAN两种Image generation方法?

    今天组会读了一下deepmind的PixelCNN nips的那篇 不是很明白到底为什么follow的work这么多 而且pixel rnn还拿了best paper award 感觉pixel by pixel生成是一种非常反直觉的生成方
  • png格式解码库移植过程详解

    1 zlib库和png库的源码获取 1 zlib库源码下载网址 http www zlib net 2 libpng库源码下载网址 ftp ftp simplesystems org pub libpng png src libpng16
  • Redis 整合 Jedis SpringBoot

    1 Redis 整合 Jedis 1 1 Jedis 环境准备 A Jedis 的 Jar 包
  • 2021-07-28 读书笔记:Python 学习手册(1)

    读书笔记 Python 学习手册 1 结于2021 07 28 OREILY的书籍 可读性很强 入门类 而且这本书很厚 第一部分 使用入门 第二部分 类型和运算 书前文 Python是一种简单的 解释型的 交互式的 可移植的 面向对象的超高
  • python算法中的深度学习算法之前馈神经网络(详解)

    目录 学习目标 学习内容 前馈神经网络 多层感知机 卷积神经网络
  • 6 FFmpeg从入门到精通-FFmpeg滤镜使用

    1 FFmpeg从入门到精通 FFmpeg简介 2 FFmpeg从入门到精通 FFmpeg工具使用基础 3 FFmpeg从入门到精通 FFmpeg转封装 4 FFmpeg从入门到精通 FFmpeg转码 5 FFmpeg从入门到精通 FFmp
  • 多线程事务的实现

    为了提高效率 在批量执行SQL时 可以采用多线程并发执行的方式 每个线程在执行完SQL后 暂时不提交事务 而是等待所有线程的SQL执行成功后 一起进行提交 如果其中任何一个线程执行失败 则所有线程都会回滚 一 springboot多线程 声