springboot+canal+mysql+redis缓存双写一致性

2023-11-14

canal官网地址:https://github.com/alibaba/canal/wiki/QuickStart
基本上按照官网的步骤来就行
在这里插入图片描述

准备

首先服务器上要安装好jdk,因为canal运行需要jdk,同时把canal对应的端口在服务中开放,否则连接不上

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

查看是否开启binlog日志功能

SHOW VARIABLES LIKE 'log_bin';

如果为OFF说明没有开启

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

注意点,这里执行命令最好是在服务器端执行,因为我们通过连接客户端登录的自己创建的后台用户比如root ,没有授权权限,会出错
出现以下情况
在这里插入图片描述

DROP USER IF EXISTS 'canal'@'%'; #删除用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';   ## 创建用户
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;

启动

下载 canal, 访问 release 页面 , 选择需要的包下载, 我这里下载最新的

https://github.com/alibaba/canal/releases/tag/canal-1.1.6

解压缩

上传服务器选择一个位置,并解压指定的文件夹下面

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

解压之后

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

配置修改

vi conf/example/instance.properties
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

## mysql serverId
canal.instance.mysql.slaveId = 1234 #默认是注释的,
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  #canal默认用户名
canal.instance.dbPassword = canal  #canal默认密码 
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*  #监听的表,默认监听所有表

启动

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

在这里插入图片描述

查看 instance 的日志

vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

DEMO

依赖

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>


        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

    

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

       

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 导入mysql驱动包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>
        <!-- 导入数据源依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.9</version>
        </dependency>

     


        <!--canal-->


        <dependency>
        <groupId>top.javatool</groupId>
        <artifactId>canal-spring-boot-starter</artifactId>
        <version>1.2.1-RELEASE</version>

        </dependency>




    </dependencies>

代码

配置文件

server:
  port: 8081
spring:
  datasource:
    url: jdbc:mysql://42.192.46.136:3306/test?useUnicode=true&useSSL=false&characterEncoding=utf-8
    username: root
    password: lkzroot
    type: com.alibaba.druid.pool.DruidDataSource
  redis:
    password: 123456
    host: 42.192.43.136
    port: 16379
    lettuce:
      pool:
        max-active: 200
        max-idle: 10
        max-wait: -1ms
        min-idle: 3

测试类

package com.boot.canal.client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @Author: lkz
 * @Title: RedisCanalClientExample
 * @Description: TODO
 * @Date: 2023/9/18 14:06
 */

public class RedisCanalClientExample {


    public static final Integer _60SECONDS = 60;
    public static final String  Canal_host_ip = "127.0.0.1";

    private static void redisInsert(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
            }
            //获取变动类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args)
    {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(Canal_host_ip, 11111),
                "example",
                "",
                "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("test.test1");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }


}

redis配置

package com.boot.canal.client;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "127.0.0.1";
    public static final String  REDIS_pwd = "123456";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,16379,10000,REDIS_pwd);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }

}

结果

修改表数据

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

