不使用binlog,canal,kafka等,只用java+mybatis拦截器来实现项目中的异步双写主从数据库,代码逻辑全整理

2023-11-05

项目中因为要迁库,所以我要在原项目中接入我的双写逻辑,确保新旧两个库都有数据写入,假如新库写入失败,旧库数据也能写入,这就确保了重要数据不能丢失。
一开始考虑的方案是使用数据同步工具,像是canal或是DTS等,但是环境这块卡的比较死,没有其他花里胡哨的工具,只能纯靠java改写代码来实现了,期间排了不少坑,这里做个人踩坑记录

实现效果,批量双写全部报200,自测下来还算成功
在这里插入图片描述

首先列一下要实现类的目录,因为我们门户,任务,和开放接口都是单独一套springboot然后共用common包的形式,所以这套package每个springboot都要引入

在这里插入图片描述
这里整理的比较匆忙,但重点其实是这几个类
在这里插入图片描述
aop中TargetDataSource之前说过了,还包括DataSourceConfig,DataSourceUtil和DynamicDataSource的配置,可以看我之前写的这一篇:mybatis动态数据源配置(附自定义注解实现数据源切换)

另外一个注解没有用上,是做多数据源事务切换的作用,想看代码和使用场景的,完全参考如下:配置多个数据源的事务
在这里插入图片描述
现在开始讲重点,为了追踪mysql中实时变化的数据,就要用到mybatis拦截器,这是我之前参考的博客,但不能完全适用于我的业务场景:用mybatis 拦截器 为insert update操作填充字段

说一下我的写法
进入AutoFillInterceptor类,也就是我的拦截器

在这里插入图片描述

因为我要引入我的DoubleWriteService做我的双写逻辑,这里会发生第一个坑,因为service加载速度是在拦截器后面的,直接Autowired启动会报错,这里用到了懒加载机制,确保启动顺利
在这里插入图片描述
之后进入拦截器的时候先判空,再去SpringUtils取bean,就能调用了

在这里插入图片描述
附上工具类SpringUtils

@Component
public class SpringUtils implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        if (SpringUtils.applicationContext == null) {
            SpringUtils.applicationContext = applicationContext;
        }

    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    //根据name
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    //根据类型
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}

接着往下走方法,这里遇到的第一个坑就是无限循环调用导致的stackoverflow,看过前面那篇博客的都知道,这个mybatis拦截器的作用其实就是去监控各类dao增改的操作,如果走到我的dao里,就会导致再次进入拦截器,然后再进入我的dao,循环往复导致了爆栈,所以我的操作很简单,就是取MappedStatement中的id,也就是完整的包路径+Dao类名,如果是就直接返回
在这里插入图片描述

接下来就是处理我的第一个arg,这个args是如何产生的呢,开头我们声明的就是args,第一个就肯定是MappedStatement了,第二个Object是我们的传参,可以是实体类或是map,这个之后会讲
在这里插入图片描述

取MappedStatement的用处其实有很多,自己debug的话可以看到许多mybatis分装的参数,我这里就取id就够了,一般常用的就是取sql字符串了,这里注释的就是取sql并且替换占位符?的逻辑。我看了好几篇博客,都写的一样:SpringBoot通过MyBatis拦截器打印完整SQL语句(无问号) 但我自己跑的时候却发现获取不到参数,所以我的双写不用直接sql的形式做
在这里插入图片描述
这里我就用到了第二个args,也就是增改他可以传entity类,可以传map,我这里做了类判断和新增修改判断,然后分别进入我相应的方法里。我的service入口传参简单明了,就是传表名和map/entity的形式
在这里插入图片描述
顺便说一下枚举的作用,因为我拦截器只能获得sqlID,为了获取表名tableName,这里枚举其实用到了包含,其实就是如下效果,假如表名交lzq_test1,包路径名叫com.lzq.common.dao.lzqTest1Dao
在这里插入图片描述
拦截器获取的sqlId一般都叫com.lzq.common.dao.lzqTest1Dao.insert,所以这里用contains去获取
在这里插入图片描述
枚举讲完了,之后就进入我的service做主要逻辑了

