使用canal配合rocketmq监听mysql的binlog日志

2023-05-16

目录

一. 安装配置canal

1.1 安装canal

1.2 配置canal基本属性

1.3 配置canal的mysql

二. mysql配置

2.1 开启mysql的binlog日志

2.2 配置 canal 专用用户

2.3 启动canal

三. java代码的实现

3.1 mq消费者

3.2 运行模拟


一. 安装配置canal

1.1 安装canal

        canal由阿里开源到了github, 可在此页面 github地址, 下载开发版本的压缩包, 由于github下载巨慢, 也可以自己寻找国内资源

         之后解压缩, canal 的文件目录如下

1.2 配置canal基本属性

        进入到 conf 文件夹, 目录结构如下


        先进行基础的配置, vim命令进入到 canal.properties 文件, 配置 canal 服务的ip地址, 以及端口号

         之后配置 canal 可视化控制台, 我这里没安装, 所以暂时不配置

        由于本次是配合 rocketMQ 使用, 所以在下面的配置中找到 canal.serverMode, 并设置为 rocketMQ 

        之后, 进入到关键的配置, 配置 rocketMQ 各项属性

                rocketmq.producer.group : 生产者分组, 和在代码中指定的 group 的含义一致       

                rocketmq.namesrv.addr : mq 的 nameSvr 的ip地址

                rocketmq.tag : 生产者生产消息时, 指定的 tag

1.3 配置canal的mysql

        进入 conf 下的 example 文件夹, 目录如下, vim 命令进入 instance.properties 

        几个比较重要的配置

         canal.instance.mysql.slaveId : 因为 canal 的工作原理是伪装成 mysql 的 slave, 所以这里需要设置 slave 的 id

        canal.instance.master.address : 数据库的 ip 地址以及端口号        

        canal.instance.dbUsername : 要监听的数据库的登录用户名

        canal.instance.dbPassword : 数据库的登录密码 

        canal.instance.filter.regex : 要监听的数据库表, 正则表达式匹配模式, 没配置就是全部监听 

        canal.mq.topic : mq 生产者的发送消息时, 指定的 topic, 和 java代码的含义一致

二. mysql配置

2.1 开启mysql的binlog日志

        由于 canal 的工作原理, 所以需要开启 mysql 的 binlog 日志, vim 命令编辑 etc下的 my.cnf 文件, 在  mysqld 下添加如下配置 :

                log-bin : 配置 binlog 日志文件目录

                binlog-format : 配置 binlog 日志文件的格式

                server_id : 配置 mysql 主节点的 id, 不能和集群中id, 或是从节点中的 id 重复, 该 id 不能和 canal 的 slaveId 重复

        之后查看 mysql binlog 的开启状态, show variables like '%log_bin%', 可以看到是开启状态

2.2 配置 canal 专用用户

        在 canal 配置文件中, 我使用的 root 用户登录的, 所以不在给 mysql 配置 canal 用户, 可参考其他博客

2.3 启动canal

        进入到bin文件夹, 目录如下, 使用命令  ./startup.sh 启 canal, 特别注意, 如果服务器内存不充足, 可以先使用 vim 编辑 startup.sh, 把文件中的 java参数的使用内存设置小一点

        之后观察 canal.log 日志, 可以看到启动成功  

         查看 rocketmq_client.log 日志, 发现 canal 一直在向 mq 发送心跳检测, 并输出了 mq 的 group, 实例id信息

         登录 rocketMQ 可视化控制台, 可以看到 canal 注册到 mq 中的生产者实例, 以及 topic 信息, 这些信息和我们之前在 canal 的配置文件中配置的一致 

 

        至此, 关于服务器中 canal 的配置介绍完毕, 下面用java代码实现 mq 消费者监生产者的信息

三. java代码的实现

3.1 mq消费者

        由于 canal 使用的 mq 模式, 他可以说是一个 mq 生产者, 所以我们需要定义 mq 的消费者, 代码如下, 消费者要监听的 topic 和 tag 和 canal 配置文件中配置的一致

消费者抽象类

/**
 * @author canxiusi.yan
 * @description AbstractProducer 抽象SysLog抽象父类
 * @date 2022/5/10 10:03
 */
public abstract class AbstractBinLogConsumer {

    /**
     * 子类通用日志
     */
    protected static final Logger logger = LoggerFactory.getLogger(AbstractBinLogConsumer.class);

    private DefaultMQPushConsumer consumer;

    private final static String GROUP_NAME = "canal-rocketmq-es%canal";