springboot+canal+mysql+redis缓存双写一致性 的相关文章

  • 如何绑定值 INSERT INTO mysql perl

    我有下面的代码可以工作 但我需要知道如何绑定它们以确保安全 如果我只是将 new row 替换为 并将其放入执行中我收到错误 感谢您的帮助 foreach my field account field order new row param
  • MySQL如何从多个表中获取数据

    我正在寻找 php MySQL jquery 的帮助 我有2张桌子 table1表 1 有 4 列 id 标题 desc thumb img tabel2表 2 有 3 列 id 表 id img 我只想将 2 个表与 get QS 的值进
  • 当php脚本通过ajax运行时显示进度条

    我有一个通过 ajax 向服务器提交值的表单
  • 使用 PHP 和 MySQL 的服务器端事件

    我正在使用 PHP 和 MySQL 构建一个 非常基本的 应用程序 该应用程序的目的是在网页上显示 实时 数据交易 这些交易来自于transactionsMySQL 数据库中的表 到目前为止 我可以在网页上检索并显示数据 不过我期待看到数据
  • 使用 Python 2.7 和 MySQLdb 将二进制数据插入 MySQL 中的 blob 列时如何避免编码警告

    我在将二进制数据插入到longblob使用 Python 2 7 中的 MySQLdb 在 MySQL 中的列 但我收到一个编码警告 我不知道如何解决 test py 11 Warning Invalid utf8 character st
  • 语言翻译语法

    我正在尝试为我的项目添加另一种语言 我们知道语言可以表现出主语和谓语的差异 例如 英语 Mustafa和他的朋友去看电影ahmet today 土耳其 Mustafa布昆 阿卡达西ahmetile birlikte sinemaya git
  • 解码Json数据数组并插入到mysql

    这个问题可能已经在这里问过 但我尝试搜索找不到它 我有如下 Json 数据 CityInfo CityCode 5599 Name DRUSKININKAI CityCode 2003 Name KAUNAS CityCode 2573 N
  • MySQL获取最后10条记录中的第一条记录

    在Mysql中 我试图获取最后10条记录中最旧的记录 为了得到最后 10 个我会简单地做SELECT FROM table ORDER BY id DESC LIMIT 10 为了获得最旧的 我只需使用 ASC 顺序 我需要首先按 DESC
  • PHP:使用输入和输出参数(不是“INOUT”)调用 MySQL 存储过程

    我想从 PHP 调用 MySQL 中的存储过程 该过程需要输入and输出参数 not INOUT 参数 举一个简单的例子 假设我在 MySQL 中有以下存储过程 DELIMITER DROP PROCEDURE IF EXISTS test
  • 如何在 MariaDB 10 中启用大索引?

    在 Debian Jessie 中 我安装了 MariaDB 服务器 10 0 30 并尝试增加最大密钥长度 AFAIU 这取决于配置参数innodb large prefix正在启用 根据docs https mariadb com kb
  • WooCommerce:在数据库中查找产品

    我正在使用 WooCommerce 创建一个网站 我想根据用户在主页搜索表单中输入的邮政编码来限制用户可用的产品 为了能够实现这一目标 我必须在 phpMyAdmin 的数据库中指定每个产品的条件 但我似乎找不到它 有谁知道 phpmyAd
  • 基于列顺序的查询速度

    数据库中列类型的顺序对查询时间有影响吗 例如 具有混合排序 INT TEXT VARCHAR INT TEXT 的表的查询速度是否会比具有连续类型 INT INT VARCHAR TEXT TEXT 的表慢 答案是肯定的 这确实很重要 而且
  • 如何在 WP_Query 中按日期排序?

    我已经尝试过这种方式但是orderby and order不适用于 WP Query 类 posts new WP Query array post type gt block code orderby gt post date order
  • 需要有关使用 PHP 在 mysql 数据库中插入逗号分隔数据的帮助

    数据库表中已有的演示数据 INSERT INTO csvtbl ID SKU Product Name Model Make Year From Year To VALUES 1 C2AZ 3B584 AR Power Steering P
  • 使用 EXPLAIN 进行 MYSQL 存储过程调用

    如何分析和使用 EXPLAIN 来调用我的存储过程 我需要优化查询时间 但是似乎没有地方可以执行 EXPLAIN 调用 proc name 你可以试试 set profiling 1 call proc name show profiles
  • MySql 查询在选择中将 NULL 替换为空字符串

    如何用空字符串替换 select 中的 NULL 值 输出 NULL 值看起来不太专业 这是非常不寻常的 根据我的语法 我希望它能够工作 我希望能得到一个解释 为什么没有 select CASE prereq WHEN prereq IS
  • PDO fetch() 失败时会抛出异常吗?

    有没有方法PDO语句 fetch http php net manual en pdostatement fetch php如果 PDO 错误报告系统设置为抛出异常 则在失败时抛出异常 例如 如果我设置 PDO ATTR ERRMODE g
  • MySQL 触发器和 SUM()

    我有两张桌子 学生桌和家庭桌 在学生中 我有列 st venue 和total venue 家里我有收入 Total Revenue 是学生 st 收入与家庭收入之和 其中 family id student student id stud
  • Spark SQL/Hive 查询通过 Join 永远持续下去

    所以我正在做一些应该很简单的事情 但显然它不在 Spark SQL 中 如果我在 MySQL 中运行以下查询 查询将在不到一秒的时间内完成 SELECT ua address id FROM user u inner join user a
  • 更改Docker容器中的mysql密码

    我如何更改 docker 容器中的 root 密码 因为一旦我停止 mysql 服务 容器就会自动停止 我应该停止 mysql 容器并部署一个新容器吗 您可以使用正在运行的容器更改它docker exec session https doc