在这里插入图片描述
可以看到insert和update各做了map和obj类的传参,确保都能顺利执行写入,先讲一下各个方法的作用:

  1. void dealWithSql(String sql); 直接传sql执行sql,因为拦截器那一步获取不到sql,这个方法没用上
  2. void update(String tableName, Object o); 更新逻辑入口,支持obj和map
  3. void insert(String tableName, Object o);新增逻辑入口,支持obj和map
  4. String processUpdateBinLog(Map<String, Object> map,String tableName,String isUpdate); 切换数据源执行的入口,因为他是要上注解,同类调用会让注解失效,所以要设成public外部调取
  5. List getRedisFailList(String tableName,String isUpdate); 我会把dao执行失败的传参放入redis里,这里统一获取失败后的传参全是jsonstring格式
  6. void handleTransactionals(List tableNameList, List objList,List isUpdateList); 处理事务的方法,表名列表,对象列表和新增修改列表,一一对应,统一成功或失败,这里没用上

进入实现类,所有依赖如图所示,事务的注掉了因为发现不好用,同类调用避免依赖问题再次加上@Lazy
在这里插入图片描述
线程池根据应用自己配

@Data
@Configuration
@ConfigurationProperties(prefix = "executor")
@EnableConfigurationProperties(ExecutorConfig.class)
public class ExecutorConfig {
    private int corePoolSize     = 5;//2
    private int maxPoolSize      = 10;//4
    private int keepAliveSeconds = 60;
    private int queueCapacity    = 200;

    @Bean
    public TraceThreadPoolExecutor traceThreadPoolExecutor() {
        return new TraceThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, queueCapacity, rejectedExecutionHandler());
    }

    RejectedExecutionHandler rejectedExecutionHandler() {
        return new ThreadPoolExecutor.CallerRunsPolicy();
    }
}

双写开关根据配置文件实现
在这里插入图片描述
新增/更新方法如下

在这里插入图片描述
在这里插入图片描述
再往下走,根据需要自己配

 public <V> String processUpdateBinLogData(String tableName, Map<String, Object> param,String isUpdate) { //, int threadCount
        String result;
        if (null == tableName) {
            return null;
        }
        result = procUpdateBinLogByMultiThread(tableName,param,isUpdate);
        /** 请求数据超过二十,才使用多线程,否则就是单线程 */
//        int threadCount = 2;//requestList.size() > 20 ? 20 : 1;
//        try {
//            result = procUpdateBinLogByMultiThread(tableName,param,isUpdate);
//            if (threadCount > 1) {
//                /* 多线程处理 */
//                result = procUpdateBinLogByMultiThread(enumByTable,param, threadCount);
//            } else {
//                /* 直接处理 */
//                result = processUpdateBinLog(param, enumByTable);
//            }
//        } catch (Exception e) {
//
//        }
        return result;
    }
private <V> String procUpdateBinLogByMultiThread(String tableName,Map<String, Object> param,String isUpdate) {
        //List<String> result = new ArrayList<>();
        if (!param.isEmpty()) {
            processUpdateBinLogDataByThread(param, tableName,isUpdate);
        }

//        if (!param.isEmpty() && null != threadCount) {
//          List<List<V>> splitList = ListUtils.averageSplit(requestList, threadCount);
//			if (CollectionUtils.isNotEmpty(splitList)){
//				List<Future<List<String>>> futureList = new ArrayList<>();
//				for (List<V> r : splitList) {
//                  Future<List<String>> future = processUpdateBinLogDataByThread(param, enumByTable);
//					if (null != future){
//						futureList.add(future);
//					}
//				}
//				/** 获取多线程返回结果 */
//				for (Future<List<String>> r : futureList) {
//					result.addAll(r.get());
//				}
//			}
//            result.addAll(future.get());
//        }
       return null;
    }

这里用Future类和线程池去走异步

 public <T> Future<String> processUpdateBinLogDataByThread(Map<String, Object> param, String tableName,String isUpdate) {
        //Future<List<String>> result = null;
        Future<String> result = null;
        //if (CollectionUtils.isNotEmpty(requestList)) {
        if (!param.isEmpty()) {
            result = threadPoolExecutor.submit(new TraceAsyncCallableTask<String>() {
                private final Map<String, Object> paramIn = param;
                private final String tableNameIn = tableName;
                private final String isUpdateIn = isUpdate;
                private Object res;
                @Override
                public String getMethod() {
                    return "processUpdateBinLogDataByThread";
                }
                @Override
                public String[] getParams() {
                    String[] paraArr = new String[1];
                    return paraArr;
                }
                @Override
                public String getService() {
                    return "DoubleWriteService";
                }
                @Override
                public Object getRes() {
                    return res;
                }
                @Override
                public String call() {
                    //同类调用才能生效注解
                    return doubleWriteService.processUpdateBinLog(paramIn, tableNameIn,isUpdateIn);
                }
            });
        }
        return result;
    }

