canal监听mysql实践

2023-05-16

canal监听mysql实践

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)。使用场景包括:

1.缓存更新

2.异步数据库或者同步到关系型数据库的中间媒介

canal介绍及工作原理

基于日志增量订阅&消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

这里也介绍了业务cache刷新和价格变化等重要数据变更消息的监听。

Canal原理相对比较简单:

img

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)

  3. canal解析binary log对象(原始为byte流)

img

Canal架构及工作原理

  1. server 代表一个 canal 运行实例,对应于一个 jvm
  2. instance 对应于一个数据队列 (1个 canal server 对应 1…n 个 instance )
  3. instance 下的子模块
  4. eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
  5. eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
  6. eventStore: 数据存储
  7. metaManager: 增量订阅 & 消费信息管理器
    img
  • EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
  • EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
  • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
  • void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
  • void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

docker canal搭建

先在Docker Hub中下载canal-server镜像

docker pull canal/canal-server:latest

先启动Canal,用于复制properties配置文件

docker run -p 11111:11111 --name canal -d canal/canal-server:latest

初次启动Canal镜像后,将instance.properties文件复制到宿主机,用于后续挂载使用

docker cp canal:/home/admin/canal-server/conf/example/instance.properties  /mydata/canal/conf/

修改instance.properties,该文件主要配置监听的mysql实例

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false 未开启gtid主从同步
canal.instance.gtidon=false

# position info 在同一宿主机内 若有主从数据库,填写主数据库地址
canal.instance.master.address=172.17.0.1:3306
#需要读取的起始的binlog文件 不填写的话默认应该是从最新的Binlog开始监听
canal.instance.master.journal.name=
#需要读取的起始的binlog文件的偏移量
canal.instance.master.position=
#需要读取的起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

# 从数据库地址 主备切换时使用
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex 不需要监听的名单
canal.instance.filter.black.regex=mysql\\..*,sys\\..*,performance_schema\\..*,information_schema\\..*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config 默认的sql存储队列
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

Canal为我们提供了canal.instance.filter.regex与canal.instance.filter.black.regex选项参数来过滤数据库表数据解析,类似黑白名单。常见例子有:
●所有表:.* or .\…
●canal schema下所有表:canal\…*
●canal下的以canal打头的表:canal\.canal.*
●canal schema下的一张表:canal\.test1
●多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)

修改canal.properties,该文件主要时配置canal server

#################################################
#########     destinations      #############
#################################################
##配置监听多数据实例的地方 单数据库监听的话这里配置example就可以
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 选择的消费队列
canal.serverMode = tcp

消费队列模式与Server-client模式一致,主要区别如下:

  • 不需要CanalServerWithNetty,改为CanalMQProducer投递消息给消息队列
  • 不使用CanalClient,改为MqClient获取消息队列的消息进行消费

这种模式相比于Server-client模式

  • 下游解耦,利用消息队列的特性,可以支持多个客户端广播消费、集群消费、重复消费等

  • 会增加系统的复杂度,增加一些延迟
    - [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uLGBhukW-1668440711353)(http://p3-tt.byteimg.com/large/pgc-image/7ba19ac6829f4d4688d0f4e93ab2919f?from=pc)]

    本地的instance.properties:容器的instance.properties 将容器的instance.properties配置文件挂载到宿主机,方便后续变更
    docker stop canal;docker rm canal; 重新生成容器
    docker run -p 11111:11111 --name canal -v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server:latest

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jp1I8Fvr-1668440711354)(C:\Users\煎饼果子\AppData\Roaming\Typora\typora-user-images\image-20221113191556796.png)]

查看消费实例example的日志可以看出canal监听的binlog位置正好是连接时的binlog位置,前提是未指定了Binlog的位置。客户端开始连接后便可以从指定位置开始消费增量的binlog。binlog-format=ROW # 选择 ROW 模式.

java客户端实例消费

1.引入pom文件

            <!--canal-->
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.5</version>
            </dependency>

            <!-- MessageCanalEntry.Entry等来自此安装包 -->
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.protocol</artifactId>
                <version>1.1.5</version>
            </dependency>

2.application.yml配置文件canal

canal:
  serverAddress: 42.192.183.193
  serverPort: 11111
  instance: #多个instance
    - example

对应的properties文件

@Component
@ConfigurationProperties(prefix = "canal")
@Data
public class CanalInstanceProperties {

    /**
     * canal server地址
     */
    private String serverAddress;

    /**
     * canal server端口
     */
    private Integer serverPort;

    /**
     * canal 监听实例
     */
    private Set<String> instance;

}

3.监听数据库变动代码

@Component
@Slf4j
public class MysqlDataListening {

    private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");

