Spring Boot 多数据源及事务解决方案

2023-11-01

一个主库和N个应用库的数据源,并且会同时操作主库和应用库的数据,需要解决以下两个问题:

  • 如何动态管理多个数据源以及切换?

  • 如何保证多数据源场景下的数据一致性(事务)?

本文主要探讨这两个问题的解决方案,希望能对读者有一定的启发。

1. 数据源切换原理

通过扩展Spring提供的抽象类AbstractRoutingDataSource,可以实现切换数据源。其类结构如下图所示:

  • targetDataSources&defaultTargetDataSource

项目上需要使用的所有数据源和默认数据源。

  • resolvedDataSources&resolvedDefaultDataSource

当Spring容器创建AbstractRoutingDataSource对象时,通过调用afterPropertiesSet复制上述目标数据源。由此可见,一旦数据源实例对象创建完毕,业务无法再添加新的数据源。

  • determineCurrentLookupKey

此方法为抽象方法,通过扩展这个方法来实现数据源的切换。目标数据源的结构为:Map<Object, DataSource>其key为lookup key

我们来看官方对这个方法的注释:

lookup key通常是绑定在线程上下文中,根据这个key去resolvedDataSources中取出DataSource。

根据目标数据源的管理方式不同,可以使用基于配置文件和数据库表两种方式。基于配置文件管理方案无法后续添加新的数据源,而基于数据库表方案管理,则更加灵活。

2. 配置文件解决方案

根据上面的分析,我们可以按照下面的步骤去实现:

  • 定义DynamicDataSource类继承AbstractRoutingDataSource,重写determineCurrentLookupKey()方法。

  • 配置多个数据源注入targetDataSourcesdefaultTargetDataSource,通过afterPropertiesSet()方法将数据源写入resolvedDataSourcesresolvedDefaultDataSource

  • 调用AbstractRoutingDataSourcegetConnection()方法时,determineTargetDataSource()方法返回DataSource执行底层的getConnection()

其流程如下图所示:

2.1 创建数据源

DynamicDataSource数据源的注入,目前业界主流实现步骤如下:

在配置文件中定义数据源

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.jdbc.Driver
# 主数据源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
# 其他数据源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***

在代码中配置Bean

@Configuration
public class DynamicDataSourceConfig {
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource firstDataSource(){
        return DruidDataSourceBuilder.create().build();
    }
 
    @Bean
    @ConfigurationProperties("spring.datasource.druid.second")
    public DataSource secondDataSource(){
        return DruidDataSourceBuilder.create().build();
    }
 
    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>(5);
        targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
        targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
        return new DynamicDataSource(firstDataSource, targetDataSources);
    }
}

2.2 AOP处理

通过DataSourceAspect切面技术来简化业务上的使用,只需要在业务方法添加@SwitchDataSource注解即可完成动态切换:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface SwitchDataSource {
    String value();
}

DataSourceAspect拦截业务方法,更新当前线程上下文DataSourceContextHolder中存储的key,即可实现数据源切换。

2.3 方案不足

基于AbstractRoutingDataSource的多数据源动态切换,有个明显的缺点,无法动态添加和删除数据源。在我们的产品中,不能把应用数据源写死在配置文件。接下来分享一下基于数据库表的实现方案。

3. 数据库表解决方案

我们需要实现可视化的数据源管理,并实时查看数据源的运行状态。所以我们不能把数据源全部配置在文件中,应该将数据源定义保存到数据库表。参考AbstractRoutingDataSource的设计思路,实现自定义数据源管理。

3.1 设计数据源表

主库的数据源信息仍然配置在项目配置文件中,应用库数据源配置参数,则设计对应的数据表。表结构如下所示:

这个表主要就是DataSource的相关配置参数,其相应的ORM操作代码在此不再赘述,主要是实现数据源的增删改查操作。

3.2 自定义数据源管理

3.2.1 定义管理接口

通过继承AbstractDataSource即可实现DynamicDataSource。为了方便对数据源进行操作,我们定义一个接口DataSourceManager,为业务提供操作数据源的统一接口。

public interface DataSourceManager {
    void put(String var1, DataSource var2);
 