注意这里,用到了切换数据源的注解

 	@Override
    @TargetDataSource(connName = "dbTwo") //异步重新生效不注解,换成同类调用
    public <V> String processUpdateBinLog(Map<String, Object> map, String tableName,String isUpdate) {
        //List<String> result = new ArrayList<>();
        //String result = "fail";
        int isSucc = 0;
        Map<String, Object> queryMap = new HashMap<>();
        //String str;
        if (!map.isEmpty()) {
            try {
                //报错测试
                //int i = 1;i = i /0;
                map = humpToUnderline(map);
                queryMap.put("tableName", tableName);
                queryMap.put("fieldsMap", map);
                queryMap.put("queryMap", map);
                if (DoubleWriteConstants.UPDATE.equals(isUpdate)){
                    isSucc = dataMigrationDao.updateTableListDynamic(queryMap);
                    //更新直接去map的主键
                    logger.info("{}表双写{}成功,主键->{}", tableName,isUpdate,  map.get("id"));
                }else {
                    isSucc = dataMigrationDao.insertTableListDynamic(queryMap);
                    //新增取mybatis返回主键
                    logger.info("{}表双写{}成功,主键->{}", tableName,isUpdate, queryMap.get("id"));
                }
            }catch (Exception e){
                String jsonStr = JSONObject.toJSONString(map);
                //key格式:前缀+新增/更新+表名
                cacheClient.lpush( DoubleWriteConstants.REDIS_KEY_PREFIX +  isUpdate + tableName,jsonStr);
                logger.error("{}表双写{}错误,json->{},异常->{}" ,tableName,isUpdate,jsonStr, e.getMessage());
            }
        }

        return null;
    }

驼峰转下划线逻辑,这里用hutool的逻辑复制过来的

 	/**
     * 把 map 中的 key 由驼峰命名转为下划线,使用LinkedHashMap确保字段顺序一致性
     */
    private HashMap<String, Object> humpToUnderline(Map<String, Object> map) {
        //使用LinkedHashMap确保字段顺序一致性
        HashMap<String, Object> transitionMap = new LinkedHashMap<>(16);
        map.forEach((k, v) -> transitionMap.put(toUnderlineCase(k), v));
        return transitionMap;
    }

    public static String toUnderlineCase(CharSequence str) {
        return toSymbolCase(str, '_');
    }

    public static String toSymbolCase(CharSequence str, char symbol) {
        if (str == null) {
            return null;
        } else {
            int length = str.length();
            StringBuilder sb = new StringBuilder();

            for(int i = 0; i < length; ++i) {
                char c = str.charAt(i);
                Character preChar = i > 0 ? str.charAt(i - 1) : null;
                if (Character.isUpperCase(c)) {
                    Character nextChar = i < str.length() - 1 ? str.charAt(i + 1) : null;
                    if (null != preChar && Character.isUpperCase(preChar)) {
                        sb.append(c);
                    } else if (null != nextChar && Character.isUpperCase(nextChar)) {
                        if (null != preChar && symbol != preChar) {
                            sb.append(symbol);
                        }

                        sb.append(c);
                    } else {
                        if (null != preChar && symbol != preChar) {
                            sb.append(symbol);
                        }

                        sb.append(Character.toLowerCase(c));
                    }
                } else {
                    if (sb.length() > 0 && Character.isUpperCase(sb.charAt(sb.length() - 1)) && symbol != c) {
                        sb.append(symbol);
                    }

                    sb.append(c);
                }
            }

            return sb.toString();
        }
    }

最后是beanToMap逻辑,注意有时候会有转化失败问题,会影响到入mybatis的传参,自己处理

 public static Map<String,Object> beanToMap(Object object){
        Map<String,Object> map = null;
        try {
            map = new HashMap<String, Object>();

            BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
            PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
            for (PropertyDescriptor property : propertyDescriptors) {
                String key = property.getName();
                if (key.compareToIgnoreCase("class") == 0) {
                    continue;
                }
                Method getter = property.getReadMethod();
                Object value = getter!=null ? getter.invoke(object) : null;
                map.put(key, value);
            }
            //key 可能会把自己的class 和hashcode编进去,直接去掉
            map.remove("class");

        } catch (Exception e) {
            e.printStackTrace();
            return new HashMap<>();
        }
        Set<String> set = map.keySet();
        Iterator<String> it = set.iterator();
        while (it.hasNext()){
            String key = it.next();
            if (map.get(key)==null || map.get(key)==""){
                map.remove(key);
                set = map.keySet();
                it = set.iterator();
            }
        }
        if ("false".equals(map.get("emtpy"))){
            logger.error("{}双写前obj转化失败",object);
        }
        return map;
    }