    private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);

    @Autowired
    private CanalInstanceProperties canalInstanceProperties;


    @PostConstruct
    private void startListening() {
        canalInstanceProperties.getInstance().forEach(
            instanceName -> {
                executors.submit(() -> {
                    connector(instanceName);
                });
            }
        );
    }

    /**
     * 消费canal的线程池
     */
    public void connector(String instance){
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalInstanceProperties.getServerAddress(), canalInstanceProperties.getServerPort()),
                instance, "", "");
        canalConnector.connect();
        //订阅所有消息
        canalConnector.subscribe(".*\\..*");
        // canalConnector.subscribe("test1.*"); 只订阅test1数据库下的所有表
        //恢复到之前同步的那个位置
        canalConnector.rollback();

        for(;;){
            //获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
            Message message = canalConnector.getWithoutAck(100);
            //获取消息id
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (size == 0 || batchId == -1) {
                try{
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                }
            }
            if(batchId != -1){
                log.info("instance -> {}, msgId -> {}", instance, batchId);
                printEnity(message.getEntries());
                //提交确认
                canalConnector.ack(batchId);
                //处理失败,回滚数据
                //canalConnector.rollback(batchId);
            }
        }
    }

    private  void printEnity(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try{
                // 序列化数据
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            assert rowChange != null;
            CanalEntry.EventType eventType = rowChange.getEventType();
            log.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                     eventType));

            if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
                log.info("sql ------------>{}" ,rowChange.getSql());
            }

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    switch (rowChange.getEventType()){
                        //如果希望监听多种事件,可以手动增加case
                        case UPDATE:
                            printColumn(rowData.getAfterColumnsList());
                            printColumn(rowData.getBeforeColumnsList());
                            break;
                        case INSERT:
                            printColumn(rowData.getAfterColumnsList());
                            break;
                        case DELETE:
                            printColumn(rowData.getBeforeColumnsList());
                            break;
                        default:
                    }
                }

        }
    }

    private void printColumn(List<CanalEntry.Column> columns) {
        StringBuilder sb = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            sb.append("[");
            sb.append(column.getName()).append(" : ").append(column.getValue()).append("    update=").append(column.getUpdated());
            sb.append("]");
            sb.append("    ");
        }
        log.info(sb.toString());
    }

数据库变动效果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kaZaI9oH-1668440711355)(C:\Users\煎饼果子\AppData\Roaming\Typora\typora-user-images\image-20221114223319230.png)]

注意的问题**canal client:**为了保证有序性,一份实例(instance)同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。canal server 上的一个 instance 只能有一个 client 消费。clientId是固定的,Binlog文件落入文件保存。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-g1zNll5x-1668440711356)(C:\Users\煎饼果子\AppData\Roaming\Typora\typora-user-images\image-20221113224244554.png)]

由于保证了有序性,生产过快而消费慢的问题,如何解决消费堆积问题

其次在使用Canal自带客户端进行同步时需要自己手动调用get()或者getWithoutAck()进行拉取
拉取日志后进行同步只能一条一条处理,效率比较低
为了解决上面的问题打算在日志同步过程中引入MQ来作为中间同步,Canal支持RocketMQ和Kafka两种,最终选用Kafka来进行

总结

canal的原理是借助mysql主从复制的协议,模拟从数据库拉取增量Binlog日。canal通过Instance作为一个从数据库实例,客户端连接实例后有序消费增量的Binlog日志。有几点特别注意的是,一是canal的生产消费模型是一个带指针的数组,分别指向生产位置、消费位置和ack位置,来控制消费和生产的队列。二是Binlog的配置需要时row格式,canal的解析针对row格式做了适配。三是canal通过client竞争的方式保证消费时只有一个client消费,保证binlog的有序性。四是,生产端数据量大的时候canal会存在消费不及时的问题,存在一定延时性。性能分析时业务binlog入库到canal client拿到数据,基本可以达到10~20w的TPS。具体业务解析时肯定要低于这个,不过对于一般业务来说,已足够用。

参考

https://github.com/luozijing/springLearning 代码仓

https://blog.csdn.net/gudejundd/article/details/119358028 缓存删除解决方案

https://zhuanlan.zhihu.com/p/345736518-ShardingSphere canal详解

https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B canal详解

https://github.com/alibaba/canal/wiki/performance canal性能

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