    /**
     * 初始化mq消费者
     */
    @PostConstruct
    public void initConsumer() {
        // 相同的业务消费者分到同组, 需要指定相同的tag,
        consumer = new DefaultMQPushConsumer(GROUP_NAME);
        consumer.setNamesrvAddr("ip:port");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setInstanceName("bin_log_consumer");
        consumer.setPullInterval(1000);
        // 设置每次从队列中拉取的消息数
        consumer.setPullBatchSize(1000);
        try {
            consumer.subscribe("binlog_topic", "binlog_es_tag");
        } catch (MQClientException e) {
            logger.error("[binlog消费者初始化异常]", e);
            System.exit(-1);
        }
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            if (CollectionUtils.isEmpty(msgs)) {
                logger.info(LogUtils.format("[消息列表为空], msgs=<{0}>", msgs));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            for (MessageExt msg : msgs) {
                try {
                    handlerMsg(msg);
                } catch (Exception e) {
                    logger.error("[日志消费处理异常]", e);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        logger.info(LogUtils.format("[binlog消费者初始化完成], consumer=<{0}>", consumer));
    }

    /**
     * 启动消费者
     *
     * @param event
     */
    @EventListener(ApplicationPreparedEvent.class)
    public void startSysLogConsumer(ApplicationPreparedEvent event) {
        try {
            consumer.start();
        } catch (MQClientException e) {
            logger.error(LogUtils.format("[binlog消费者start异常], nameServerAddress={}"), e);
            System.exit(-1);
        }
    }

    @PreDestroy
    public void closeConsumer() {
        consumer.shutdown();
    }

    /**
     * 解析binlog日志
     * @param msg
     */
    protected abstract void handlerMsg(MessageExt msg);

}

 消费者实现类, 这里只把监听的消息, 做下日志打印

/**
 * @author canxiusi.yan
 * @description BinlogConsumer
 * @date 2022/6/28 15:07
 */
@Component
public class BinlogConsumer extends AbstractBinLogConsumer {

    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(BinlogConsumer.class);

    @Override
    protected void handlerMsg(MessageExt msg) {
        JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody()));
        // 影响到几条sql, 就会有几条日志
        logger.info(LogUtils.format("监听到binlog日志, json=<{0}>", jsonObject));
    }
}

3.2 运行模拟

        运行程序, 登录 rocketmq 控制台, 发现消费者已经注册成功

        我这里使用 postman 调用新增数据的接口, 向 db 插入一条sql, 流程:  向db插入数据, 生成 binlog 日志, 把日志同步给伪装成了从节点的 canal, 然后把 binlog 日志发送给 mq, 之后代码中的消费者监听到数据, 观察日志打印, 可以发现, 已经监听到了消息

        格式化消息后, 可以看到, 本次 db 变更的类型, 变更的数据, 都是能获取到的, 后面就可以做很多事情了, 比如把数据同步到 redis, es搜索引擎等

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用canal配合rocketmq监听mysql的binlog日志 的相关文章

  • 将程序存储在 phpMyAdmin 中

    我必须将存储过程添加到 MySQL 数据库 问题是托管提供php我的管理员来管理数据库 我在网上搜索了一下 想法是运行创建程序的MySQL本机语句 但由于程序的代码通常可能有 我们必须更改 MySQL 中的分隔符 php我的管理员没有这个选
  • 子查询与连接

    我重构了从另一家公司继承的应用程序的一个缓慢部分 以使用内部联接而不是子查询 例如 WHERE id IN SELECT id FROM 重构后的查询运行速度提高了约 100 倍 50 秒到 0 3 我预计会有改进 但谁能解释为什么它如此剧
  • 从数据库中给定时间起经过的时间

    我有一个 HTML 表 其中包含从数据库中提取的记录 我正在使用 PHP MySQL 我的表中名为 Timer 的列未从数据库中检索 我需要在此处显示经过的时间 从数据库中的特定时间开始 例如 假设现在的时间是2013年2月21日下午6点2
  • 优化mysql中日期类型字段的查询

    我目前准备了以下查询 select sum amount as total from incomes where YEAR date 2019 and MONTH date 07 and incomes deleted at is null
  • 如何使组合键唯一?

    I am making a database of students in one school Here is what I have so far 如果您不喜欢阅读 请跳至 简而言之 部分 问题是我对这个设计并不满意 我想要的组合gra
  • mysql jdbc 与 SSL 连接在 tls 握手级别失败