最后讲一下dao,上篇博客也整理过了,就是动态新增修改
在这里插入图片描述
作为消费端使用很方便,只要传表名和map就完全能用,这里注意insert要加上useGeneratedKeys = “true” keyProperty = "id"来获取自增主键,这会自动注入到当前map里

	<insert id="insertTableListDynamic" parameterType="java.util.HashMap" useGeneratedKeys = "true" keyProperty = "id">
        insert into
        ${map.tableName}
        (
        <foreach collection="map.fieldsMap" index="key" item="value"
                 separator=",">
            `${key}`
        </foreach>
        )
        values
        (
        <foreach collection="map.queryMap" index="key" item="value"
                 separator=",">
            #{value}
        </foreach>
        )

    </insert>

    <update id="updateTableListDynamic" parameterType="java.util.HashMap">
        update ${map.tableName}
        <trim prefix="set" suffixOverrides=",">
            <foreach collection="map.queryMap.entrySet()" item="value" index="key" separator=",">
                <choose>
                    <when test="key != 'operation_date' and key != 'out_date' and key != 'return_date' and key != 'sign_date'
                                and key != 'audit_date' and key != 'complaint_date' and key != 'close_date' and key != 'create_date'
                                and key != 'modify_date' and key != 'handle_date' and key != 'create_time' and key != 'update_time'
                                and key != 'update_date' ">
                        <if test="value != null and value !=''">
                            ${key}= #{value}
                        </if>
                    </when>
                    <otherwise>
                        <if test="value!= null ">
                            ${key}= #{value}
                        </if>
                    </otherwise>
                </choose>
            </foreach>
        </trim>
        where id = #{map.queryMap.id}
    </update>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

不使用binlog,canal,kafka等,只用java+mybatis拦截器来实现项目中的异步双写主从数据库,代码逻辑全整理 的相关文章