随机推荐

  • 怎么找到一个好名字idea插件开发

    VFS简介 虚拟文件系统 VFS 是一个Intellij Platform组件 它封装了大部分对活动文件的处理操作 为了达成以下目的 提供一个处理文件的通用API 而不关心文件的具体位置 无论文件位于磁盘上 归档文件中还是HTTP服务器上
  • Java基础_JDK下载安装与配置

    文章目录 开始前准备 JDK下载 JDK安装 环境配置 验证环境配置是否成功 开始前准备 在配置JDK之前 我们需要先下载并安装好JDK 若已下载则可以跳过该环节 JDK和JRE的区别 JRE是运行环境 程序运行必需 JDK是开发环境 包含
  • Mysql—C语言API接口

    Mysql C语言API接口 一 mysql arp访问数据的操作流程 1 初始化mysql操作句柄 2 连接mysql服务器 3 设置mysql客户端字符集 保持与服务器一致 4 选择要操作的数据库 5 定义sql语句 并且执行语句 6
  • 超级楼梯

    有一楼梯共M级 刚开始时你在第一级 若每次只能跨上一级或二级 要走上第M级 共有多少种走法 Input 输入数据首先包含一个整数N 表示测试实例的个数 然后是N行数据 每行包含一个整数M 1 lt M lt 40 表示楼梯的级数 Outpu
  • 战双服务器信息保留多久,《战双帕弥什》6月10日服务器异常处理进度及补偿方案...

    尊敬的各位指挥官 真的十分抱歉 傍晚的服务器异常给大家带来了不好的游戏体验 我们恳请指挥官的原谅 经过慎重的考虑 我们公布本次服务器异常问题的处理方案如下 1 星火服 问题描述 部分指挥官发生了不同程度的数据异常 处理方案 我们决定统一将星
  • STM32——SD卡实验(SDIO方式)

    一 SD卡简介 1 什么是SD卡 SD卡 Secure Digital Memory Card 即 安全数码卡 它是在MMC的基础上发展而来 是一种基于半导体快闪记忆器的新一代记忆设备 它被广泛的用于便携装置上使用 例如数码相机 个人数码助
  • 【二分法】剑指offer:二维数组中的查找

    对于二维数组中每一个一维数组 用二分查找 判断能否找到该数 leetcode专题笔记 二分法查找 1 附leetcode题 m0 52043808的博客 CSDN博客 代码 class Solution public bool binsea
  • react useState自定义hook函数(管理多组件公共状态)

    效果图如下 代码如下 div div
  • 抓取房地产信息:一个Python爬虫实战案例

    目录 目录 1 准备工作 2 分析网页结构 3 编写爬虫 4 提取房源信息 5 输出结果
  • 结合Simulink仿真聊聊PID--理论部分

    如何理解PID PID这个名词一般是电子信息类的学生都不会陌生 它是控制领域非常常见的算法 并且经久不衰 P 比例 I 积分 D 微分 为什么需要用到PID呢 打一个比方 一个12v的电池装到小车上 设置50 的占空比 相当于在小车电机上的
  • document.get:fail Error: cannot find document with _id xx, please make sure that the document exists

    1 解决方案 修改权限 或 新建此数据集 2 详细解读 小程序入门云开发的时候总是会遇到这样的问题 而且很坑很难发现 Error errCode 1 errMsg document get fail Error cannot find do
  • CTFhub技能树_Web RCE

    一 eval执行 1 分析 打开网站显示如下代码 大体意思是 判断cmd是否被设置 若cmd被赋值 则执行如下语句 否则就继续显示以上代码 eval REQUEST cmd 其中 eval 该函数可以把字符串作为PHP代码执行 REQUES
  • Windows上的RocketMQ安装以及测试

    废话不多说 步骤一 下载安装RocketMQ 地址 RocketMQ 官方网站 RocketMQ 步骤二 配置环境解压启动RocketMQ 解压后的目录 配置环境 启动bin目录里的文件 请使用cmd去启动 start mqnamesrv
  • mongodb的c++接口的说明

    mongodb的c 接口的说明 作者 habadog 日期 2011 年 08 月 02 日 发表评论 3 查看评论 mongodb c 接口说明 说明 IN表示输入参数 OUT表示输出参数 1 构造函数 DBClientConnectio
  • MySQL 逗号分隔,字符串拆分(横转竖)

    文章目录 一 含分隔符的字符串拆分 1 数字拆分 2 字段拆分 也就是行转列 二 涉及函数 1 字符串拆分 SUBSTRING INDEX str delim count 2 替换函数 replace str from str to str
  • node(npm)配置vue时出现 vue不是内部或外部命令!!!!!!

    网上找了很多资料 基本是说对了的 主要是环境问题 path 解决办法 用Everything这个软件搜索vue cmd的位置 搜索到这个批处理文件后把这个文件的路径加入Path中就行了 重新启动cmd执行 vue 看到下面的就说明 path
  • 生产注意事项

    目录 1 可用性 2 兼容性特性 3 运行时限制 4 OPLOG大小限制 5 WiredTiger缓存 6 事务和安全性 7 分片配置限制 8 分片集群和仲裁器 9 三成员主次仲裁器架构 10 获取锁 11 待处理的DDL操作和事务 12
  • COMS原理及门电路设计

    目录 1 N P MOS管的物理结构图 2 N P MOS管的工作原理 3 N P MOS管的抽象模型 4 典型门电路设计 1 cmos反相器设计 2 coms与非门与或非门设计 3 与或非门 或与非门设计 4 异或 同或设计 5 设计方法
  • Retroft各个版本的jar包和源码下载地址

    Retrofit各个版本下载 download jar包下载 http 101 96 8 155 central maven org maven2 com squareup retrofit2 retrofit 2 5 0 retrofit
  • springboot+canal+mysql+redis缓存双写一致性

    canal官网地址 https github com alibaba canal wiki QuickStart 基本上按照官网的步骤来就行 准备 首先服务器上要安装好jdk 因为canal运行需要jdk 同时把canal对应的端口在服务中