    DataSource get(String var1);
 
    Boolean hasDataSource(String var1);
 
    void remove(String var1);
 
    void closeDataSource(String var1);
 
    Collection<DataSource> all();
}

该接口主要是对数据表中定义的数据源,提供基础管理功能。

3.2.2 自定义数据源

DynamicDataSource的实现如下图所示:

根据前面的分析,AbstractRoutingDataSource是在容器启动的时候,执行afterPropertiesSet注入数据源对象,完成之后无法对数据源进行修改。DynamicDataSource则实现DataSourceManager接口,可以将数据表中的数据源加载到dataSources。

3.2.3 切面处理

这一块的处理跟配置文件数据源方案处理方式相同,都是通过AOP技术切换lookup key。

public DataSource determineTargetDataSource() {
        String lookupKey = DataSourceContextHolder.getKey();
        DataSource dataSource = Optional.ofNullable(lookupKey)
                .map(dataSources::get)
                .orElse(defaultDataSource);
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine DataSource for lookup key [" + lookupKey + "]");
        }
        return dataSource;
    }

4.2.4 管理数据源状态

在项目启动的时候,加载数据表中的所有数据源,并执行初始化。初始化操作主要是使用SpringBoot提供的DataSourceBuilder类,根据数据源表的定义创建DataSource。在项目运行过程中,可以使用定时任务对数据源进行保活,为了提升性能再添加一层缓存。

AbstractRoutingDataSource 只支持单库事务,切换数据源是在开启事务之前执行。 Spring使用 DataSourceTransactionManager进行事务管理。开启事务,会将数据源缓存到DataSourceTransactionObject对象中,后续的commit和 rollback事务操作实际上是使用的同一个数据源。

如何解决切库事务问题?借助Spring的声明式事务处理,我们可以在多次切库操作时强制开启新的事务:

@SwitchDataSource    
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)

这样的话,执行切库操作的时候强制启动新事务,便可实现多次切库而且事务能够生效。但是这种事务方式,存在数据一致性问题:

假若ServiceB正常执行提交事务,接着返回ServiceA执行并且发生异常。因为两次处理是不同的事务,ServiceA这个事务执行回滚,而ServiceA事务已经提交。这样的话,数据就不一致了。接下来,我们主要讨论如何解决多库的事务问题。

4. 多库事务处理

4.1 关于事务的理解

首先有必要理解事务的本质。

1.提到Spring事务,就离不开事务的四大特性和隔离级别、七大传播特性。

事务特性和离级别是属于数据库范畴。Spring事务的七大传播特性是什么呢?它是Spring在当前线程内,处理多个事务操作时的事务应用策略,数据库事务本身并不存在传播特性。

2.Spring事务的定义包括:begin、commit、rollback、close、suspend、resume等动作。

  • begin(事务开始): 可以认为存在于数据库的命令中,比如Mysql的start transaction命令,但是在JDBC编程方式中不存在。

  • close(事务关闭): Spring事务的close()方法,是把Connection对象归还给数据库连接池,与事务无关。

  • suspend(事务挂起): Spring中事务挂起的语义是:需要新事务时,将现有的Connection保存起来(还有尚未提交的事务),然后创建新的Connection2Connection2提交、回滚、关闭完毕后,再把Connection1取出来继续执行。

  • resume(事务恢复): 嵌套事务执行完毕,返回上层事务重新绑定连接对象到事务管理器的过程。

实际上,只有commit、rollback、close是在JDBC真实存在的,而其他动作都是应用的语意,而非JDBC事务的真实命令。因此,事务真实存在的方法是:setAutoCommit()commit()rollback()

close()语义为:

  • 关闭一个数据库连接,这已经不再是事务的方法了。

使用DataSource并不会执行物理关闭,只是归还给连接池。

4.2 自定义管理事务

为了保证在多个数据源中事务的一致性,我们可以手动管理Connetion的事务提交和回滚。考虑到不同ORM框架的事务管理实现差异,要求实现自定义事务管理不影响框架层的事务。

这可以通过使用装饰器设计模式,对Connection进行包装重写commit和rolllback屏蔽其默认行为,这样就不会影响到原生Connection和ORM框架的默认事务行为。其整体思路如下图所示:

这里并没有使用前面提到的@SwitchDataSource,这是因为我们在TransactionAop中已经执行了lookupKey的切换。

4.2.1 定义多事务注解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransaction {
    String transactionManager() default "multiTransactionManager";
    // 默认数据隔离级别,随数据库本身默认值
    IsolationLevel isolationLevel() default IsolationLevel.DEFAULT;
    // 默认为主库数据源
    String datasourceId() default "default";
    // 只读事务,若有更新操作会抛出异常
    boolean readOnly() default false;

业务方法只需使用该注解即可开启事务,datasourceId指定事务用到的数据源,不指定默认为主库。

4.2.2 包装Connection

自定义事务我们使用包装过的Connection,屏蔽其中的commit&rollback方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。

public class ConnectionProxy implements Connection {
 
    private final Connection connection;
 
    public ConnectionProxy(Connection connection) {
        this.connection = connection;
    }
 
    @Override
    public void commit() throws SQLException {
        // connection.commit();
    }
 
    public void realCommit() throws SQLException {
        connection.commit();
    }
 
    @Override
    public void close() throws SQLException {
        //connection.close();
    }
 
    public void realClose() throws SQLException {
        if (!connection.getAutoCommit()) {
            connection.setAutoCommit(true);
        }
        connection.close();
    }
 
    @Override
    public void rollback() throws SQLException {
        if(!connection.isClosed())
            connection.rollback();
    }
    ...
}

这里commit&close方法不执行操作,rollback执行的前提是连接执行close才生效。这样不管是使用哪个ORM框架,其自身事务管理都将失效。事务的控制就交由MultiTransaction控制了。

4.2.3 事务上下文管理

public class TransactionHolder {
    // 是否开启了一个MultiTransaction
    private boolean isOpen;
    // 是否只读事务
    private boolean readOnly;
    // 事务隔离级别
    private IsolationLevel isolationLevel;
    // 维护当前线程事务ID和连接关系
    private ConcurrentHashMap<String, ConnectionProxy> connectionMap;
    // 事务执行栈
    private Stack<String> executeStack;
    // 数据源切换栈
    private Stack<String> datasourceKeyStack;
    // 主事务ID
    private String mainTransactionId;
    // 执行次数
    private AtomicInteger transCount;
 
    // 事务和数据源key关系
    private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap;
 
}

每开启一个事物,生成一个事务ID并绑定一个ConnectionProxy。事务嵌套调用,保存事务ID和lookupKey至栈中,当内层事务执行完毕执行pop。这样的话,外层事务只需在栈中执行peek即可获取事务ID和lookupKey。

4.2.4 数据源兼容处理

为了不影响原生事务的使用,需要重写getConnection方法。当前线程没有启动自定义事务,则直接从数据源中返回连接。

@Override
public Connection getConnection() throws SQLException {
    TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
    if (Objects.isNull(transactionHolder)) {
        return determineTargetDataSource().getConnection();
    }
    ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap()
        .get(transactionHolder.getExecuteStack().peek());
    if (ConnectionProxy == null) {
        // 没开跨库事务,直接返回
        return determineTargetDataSource().getConnection();
    } else {
        transactionHolder.addCount();
        // 开了跨库事务,从当前线程中拿包装过的Connection
        return ConnectionProxy;
    }
}

4.2.5 切面处理

切面处理的核心逻辑是:维护一个嵌套事务栈,当业务方法执行结束,或者发生异常时,判断当前栈顶事务ID是否为主事务ID。如果是的话这时候已经到了最外层事务,这时才执行提交和回滚。详细流程如下图所示:

package com.github.mtxn.transaction.aop;

import com.github.mtxn.application.Application;
import com.github.mtxn.transaction.MultiTransactionManager;
import com.github.mtxn.transaction.annotation.MultiTransaction;
import com.github.mtxn.transaction.context.DataSourceContextHolder;
import com.github.mtxn.transaction.support.IsolationLevel;
import com.github.mtxn.transaction.support.TransactionHolder;
import com.github.mtxn.utils.ExceptionUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;


@Aspect
@Component
@Slf4j
@Order(99999)
public class MultiTransactionAop {

    @Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
    public void pointcut() {
        if (log.isDebugEnabled()) {
            log.debug("start in transaction pointcut...");
        }
    }


    @Around("pointcut()")
    public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        // 从切面中获取当前方法
        Method method = signature.getMethod();
        MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction.class);
        if (multiTransaction == null) {
            return point.proceed();
        }
        IsolationLevel isolationLevel = multiTransaction.isolationLevel();
        boolean readOnly = multiTransaction.readOnly();
        String prevKey = DataSourceContextHolder.getKey();
        MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager());
        // 切数据源,如果失败使用默认库
        if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();
        // 开启事务栈
        TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);
        Object proceed;

        try {
            proceed = point.proceed();
            multiTransactionManager.commit();
        } catch (Throwable ex) {
            log.error("execute method:{}#{},err:", method.getDeclaringClass(), method.getName(), ex);
            multiTransactionManager.rollback();
            throw ExceptionUtils.api(ex, "系统异常:%s", ex.getMessage());
        } finally {
            // 当前事务结束出栈
            String transId = multiTransactionManager.getTrans().getExecuteStack().pop();
            transactionHolder.getDatasourceKeyStack().pop();
            // 恢复上一层事务
            DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
            // 最后回到主事务,关闭此次事务
            multiTransactionManager.close(transId);
        }
        return proceed;

    }

}