随机推荐

  • 【LaTeX学习3】LaTeX中的中文处理方法

    1 在设置中检查 gt 构建命令中的默认编译器为XeLaTeX 2 在编辑器命令中默认字体编码为UTF 8 3 在导言区用 usepackage ctex 引用ctex宏包 4 在命令行在打开宏包和文档的使用方法 代码中详细介绍 5 代码
  • vs2017新建空文件夹报错0x80041FE2解决办法

    原因分析 你无意间删除了安装时的缓冲路径文件夹 请将该文件夹恢复 否则 重新安装软件 缓冲文件夹一般名称为 DownloadBrowser
  • 清华大学:紧耦合的雷达视觉惯性里程计方案(FT-LVIO)

    来自清华大学的Zhou Zhang科研团队在误差状态迭代卡尔曼滤波器的框架内提出了一种新型VIO方案 FT LVIO 一种完全紧耦合的多传感器融合框架 该框架由激光雷达 单目相机和IMU 惯性测量单元 三部分组成 同时融合了三个互补传感器的
  • QT connect使用简单介绍

    如图 首先 connect是线程安全的 其次它有很多重载 当然最重要的还是QT4连接和QT5连接的区别 这个函数重载表示connect函数也是支持lambda函数的 connect const QObject sender PointerT
  • php利用循环链表找猴王

    php利用循环链表找猴王 1 前述 2 php源码 3 html源码 1 前述 1 1实现说明 与array数组的实现思路大同小异 不过链表的实现方式略显繁琐 1 2实现思路 建立一个单向循环链表不断循环查找 查找到指定删除位置 踢出猴子即
  • Java连接mysql,sql语句中含有中文就查询不到结果

    最近写一个小项目 javaee项目连接了mysql数据库 根据用户名 密码却怎么也查不到用户结果 后改用英文的用户名就可以查到了 可见是编码问题 解决方法 连接语句加入如下代码 useUnicode true characterEncodi
  • 华为eNSP--4多个路由器连接配置(静态路由)

    R1路由器的配置 Huawei int e0 0 0 Huawei Ethernet0 0 0 ip add 192 168 1 10 24 Huawei Ethernet0 0 0 int e0 0 1 Huawei Ethernet0
  • cookie和session有什么区别,请你谈谈cookie的缺点

    1 区别 cookie机制采用的是在客户端保持状态的方案 session机制采用的是在服务端保持状态的方案 2 cookie 优点 1 gt 极高的扩展性和可用性 2 gt 通过编程方式 控制保存在cookie中的session对象的大小
  • 进程间的通信方式

    文章目录 一 进程通信概念 二 进程间通信方式 2 1 管道 匿名管道 管道的实质 局限 2 2 有名管道 FIFO 2 3 信号 来源 2 4 消息队列 特点 2 5 共享内存 2 6 信号量 互斥 同步 信号量的实现 2 7 Socke
  • 基本运算电路之---反向比例运算电路(1)

    基本运算电路之 反向比例运算电路 1 我们是从大二上学期开始学习模电 丫丫的 当时一直知道这门学科很重要 励志学好它 嘿嘿 可惜后来发现然并卵 除了知道有二极管 三极管 MOS管 运放 就基本别无其他的事了 电路图的原理 数据计算更是一窍不
  • Linux屏中信息量大,一屏或几屏显示不全怎么办

    Linux屏中信息量大 一屏或几屏显示不全 两种方法 1 加个参数 more 例如 ls l more 这样敲完命令后 屏幕信息会停留在第一页上 再敲回车后 又多显示一行 但如果 信息太多敲回车太麻烦 可以采用第二种方法 2 写入一个文件中
  • linux下eclipse C++ 多线程调试

    初学linux编程 想要用linux下eclipse C 多线程调试 发现相关资料很少 所以想写一篇这样的文章 在这个页面看到 这里 If you use eclipse CDT you probably understand that e
  • 小信号先滤波还是先放大?

    1 是先滤波再放大 还是先放大再滤波 ADI 技术 电子技术论坛 广受欢迎的专业电子论坛 ADI亚洲技术支持中心的同事们给出的建议是 一般是先放大再滤波 这样经过放大器带来的噪声也可以被滤除 您怎么看 我觉得应该看是什么信号以及用什么样的运
  • Quartz 建表语句SQL文件

    Quartz 通过配置初始化数据库 https blog csdn net weixin 44371237 article details 133278217 官网找SQL SQL文件在jar里面 github下载 https github
  • SecureCRT MAC版本的单词跳转

    20210201 引言 最近更换了笔记本 第一次用mac 使用起来还是跟windows有很多不同 之前也已经安装了很多我在windows下的软件 例如securecrt 这个是我一直使用的终端软件 可能很多人在mac下都是使用iterm2
  • Linux系统意外断电无法启动解决方案

    首先看提示哪个盘有问题 如 sda2 fsck y dev sda2 修复完成后键入 reboot 重启电脑后修复OK
  • Gradle SNAPSHOT 版本更新

    转自 https www cnblogs com scoftlin p 9809623 html 在引用Maven 库上的aar 时经常会出现我们更新依赖的库时 Studio 并不能及时将最新的依赖库拉下来 这个因为gradle为了加快构建
  • [机器学习与scikit-learn-46]:特征工程-特征选择(降维)-2-常见的特征降维的方法大全

    作者主页 文火冰糖的硅基工坊 文火冰糖 王文兵 的博客 文火冰糖的硅基工坊 CSDN博客 本文网址 https blog csdn net HiWangWenBing article details 123953894 目录 前言 第1章
  • 服务器安装系统如何收费,服务器系统安装费用

    服务器系统安装费用 内容精选 换一换 根据是否支持高级的SCSI命令来划分磁盘模式 分为VBD 虚拟块存储设备 Virtual Block Device 类型和SCSI 小型计算机系统接口 Small Computer System Int
  • 不使用binlog,canal,kafka等,只用java+mybatis拦截器来实现项目中的异步双写主从数据库,代码逻辑全整理

    项目中因为要迁库 所以我要在原项目中接入我的双写逻辑 确保新旧两个库都有数据写入 假如新库写入失败 旧库数据也能写入 这就确保了重要数据不能丢失 一开始考虑的方案是使用数据同步工具 像是canal或是DTS等 但是环境这块卡的比较死 没有其