    我们的 mysql 服务器配置为仅接受与 ssl 密码 DHE RSA AES256 GCM SHA384 的连接 我正在使用 java mysql connector java 8 0 15 和 java 8 openjdk 版本 1 8
  • AttributeError:尝试在 python 中运行 sqlalchemy 来管理我的 SQL 数据库时,“Engine”对象没有属性“execute”

    我有以下代码行不断给我一个错误 即引擎对象没有对象执行 我认为我一切都对 但不知道接下来会发生什么 似乎其他人也遇到了这个问题 重新启动他们的笔记本电脑就可以了 我正在使用 Pycharm 并已重新启动但没有任何解决方案 任何帮助是极大的赞
  • MySQL - 从表中删除空值行

    我有一张桌子 user 有超过 60 列 其中一列的名称是 用户名 我想删除其中的行username字段为空或NULL 我怎样才能做到这一点 谢谢你 Try this DELETE FROM user WHERE username IS N
  • 尝试通过比较不同的表从 SQL 查询输出正确的值

    我对 SQL 非常陌生 需要有关如何使用正确的查询完成此任务的帮助 我有 2 张桌子需要使用 表 TB1 有 id Name 1 bob 2 blow 3 joe 表 TB2 有 compid property 1 bob 2 blow 我
  • 如何在php中正确显示另一种语言的mysql表数据

    我有一个 mySQL 表 其中一列中的数据采用英语以外的语言 波斯语 当我在表中输入数据时 它会正确显示 但是当我想在 php 文件中显示数据时 它会显示如下 好吧 我应该怎么做才能以正确的形式显示数据 由于我经常使用 非英语 字符 因此要
  • 使用 cfchart 标签在单个饼图中显示多个查询的数据

    请考虑以下代码 现在我的代码中有以下代码 cfm页面内的 tag DataSource xx xx x xx Name of the database sgemail Name of the relevant column event vc
  • PHP MySQL 查询带有 %s 和 %d

    SELECT COUNT AS test FROM s WHERE id d AND tmp mail lt gt 什么是 s and d for 这些是使用的格式符号 例如经过sprintf 例子 Output SELECT COUNT
  • 使用 Hibernate 和 MySQL、全局和本地进行 Spring 事务管理

    我正在使用 MySQL Server 5 1 Spring 3 0 5 和 Hibernate 3 6 开发 Web 应用程序 我使用 Springs 事务管理 我是新手 所以如果我问一个容易回答的问题 请耐心等待 1 我读到了有关全局 x
  • 从所有表中选择

    我的数据库中有很多表都具有相同的结构 我想从所有表中进行选择 而不必像这样列出所有表 SELECT name FROM table1 table2 table3 table4 我尝试过 但这不起作用 SELECT name FROM 有没有
  • MySQL Workbench 忽略外键

    在处理 MySQL Workbench 中的 SQL 编辑器时 我偶然发现了一些奇怪的事情 其中 执行似乎忽略了外键约束 这是一个例子 create database testdb use testdb create table t1 te
  • 在 MySQL 中插入时检查并防止相似字符串

    简要信息 我有3张桌子 Set id name SetItem set id item id position TempSet id 我有一个函数可以生成新的随机组合Item桌子 基本上 总是在成功生成之后 我在中创建一个新行Set表 获取
  • Laravel:使用 Faker 播种多个独特的列

    介绍 怎么样 伙计们 我有一个关于模型工厂和多个独特列的问题 背景 我有一个名为 Image 的模型 该模型将语言支持存储在单独的模型中 图片文字 图片文字 has an image id栏 语言栏和文本栏 图片文字有一个约束MySQL那个
  • 如何在php中根据url从mysql获取数据?

    我在 mysql 数据库中有一个页表 其中包含 page name title content author 字段 我想用 php 来获取它http www domain com index php page page name http
  • 数据库级别的别名列名 [MySQL]

    别名 可能是错误的词 因为它是在将列 表名称作为查询中的其他名称引用的上下文中使用的 我感兴趣的是是否有一种方法可以在数据库中为列指定两个名称 如果我要打印这样的表格 它看起来会是这样的 mysql gt SELECT FROM User
  • 使用数据库进行日志记录

    大多数日志似乎都是纯文本形式 而不是放入 MySQL 其他类型的数据库中 这是否有原因 在我看来 将它们放入数据库将使分析变得非常非常容易 但这会以牺牲速度还是其他什么为代价 我不太关心可移植性 显然你会有数据库连接的文本日志 我能想到两大

随机推荐