5.总结

本文主要介绍了多数据源管理的解决方案(应用层事务,而非XA二段提交保证),以及对多个库同时操作的事务管理。

需要注意的是,这种方式只适用于单体架构的应用。因为多个库的事务参与者都是运行在同一个JVM进行。如果是在微服务架构的应用中,则需要使用分布式事务管理(譬如:Seata)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Boot 多数据源及事务解决方案 的相关文章

随机推荐

  • 目标检测数据集

    参考 You Only Look Once Unified Real Time Object Detection 你只看一次 统一的实时对象检测 CVPR 2016 1 PASCAL VOC PASCAL VOC 数据集是目标检测领域最常用
  • 推荐图灵出版社的图书

    最近半年买了不少图灵出版社的图书 有java oracle Linux和TCP IP方面的书 每本都读了差不多一半 读后有一种很亲切的感觉 总结下来一句话 很适合受中国传统教育的大部分学生 在此并无讽刺之意 很多学生习惯了 填鸭式 的教育模
  • 使用conda创建Python的虚拟环境

    目录 一 查看已有环境 二 创建虚拟环境 2 1 添加镜像地址 2 2 创建虚拟环境 2 3 激活虚拟环境 2 4 退出虚拟环境 三 pycharm使用环境 四 删除虚拟环境 五 快速创建虚拟环境 一 查看已有环境 键入以下命令 conda
  • nosql

    公共基础 一 NoSQL概述 1 NoSQL NoSQL定义 非关系型 分布式 开放源码和具有横向扩展能力的下一代数据库 NOSQL Not Only SQL 2 数据库的分类 1 TRDB数据库 传统关系型数据库 基于单机集中管理数据理念
  • 机器学习实战——第三章(分类):决策树算法与实例(一)

    前言 今天看了会 机器学习实战 第三章 决策树 很迷 似懂非懂 专业术语太多了 而且有点混乱 对于一个大一概率论没学好的学渣来说 如今大三的我看到那些概率公式和一些概率论专业术语就头疼 马上就打了退堂鼓 早起看了半个小时没看明白果断又躺回了
  • 设置swiper轮播图的css样式无效?

    项目场景 在做vue项目的时候 使用到了vue awesome swiper三方库 当时小编想重写 覆盖轮播图的分页器小圆点的css样式 其次 小编做项目时 使用的css预编译器是SCSS 问题描述 当小编在重写分页器小圆点css样式时 发
  • Vim配置及使用总结

    Vim配置及使用总结 我的Vim配置 Vim安装及配置文件 Vim安装 Vim配置文件 我的Vim配置 Vim配置详解 Vim使用技巧 基本使用介绍 命令模式 输入模式 底线命令模式 技巧总结 二进制文件查看 下面是我在使用vim时的一些配
  • Sass -- 条件语句、循环语句、自定义函数

    文章目录 条件语句 循环语句 for while each 自定义函数 条件语句 在Sass中可以使用 if if else if else 来进行条件判断 用法和JS相同 循环语句 在Sass中可以使用 for while each 来进
  • 4. Hadoop伪分布式运行模式

    文章目录 Hadoop伪分布式运行模式 1 启动HDFS并运行MapReduce程序 1 1 配置集群 配置hadoop env sh 配置core site xml 配置hdfs site xml 1 2 启动集群 格式化NameNode
  • window.open()的所有参数列表(转)

    前言 经常上网的朋友可能会到过这样一些网站 一进入首页立刻会弹出一个窗口 或者按一个连接或按钮弹出 通常在这个窗口里会显示一些注意事项 版权信息 警告 欢迎光顾之类的话或者作者想要特别提示的信息 其实制作这样的页面效果非常的容易 只要往该页
  • Python-玩转数据-Scrapy框架介绍及安装

    一 Scrapy框架说明 1 Scrapy介绍 Scrapy框架官方网址 http doc scrapy org en latest Scrapy是用纯Python实现一个为了爬取网站数据 提取结构性数据而编写的应用框架 用途非常广泛 用户
  • 如果访问云服务器上的文件,如果访问云服务器上的文件

    如果访问云服务器上的文件 内容精选 换一换 WinSCP工具可以实现在本地与远程计算机之间安全地复制文件 与使用FTP上传代码相比 通过 WinSCP 可以直接使用服务器账户密码访问服务器 无需在服务器端做任何配置 通常本地Windows计
  • [Qt]不带标题栏(FramelessWindowHint)的窗体移动及调整大小

    Qt窗体若设置了 setWindowFlags Qt FramelessWindowHint 运行后该窗体是无法进行移动和调整大小的 那要如何才能让它和普通窗体一样进行移动和调整其大小的呢 方案如下 void QFramelessWindo
  • 【八组输入输出你都了解多少】

    目录 前言 一 总览 一 只能从标准流 std 中输入和输出数据 二 可以从所以流中读取和写入数据 三 只能从字符串中读取和写入数据 二 详细描述 一 scanf fscanf sscanf 1 scanf 2 fscanf 3 sscan
  • 震惊,docker操作竟如此简单

    Docker概述 1 什么是Docker Docker 是一个开源的应用容器引擎 让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中 然后发布到任何流行的Linux和Windows操作系统的机器上 也可以实现虚拟化 容器是完全使用沙箱
  • c#—OpenFileDialog(打开文件对话框)

    OpenFileDialog是什么 OpenFileDialog是一个类 实例化此类可以设置弹出一个文件对话框 比如 我们发邮件时需要上传附件的时候 就会弹出一个让我们选择文件的对话框 我们可以根据自己的需求 自行设置一些对话框的属性 那么
  • ASP.NET WebAPI 连接数据库

    ASP NET Web API 是一种框架 用于轻松构建可以访问多种客户端 包括浏览器和移动设备 的 HTTP 服务 ASP NET Web API 是一种用于在 NET Framework 上构建 RESTful 应用程序的理想平台 本文
  • window11中Jdk1.8下载,安装和环境配置(超详细)

    一 下载安装包 这里为了方便大家 提供百度网盘下载 链接 https pan baidu com s 1 Qz7pO226To7yy6ytdPR Q https pan baidu com s 1 Qz7pO226To7yy6ytdPR Q
  • Windows——进程间通信

    进程间通信 进程间通信的概念 Mailslots 关于Mailslots 命名规则 使用 创建 Mailslot 写入 Mailslot 读取Mailslot 管道 关于管道 匿名管道 匿名管道创建 命名管道 命名规则 访问模式 相关操作
  • Spring Boot 多数据源及事务解决方案

    一个主库和N个应用库的数据源 并且会同时操作主库和应用库的数据 需要解决以下两个问题 如何动态管理多个数据源以及切换 如何保证多数据源场景下的数据一致性 事务 本文主要探讨这两个问题的解决方案 希望能对读者有一定的启发 1 数据源切换原理