多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

2023-11-20

背景介绍

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

公用的类和方法

/**
 * 平均拆分list方法.
 * @param source
 * @param n
 * @param <T>
 * @return
 */
public static <T> List<List<T>> averageAssign(List<T> source,int n){
    List<List<T>> result=new ArrayList<List<T>>();
    int remaider=source.size()%n; 
    int number=source.size()/n; 
    int offset=0;//偏移量
    for(int i=0;i<n;i++){
        List<T> value=null;
        if(remaider>0){
            value=source.subList(i*number+offset, (i+1)*number+offset+1);
            remaider--;
            offset++;
        }else{
            value=source.subList(i*number+offset, (i+1)*number+offset);
        }
        result.add(value);
    }
    return result;
}
/**  线程池配置
 * @version V1.0
 */
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;
    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }

    private static  ExecutorService newThreadPool(){
        int queueSize = 500;
        int corePool = Math.min(5, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}
/** 获取sqlSession
 * @author 86182
 * @version V1.0
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}

示例事务不成功操作

 /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {
    try {
        //先做删除操作,如果子线程出现异常,此操作不会回滚
        this.getBaseMapper().delete(null);
        //获取线程池
        ExecutorService service = ExecutorConfig.getThreadPool();
        //拆分数据,拆分5份
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        //执行的线程
        Thread []threadArray = new Thread[lists.size()];
        //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            threadArray[i] =  new Thread(() -> {
                try {
                 //最后一个线程抛出异常
                    if (!atomicBoolean.get()){
                        throw new ServiceException("001","出现异常");
                    }
                    //批量添加,mybatisPlus中自带的batch方法
                    this.saveBatch(list);
                }finally {
                    countDownLatch.countDown();
                }

            });
        }
        for (int i = 0; i <lists.size(); i++){
            service.execute(threadArray[i]);
        }
        //当子线程执行完毕时,主线程再往下执行
        countDownLatch.await();
        System.out.println("添加完毕");
    }catch (Exception e){
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}

数据库中存在一条数据:

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {

    @Resource
    private EmployeeBO employeeBO;

    /**
     *   测试多线程事务.
     * @throws InterruptedException
     */
    @Test
    public  void MoreThreadTest2() throws InterruptedException {
        int size = 10;
        List<EmployeeDO> employeeDOList = new ArrayList<>(size);
        for (int i = 0; i<size;i++){
            EmployeeDO employeeDO = new EmployeeDO();
            employeeDO.setEmployeeName("lol"+i);
            employeeDO.setAge(18);
            employeeDO.setGender(1);
            employeeDO.setIdNumber(i+"XX");
            employeeDO.setCreatTime(Calendar.getInstance().getTime());
            employeeDOList.add(employeeDO);
        }
        try {
            employeeBO.saveThread(employeeDOList);
            System.out.println("添加成功");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

测试结果:

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

使用sqlSession控制手动提交事务

@Resource
  SqlContext sqlContext;
 /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        //获取mapper
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        //获取执行器
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        //拆分list
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            //使用返回结果的callable去执行,
            Callable<Integer> callable = () -> {
                //让最后一个线程抛出异常
                if (!atomicBoolean.get()){
                    throw new ServiceException("001","出现异常");
                }
              return employeeMapper.saveBatch(list);
            };
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
        //如果有一个执行不成功,则全部回滚
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}
// sql
<insert id="saveBatch" parameterType="List">
 INSERT INTO
 employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
 values
     <foreach collection="list" item="item" index="index" separator=",">
     (
     #{item.employeeId},
     #{item.age},
     #{item.employeeName},
     #{item.birthDate},
     #{item.gender},
     #{item.idNumber},
     #{item.creatTime},
     #{item.updateTime},
     #{item.status}
         )
     </foreach>
 </insert>

数据库中一条数据:

测试结果:抛出异常,

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

成功操作示例:

 @Resource
SqlContext sqlContext;
/**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        for (int i =0;i<lists.size();i++){
            List<EmployeeDO> list  = lists.get(i);
            Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
       // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
    }
}

测试结果:

数据库中数据:

删除的删除了,添加的添加成功了,测试成功。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多线程事务怎么回滚?说用 @Transactional 可以回去等通知了! 的相关文章

  • JPanel透明背景和显示元素[重复]

    这个问题在这里已经有答案了 我插入一个背景图e 变成 aJPanel但一些界面元素消失了 以下 Java Swing 元素不会出现 标签标题 标签 usuario 标签 密码 按钮加速器 你能否使图像透明或元素不透明 setOpaque f
  • Java Swing:清除JList而不触发监听器

    我的情况如下 我有一个 JList 只要在列表中进行选择 它就会触发搜索 使用 ListSelectionListener 我正在尝试使用以下命令重置列表上的选择list clearSelection 这样做的问题是使用clearSelec
  • Java/JAXB:将具有相同名称但不同属性值的 XML 元素解组到不同的类成员

    我正在尝试根据其属性之一将具有多个 Fields 元素的 XML 解析为不同的类成员 这是 XML
  • 使用 jdbc 程序连接到 Open Office odb 文件

    我编写了以下代码来连接到 OpenOffice db String db C Documents and Settings hkonakanchi Desktop Test odb Class forName org hsqldb jdbc
  • java“void”和“非void”构造函数

    我用 java 编写了这个简单的类 只是为了测试它的一些功能 public class class1 public static Integer value 0 public class1 da public int da class1 v
  • 删除 servlet 中的 cookie 时出现问题

    我尝试使用以下代码删除 servlet 中的 cookie Cookie minIdCookie null for Cookie c req getCookies if c getName equals iPlanetDirectoryPr
  • 业务代表与服务定位器

    Business Delegate 和 Service Locator 之间有什么区别 两者都负责封装查找和创建机制 如果 Business Delegate 使用 Service Locator 来隐藏查找和创建机制 那么 Busines
  • 如何消除警告:使用“$”而不是“.”对于 Eclipse 中的内部类

    我是 Android 开发新手 当我将 eclipse 和 Android SDK 更新到最新版本后 我收到警告 Use instead of for inner classes or use only lowercase letters
  • Java 的 QP 求解器 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • getClassLoader().getResource() 返回 null

    我有这个测试应用程序 import java applet import java awt import java net URL public class Test extends Applet public void init URL
  • Java String.format 向整数添加空格

    我有一小段代码 我不明白输出 此输出向我的字符串格式文本添加空格 我做错了什么吗 public class HelloWorld public static void main String args int a1 540 int a2 4
  • 如何在将数据发送到 Firebase 数据库之前对其进行加密?

    我正在使用 Firebase 实时数据库制作聊天应用程序 我知道 Firebase 非常安全 只要您的规则正确 但我自己可以阅读使用我的应用程序的人的所有聊天记录 我想阻止这种情况 为此我需要一种解密和加密方法 我尝试使用凯撒解密 但失败了
  • 通用 JSF 实体转换器[重复]

    这个问题在这里已经有答案了 我正在编写我的第一个 Java EE 6 Web 应用程序作为学习练习 我没有使用框架 只是使用 JPA 2 0 EJB 3 1 和 JSF 2 0 我有一个自定义转换器 用于将存储在 SelectOne 组件中
  • 哪种 Java DOM 包装器是最好或最受欢迎的? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 用 Java 创建迷宫求解算法

    我被分配了用 Java 创建迷宫求解器的任务 这是任务 Write an application that finds a path through a maze The maze should be read from a file A
  • 为什么在尝试使用 Java 连接到 RDS PostgreSQL 数据库时会收到 SocketTimeoutException?

    我有一个 Spring 应用程序 我试图在 AWS 上托管 几天来我一直在努力配置 我有一个 EC2 实例 并且能够通过 SSH 连接到它 我还在 AWS 中设置了 Postgres RDS 数据库 但我无法使用 IDE 中的代码连接到它
  • Android 中的字符串加密

    我正在使用代码进行加密和加密 它没有给出字符串结果 字节数组未转换为字符串 我几乎尝试了所有方法将字节数组转换为字符 但没有给出结果 public class EncryptionTest extends Activity EditText
  • 防止 Firebase 中的待处理写入事务不起作用

    我的目标是在单击按钮时将名称插入 Cloud Firestore 中 但如果用户未连接到互联网 我不希望保存处于挂起状态 我不喜欢 Firebase 保存待处理写入的行为 即使互联网连接已恢复 我研究发现Firebase 开发人员建议使用事
  • JDK 7 的快速调试/调试构建

    我正在寻找 JDK 的调试 或者我猜他们称之为快速调试构建 以启用在运行时生成的打印程序集以及查找性能问题时所需的其他诊断 就目前情况而言 我似乎找不到可以直接使用的 现成的 快速调试构建二进制包 有人可以帮我提供下载链接 或者至少提供有关
  • 如何在Java中跨类共享变量,我尝试了静态不起作用

    类 Testclass1 有一个变量 有一些执行会改变变量的值 现在在同一个包中有类 Testclass2 我将如何访问 Testclass2 中变量的更新值 由 Testclass1 更新 试过这个没用 注意 Testclass1和Tes

随机推荐

  • 同时有线内网无线外网的解决方案

    内网有线环境下先固定好自己的IP地址 子网掩码和网关地址 DNS 连接WIFI后 用管理员权限打开CMD命令行 第一步 输入route print 后按回车 会看到左侧网络目标里有两个0 0 0 0的地址 这样就会路由冲突 出现要么只能上内
  • “传统技术”快速搭建AI产品的利器——LLM技术

    文章首发地址 LLM原理 LLM Learning Localization and Mapping 技术的原理是将学习 定位和建图结合起来 实现机器人对环境的感知 定位和地图构建 下面是LLM技术的基本原理 学习 Learning LLM
  • html颜色怎么渐变效果,html怎么设置颜色渐变

    html设置颜色渐变的方法 首先创建一个HTML示例文件 然后使用div标签创建一个模块 接着在css标签内通过 id colorchange 来设置div样式 最后通过linear gradient属性设置div的背景颜色渐变效果即可 本
  • 讲解 最大流问题+最小花费问题+python(ortool库)实现

    文章目录 基本概念 图 邻接矩阵 最大流问题 python解决最大流问题 python解决最大流最小费用问题 喜欢的话请关注我们的微信公众号 你好世界炼丹师 公众号主要讲统计学 数据科学 机器学习 深度学习 以及一些参加Kaggle竞赛的经
  • ul里面可以放div吗?

    在HTML中 ul 标签代表无序列表 可以用来展示项目列表 而 div 标签则是div容器用于分组内容 提供独立于文档的CSS样式和JavaScript事件处理 那么 ul里面可以放div吗 答案是肯定的 下面从多个方面进行详细阐述 一 语
  • MacBook安装使用XMind

    MacBook安装使用XMind XMind简介 官方地址 https www xmind cn XMind 是一个全功能的思维导图和头脑风暴软件 为激发灵感和创意而生 作为一款有效提升工作和生活效率的生产力工具 受到全球百千万用户的青睐
  • 计算机内存插在主板的哪个槽,四个内存插槽,这是正确的安装顺序

    具有防呆设计的主板插槽几乎不可能错误地插入 因此很有可能在第一个插槽中安装了内存 尽管可以 但是会阻止内存在最佳状态下工作 主板手册中有这样一句话 说明 为便于理解 首先对内存插槽编号 从靠近CPU插槽的位置开始 主板 针对不同情况的最佳安
  • 区域生长算法及其实现

    区域生长算法及其实现 背景 前面我们已经介绍了 最大熵分割法 OTSU算法 他们都有各自的优缺点 通常都不是单独使用这些算法 需要和其它算法来结合使用 前面两类算法都是单独对图像的灰度信息进行处理 不包含图像的空间信息 而区域生长算法则包含
  • 云计算与大数据概论第十一周

    分布式计算 分布式计算是一种计算方法 和集中式计算是相对的 随着计算技术的发展 有些应用需要非常巨大的计算能力才能完成 如果采用集中式计算 需要耗费相当长的时间来完成 分布式计算将该应用分解成许多小的部分 分配给多台计算机进行处理 这样可以
  • 如何快速开发一个简单实用的MES系统?

    01 如何快速开发一个简单实用的MES系统 MES生产管理系统 又称制造执行系统 是一种集成了计划 生产 质量控制 库存管理和材料申请等生产流程的管理系统 是企业中实现高效生产的重要一环 根据题主描述 通过产品条形码实现对生产计划下的产品追
  • 小程序的基础(三)

    文章目录 前言 一 navigator 二 rich text 作用 1 nodes属性 注意 三 button open type 的 contact的实现流程 代码示例 1 contact 直接打开 客服对话功能 需要在微信小程序的后台
  • int argc,char*argv[ ]的简洁解释

    1 arguments argument counter 计数个数 和 argument vector 矢量 带有方向的变量参数 也就是指针 argc命令行输入参数的个数 int main int argc char argv int i
  • 【操作系统】王道考研 p42 段页式管理方式

    段页式管理方式 知识总览 分段 分页管理方式中最大的优缺点 关于段式管理会产生外部碎片 ps 分段管理中产生的外部碎片也可以用 紧凑 来解决 只是需要付出较大的时间代价 分段 分页 段页式管理 示意图 先分段 后分页 段页式管理的逻辑地址结
  • oracle 的路径不一致,DG环境搭建,在备库遇到问题,主备库的路径不一致

    现在在做oracle DG的环境搭建 因为实际生产的原因 主备库的路径是不一致的 我把主库的rman文件传到备库后 在备库进行恢复 无法恢复 求指导 oracle std bak rman target sys 123456 pri aux
  • java变量作用域和堆栈

    一 作用域决定了变量的可见性和生命周期 java中变量分为成员变量和局部变量 如下 1 成员变量 在类的所有方法外部声明的变量 即类所拥有的变量 可以被系统初始化 1 1静态成员变量 类被加载时被创建 其生命周期与该类的生命周期相同 1 2
  • 在自己的bash脚本中实现自动补全

    在90年代Linux和DOS共存的年代里 Linux的Shell们有一个最微不足道但也最实用的小功能 就是命令自动补全 而DOS那个笨蛋一直到死都没学会什么叫易用 Linux的这个微不足道的小传统一直延续至今 虽然看似微不足道 其实也极大的
  • stm32无法唤醒DTH11温湿度传感器解决

    关于DTH11的介绍和使用方法可以随便搜索一下别的文章 直接搜索DTH11即可 这里使用艾克姆科技的例程 却无法成功运行 上了示波器才发现拉低时间无法达到18ms 因此无法唤醒DTH11 总线由stm32拉低12ms左右之后就一直处于高电平
  • CString字符串查找和截取

    1 Find 该函数从左侧0索引开始 查找第一个出现的字符位置 CString str abc int postion str Find a 如果查到 返回以0索引起始的位置 未查到 返回 1 如果查到 返回以0索引起始的位置 未查到 返回
  • 卷积神经网络CNN小结(简单实现代码mnist数据集)

    由于全连接神经网络处理图像中的需要训练参数过多的问题 而卷积神经网络中 卷积层的神经元只与前一层的部分 神经元节点相连 既它的神经元的连接是非全连接的 且同一层某些神经元之间的连接的权重w和偏移b是共享的 这样大量减少了训练参数的数量 图1
  • 多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

    背景介绍 1 最近有一个大数据量插入的操作入库的业务场景 需要先做一些其他修改操作 然后在执行插入操作 由于插入数据可能会很多 用到多线程去拆分数据并行处理来提高响应时间 如果有一个线程执行失败 则全部回滚 2 在spring中可以使用 T