canal监听mysql实践 的相关文章

  • ORDER BY 之后的 GROUP BY

    我需要去做GROUP BY after ORDER BY 我不明白为什么 MySQL 不支持这一点 这是我的代码 SELECT pages id contents id language ORDER BY FIND IN SET langu
  • 如何使用 exec.Command 在 golang 中执行 Mysql 脚本

    您好 我正在尝试执行一个脚本以使用 Golang 将数据填充到数据库中 func executeTestScript cmd exec Command usr local mysql bin mysql h127 0 0 1 P3333 u
  • MySQL 选择第一个字符在哪里

    如何选择单元格的第一个字符并使用它来定义返回的内容 看看MySQL 字符串 和 控制流 功能 http dev mysql com doc refman 5 1 en functions html 例如 SELECT IF LEFT myF
  • 选择查询不适用于使用Parameters.AddWithValue 的参数

    C 中的以下查询不起作用 但我看不出问题所在 string Getquery select from user tbl where emp id emp id and birthdate birthdate cmdR Parameters
  • MySQL 中的 group_concat 性能问题

    我添加了一个group concat到一个查询并杀死了性能 添加之前和之后的解释计划是相同的 所以我对如何优化它感到困惑 这是查询的简化版本 SELECT curRow curRow 1 AS row number docID docTyp
  • 如何告诉node.js mysql没有在默认端口上运行?

    我遇到了与此人类似的问题 连接 ECONNREFUSED 节点 js sql https stackoverflow com questions 8825342 connect econnrefused node js sql 我正在尝试将
  • 选择 mysql 枚举的 php 函数

    因此 我创建了一个函数 它将从数据库中的枚举字段中提取值
  • Mysql 在给定日期时间范围内插入随机日期时间

    使用 SQL 我可以在给出范围的列中插入随机日期时间值吗 例如 给定一个范围2010 04 30 14 53 27 to 2012 04 30 14 53 27 我对范围部分感到困惑 因为我刚刚做了这个 INSERT INTO someta
  • 插入 Mysql 表时防止 Json 排序

    在发送 AJAX 请求时 Json Content 的重新排序已经是一个已知问题 但我不知道在将 Json content 插入 JSON 类型的 Mysql 表时也会发生同样的情况 在这种情况下 mysql 服务器在保存之前也会对其内容进
  • 使用 PHP 显示 Mysql 中的图像

    这就是我的数据库中的表的样子 我正在尝试显示我存储的图像 它是 mimetype longblob 当我运行代码时 它会给我一个带有 的小框 没有错误 只是那个框 有谁知道错误是什么以及如何修复它 Display Index Display
  • MySql 复合索引

    我们使用 MySql 作为我们的数据库 以下查询在 mysql 表 大约 2500 万条记录 上运行 我在这里粘贴了两个查询 查询运行得太慢 我想知道更好的复合索引是否可以改善这种情况 你知道最好的综合指数是什么吗 并建议我这些查询是否需要
  • 使用java在mysql中插入带有\\的文件路径

    我正在使用java制作一个独立的应用程序 并且我需要插入用户从文件选择器中选择的图像的路径 我正在获取文件的路径 但是当我将其存储在数据库 mysql 中时 它不会存储 所以当我检索该路径时 该文件不会显示 如何存储文件的路径 这样就可以使
  • 如何使用 SQL - INSERT...ON DUPLICATE KEY UPDATE?

    我有一个脚本可以捕获推文并将其放入数据库中 我将在 cronjob 上运行脚本 然后在我的网站上显示数据库中的推文 以防止达到 Twitter API 的限制 所以我不想在我的数据库中有重复的推文 我知道我可以使用 INSERT ON DU
  • 尝试在 Mac OSX 上的 virtualenv 和 MySQL 中安装 Django CMS 时出错

    当我尝试使用 virutalenv 安装带有 MySQL 的 django CMS 时 出现以下错误 RuntimeError maximum recursion depth exceeded Users ethan Sites env b
  • 如何限制mySQL中的搜索和替换字符串

    我用它来搜索和替换 mySQL 中的字符串 UPDATE products SET prodname REPLACE prodname S S 这些产品包含诸如 TYLENOL TABS 100 S 之类的字符串 我想将其转换为 TYLEN
  • MySQL 行级锁

    我不确定行级锁是如何工作的 但这是我的问题 我有一个表 T id int balance int engine InnoDB 我想锁定 ID 1 的行 所以我开始一个像这样的事务 start transaction select from
  • Yii2 从 MySQL 中的表登录的分步指南

    我开始在 Yii2 中迈出第一步 到目前为止 我已经能够编写一个应用程序并将数据库中的表连接到它 就像我在 Yii1 中学到的那样 该表是contacts我的创建视图中的表单将数据发送到数据库 没有任何问题 问题是我只能在 Yii2 内置的
  • MySQL如何在没有过程/函数的情况下执行命令块

    我尝试在 MySQL Workbench 上运行一段 SQL 命令 就像在 SQL Server 上一样 但它告诉我 声明在此位置无效 我在网上看到了各种这样的例子 我真的不明白为什么会出现这个错误 一些提示 代码 其中 SQL Serve
  • PhpMyAdmin 导出不包括 mysqldump 中的主键

    用PhpMyAdmin导出同一个表的结构 DROP TABLE IF EXISTS test apprentis CREATE TABLE IF NOT EXISTS test apprentis a id smallint 10 NOT
  • sql连接一个表中的两个字段

    我有一个预订表 其中有两个人 我想将 person 1 作为一行返回 将 person 2 作为新行返回 但该人的 id 与人员表相关 这是我所得到的 但没有提取预订信息 SELECT people FROM select booking

随机推荐