Canal数据库监听

2023-05-16

1、什么是canal

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

2、canal使用场景

(1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景;

(2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就查询mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性;

(3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。

(4)取业务表的新增变化数据,用于制作实时统计

3、canal工作原理

1. MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。

4、 Canal 实战

  1. 需求分析:

(1)、注册用户成功,需要发送邮箱信息;

方式1:我们使用AOP切面,注册成功后,发送mq消息,监听mq消息队列,发送邮件。这样会存在业务层直接有耦合关系

方式2:使用canal监听数据库,监听用户表,如果用户表有数据插入,发送mq消息到队列中。监听mq消息队列,发送邮件。这样监听发送邮件与注册的业务没有任何耦合关系;

(2)、修改或者删除或者增加数据后,redis缓存中数据没有同步更新

方式1:使用AOP切面,@CacheEvict(value = "ums", key = "'resource'", beforeInvocation = false),先执行修改或者新增或者删除的代码,执行完成后,删除redis中的key。下次执行查询的数据后,将数据写到redis中@Cacheable(value = "ums", key = "'resource'");

这种方式有个弊端:

1.修改、删除、新增,每个方法上都需要添加@CacheEvict,删除redis中的key

2.每次执行修改、删除、新增后,执行查询的时候,先要去数据库查询,然后在同步到redis中;

方式2:使用canal监听数据库,监听对应的表,如果表中有增、删、改,发送mq消息,监听到mq消息队列,则删除redis中的对应key,重新读取最新的数据写入。这样修改或者增加完成后,返回查询数据,数据是正确的,并且返回后查询不需要等待较长时间,因为开辟了其他线程消费mq消息;

  1. 环境配置

所有的应用都部署到docker容器上

  1. MySQL开启 binlog 日志

1.docker查看mysql工作目录 docker inspect xx

MergedDir表示工作目录

2.进入工作目录

查看mysql配置文件

3.编辑mysql配置文件,在mysqld下增加如下配置

[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可

4.重启docker容器

docker restart myMysql

2. RabbitMQ 队列创建

  • 添加交换机 canal.exchange

  • 添加队列 canal.queue

  • 队列绑定交换机,设置routingkey

  1. canal配置与启动

1.下载镜像

[root@localhost /]# docker run --name myCanal -d -p 11111:11111 canal/canal-server

-d 后台启动 --name 起名字 -p端口映射 canal/canal-server镜像名称

  1. 查看canal工作目录

docker inspect myCanal

  1. 进入工作目录,修改配置文件

我们不用linux进入工作目录了,我们使用idea的sftp的功能进入工作目录修改配置文件

修改配置文件信息

修改文件后上传到服务器

3、springboot整合canal

  1. 引入jar包

        <!--  canal数据库监听,依赖的jar包。 需要注意canal监听的时候还需要mybatis-plus相关jar,没有监听的时候会报错 -->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
        </dependency>

        <!--  依赖 数据库已经mybatis-plus相关jar包    -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
  1. springboot配置文件,配置canal信息

#如果配置了数据库监听发送mq消息到队列中,这个配置就不要了,因为这个默认是按照tcp连接方式连接的,我们修改了配置文件,连接方式修改为了mq
canal:
  server: 192.168.75.131:11111
  destination: example
  1. 编写测试代码


import com.powernode.entity.UmsUser;
import org.springframework.mail.javamail.JavaMailSender;

import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

import javax.annotation.Resource;

/**
 * canal监听数据库,如果ums_user数据库插入数据,则发送邮件
 * 注意:
 *   如果我们先删除或者添加或者修改数据,服务启动后,依然会监听到。因此服务短暂的暂停不影响数据的完整性
 *
 * canal用在数据同步很好,例如:数据库更新,缓存也可以进行更新
 *
 * CanalTable 监听那个表
 */
@Component
@CanalTable(value = "ums_user")
public class UserHandler implements EntryHandler<UmsUser> {

    @Resource
    private JavaMailSender javaMailSender;

    /**
     * 监听添加
     * @param umsUser 添加的umsUser对象数据
     */
    public void insert(UmsUser umsUser) {
        System.out.println("监听添加");
        System.out.println("添加====" + umsUser);
        //监听到添加数据后,进行发送邮件,不需要在往mq中发现消息,mq消费消息发送邮件了。彻底与业务层解耦了
    }


    /**
     * 监听修改
     * @param before 监听修改前user对象的被修改字段的数据,其他没有修改的都是null
     *               修改前=======UmsUser(name=BB, phone=null, email=null, icon=null, password=null, active=null, sort=null, fileInfoListList=null, description=null)
     * @param after 监听修改后的user对象的数据
     */
    public void update(UmsUser before, UmsUser after) {
        System.out.println("监听修改");
        System.out.println("修改前=======" + before);
        System.out.println("修改后=======" + after);
    }

    /**
     *  监听删除
     * @param umsUser 删除umsUser对象数据
     */
    public void delete(UmsUser umsUser) {
        System.out.println("监听删除");
        System.out.println("删除====" + umsUser);
    }
}

启动spring项目

修改ums_user表信息,监听到休息的信息。

注意:canal只监听,增删改,不监听查询

通过上面我们可以发现,通过canal监听可以,对数据库的表进行监听,结合EntryHandler实现的增删改的方法,我们拿到增加、删除、修改的数据。可以对数据进行操作。

例如。添加数据后,我们可以在增加方法里面发送邮件;

4、springboot + canal +rabbitmq 整合

上面那种方法数据库表变更后,就会canal就会监听到,调用对应的方法处理。

我们可以使用异步进行处理,表变更后,发送mq消息到队列,消费消息发送邮件;

  1. 继续修改canal instance.properties配置文件

  1. 修改canal.properties配置文件信息

  1. 删除springboot配置文件中canal配置信息,因为如果使用mq连接形式就不是tcp了

  1. 编写测试代码,ums_user表修改、添加、删除数据,发送邮件

package com.powernode.core;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * canal监听数据库所有的表,如果数据库ums_user表插入数据,就给email队列中发送mq消息
 */
@Slf4j
@Component
public class ListenerMysqlEmail {

    public static final String QUEUE_NAME = "email";

    @Resource
    JavaMailSender javaMailSender;

    @Value("${spring.mail.username}")
    String from;

    /**
     * canal监控
     *  消费mq, 判断如果监控到shop.ums_user表数据有更新或者新增或者删除,发送邮件
     * @param message
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void listenerMysqlEmailQueue(Message message) {
        String body = new String(message.getBody());
        List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
        Map<String, Object> result = JSONObject.parseObject(body, Map.class);
        if("shop".equals(result.get("database")) && "ums_user".equals(result.get("table"))){
            if(typeList.contains((String) result.get("type"))){

                MimeMessage mimeMessage = javaMailSender.createMimeMessage();
                try {
                    MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage,true);
                    List<Map<String, Object>> data =(List)result.get("data");
                    for (Map<String, Object> map:data) {
                        String text = "<p>尊敬的" +map.get("name") + "先生/女士" + "</p><br/>" + "<p>系统为您创建了了用户,登录名是您的手机号或者邮箱</p></br>"
                                + "<p>密码是" + map.get("password") +"</p>";
                        mimeMessageHelper.setSubject("系统消息");
                        mimeMessageHelper.setFrom(from);
                        mimeMessageHelper.setTo((String) map.get("email"));
                        mimeMessageHelper.setText(text);

                        javaMailSender.send(mimeMessage);
                    }
                } catch (MessagingException e) {
                    e.printStackTrace();
                    log.error("邮件发送异常" + e);
                }
            }
        }
    }

}

5、编写测试代码,修改ums_resource表信息,删除redis缓存,重新刷新redis

package com.powernode.core;

import com.alibaba.fastjson.JSONObject;
import com.powernode.service.UmsResourceService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class ListenerResource {
    @Resource
    private UmsResourceService umsResourceService;
    @Resource
    private RedisTemplate redisTemplate;

    /**
     * 监听数据库
     *  如果监听到了shop.ums_resource表数据更新或者删除,更新redis数据
     * @param message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "email", durable = "true"),
                    exchange = @Exchange(value = "canal.exchage"),
                    key = "canalEmail"
            )
    })
    public void handleDataChange(Message message) {
        String body = new String(message.getBody());
        List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
        Map<String, Object> result = JSONObject.parseObject(body, Map.class);
        if ("shop".equals(result.get("database")) && "ums_resource".equals(result.get("table"))) {
            if (typeList.contains((String) result.get("type"))) {
                //如果是增删改,删除redis中的key,重新将数据刷新到redis中
                if (redisTemplate.delete("ums::resource")) {
                    umsResourceService.getAll();
                }
            }
        }
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Canal数据库监听 的相关文章

  • 使用canal连接kafka

    这篇主要是项目还原 xff0c 目的是记录构建时遇到的各种奇葩坑 xff0c 避免下次迷路 废话不多说 xff0c 直接上手 默认已安装docker xff0c docker compose xff0c nodejs xff0c yarn
  • canal 修改配置信息后监听不到mysql数据并报错can‘t find start position for example

    原由 xff1a 数据库地址变化 canal 需要修改监听 问题 xff1a 修改配置信息后重启canal 但并无监听到数据库信息变化 分析 xff1a canal 与数据库之间断层 xff0c 导致信息传输失败 解决 xff1a xff0
  • 数据同步之初识Canal

    git地址 xff1a 阿里巴巴Canal的Git地址 Canal基于日志增量订阅和消费的业务包括 xff1a 数据库镜像 数据库实时备份索引构建和实时维护 拆分异构索引 倒排索引 业务cache刷新 带业务逻辑的增量数据处理 Mysql
  • 谷粒学院(二十)Canal数据同步工具

    一 Canal介绍 1 应用场景 在前面的统计分析功能中 xff0c 我们采取了服务调用获取统计数据 xff0c 这样耦合度高 xff0c 效率相对较低 xff0c 目前我采取另一种实现方式 xff0c 通过实时同步数据库表的方式实现 xf
  • Canal监控MySQL数据到Kafka详细步骤(jdk+zookeeper+kafka+canal+mysql)

    目录 一 前言二 环境准备三 安装JDK四 安装zookeeper五 安装Kafka六 安装MySQL七 安装canal服务端 xff08 canal监控mysql数据发送到kafka xff09 八 测试是否可以监控到数据九 结语 一 前
  • [搬运]Ali Canal Prometheus QuickStart

    Prometheus QuickStart lcybo edited this page on 29 Aug 2018 3 revisions Pages 38 Contents 目录 HomeIntroduction 简介Quick St
  • canal 修改配置信息后监听不到mysql数据并报错can‘t find start position for example

    原由 xff1a 数据库地址变化 canal 需要修改监听 问题 xff1a 修改配置信息后重启canal 但并无监听到数据库信息变化 分析 xff1a canal 与数据库之间断层 xff0c 导致信息传输失败 解决 xff1a xff0
  • Canal数据库监听

    1 什么是canal canal是用java开发的基于数据库增量日志解析 xff0c 提供增量数据订阅 amp 消费的中间件 目前 xff0c canal主要支持了MySQL的binlog解析 xff0c 解析完成后才利用canal cli
  • 使用canal配合rocketmq监听mysql的binlog日志

    目录 一 安装配置canal 1 1 安装canal 1 2 配置canal基本属性 1 3 配置canal的mysql 二 mysql配置 2 1 开启mysql的binlog日志 2 2 配置 canal 专用用户 2 3 启动cana
  • Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑

    Elasticsearch集群部署 1 服务器规划 10 4 7 11 node1 10 4 7 12 node2 10 4 7 13 node3 1 集群相关 一个运行中的 Elasticsearch 实例称为一个节点 xff0c 而集群
  • prometheus+grafana监控mysql、canal服务器

    一 prometheus配置 1 prometheus安装 1 1官网下载安装包 xff1a https prometheus io download 1 2解压安装包 xff1a tar zxvf prometheus 2 6 1 lin
  • Canal入门(二)

    Canal入门 xff08 二 xff09 canal kafka quickStart 1 基本说明 canal 1 1 1版本之后 默认支持将canal server接收到的binlog数据直接投递到MQ 目前默认支持的MQ系统有 ka
  • Canal AdminGuide

    背景 先前开源了一个开源项目 xff1a 阿里巴巴开源项目 基于mysql数据库binlog的增量订阅 amp 消费 本文主要是介绍一下如何部署 amp 使用 环境要求 1 操作系统 a 纯java开发 xff0c windows linu
  • 不同业务场景下数据同步方案设计

    企业开发实践中通常需要提供数据搜索的功能 例如 电商系统中的商品搜索 订单搜索等 通常 搜索任务通常由搜索引擎担当 如Elasticsearch 而我们的原始数据为了安全性等问题通常存储在关系型数据库中 在搜索数据前 我们需要先将数据从关系
  • 搭建: canal部署与实例运行

    1 准备 github https github com alibaba canal 里面有包括canal的文档 server端 client端的 例子 源码包等等 2 canal概述 canal是应阿里巴巴存在杭州和美国的双机房部署 存在
  • 深入解析中间件之-Canal

    canal 阿里巴巴mysql数据库binlog的增量订阅 消费组件 MySQL binlog MySQL主从复制 mysql服务端修改配置并重启 1 2 3 4 5 6 7 8 9 10 11 12 vi etc my cnf mysql
  • 使用TCP方式拉取Canal数据

    1 Canal对接Kafka联调 1 1 配置修改 canal properties 修改 zk canal zkServers 10 51 50 219 2181 instance properties 开启配置项 canal mq dy
  • Springboot2(44)集成canal

    源码地址 springboot2教程系列 canal高可用部署安装和配置参数详解 前言 canal是阿里巴巴的基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了mysql 可以用于比如数据库数据变化的监听从而同步缓存 如Redi
  • 使用canal同步数据,踩坑排雷全过程

    1 mysql配置 1 检查binlog功能是否有开启 mysql gt show variables like log bin Variable name Value log bin OFF 1 row in set 0 00 sec 如
  • 实战:实现缓存和数据库一致性方案

    原创 微信公众号 阿Q说代码 欢迎分享 转载请保留出处 哈喽大家好 我是阿Q 最近不是正好在研究 canal 嘛 刚巧前两天看了一篇关于解决缓存与数据库一致性问题的文章 里边提到了一种解决方案是结合 canal 来操作的 所以阿Q就想趁热打

随机推荐

  • ∏这个是什么符号?

    是各项连乘的运算符号 读大写的 xff08 pai xff09 例如 xff1a i 61 1 xff08 符号下面 xff09 n xff08 符号上面 xff09 ai 符号右面 表示a1 a2 an 符号下面表示右面式子可变参量的下限
  • python获取图片的颜色信息

    span class pun style font family none font size 14px span h1 style font family none font size 24px padding 5px margin 5p
  • Python:TypeError: 'int' object is not callable

    一个函数的部分代码如下 xff1a python view plain copy def loadData len 61 dataSet len trainingSet extend dataSet len 3 4 testSet exte
  • FreeRTOS — 消息队列

    以下内容转载自安富莱电子 xff1a http forum armfly com forum php FreeRTOS 的一个重要的通信机制 消息队列 xff0c 消息队列在实际项目中应用较多 1 消息队列 1 1 消息队列的概念及其作用
  • 学校人力资源管理系统——物理结构设计

    新 建学校人力资源管理系统 数据库 1 创建相关表 1 1 创建部门信息表 部门信息表的创建代码如下 xff0c 部门信息表在SQL中的信息显示如图6 1所示 create table 部门信息表 部门编号 char 6 not null
  • python 安装serial模块

    想用Python来实现对串口的控制 xff0c 写好了脚本 xff0c 现在将这个脚本拿到另外一个电脑上去运行 xff1b 运行时提示错误 xff0c 说是没有安装serial模块 xff0c 于是乎安装 pip install seria
  • Doxygen使用教程(个人总结)

    简介Doxygen 一 xff0e 什么是Doxygen Doxygen 是一个程序的文件产生工具 xff0c 可将程序中的特定批注转换成为说明文件 通常我们在写程序时 xff0c 或多或少都会写上批注 xff0c 但是对于其它人而言 xf
  • STM32F401 I2S(full duplex)全双工示例代码

    USER CODE BEGIN Header 64 file main c 64 brief Main program body This notice applies to any and all portions of this fil
  • 一、认识与学习Linux中的BASH 之 1.1 什么是bash

    1 1 什么是bash 1 1 1 什么是bash bash全称为The Bourne Again shell xff0c 是Bourne Shell的扩展 xff0c 是基于GUN构架发展出来的语言 xff0c 有很灵活和强大的编程接口
  • 阿里云ECS服务器环境搭建(1) —— ubuntu 16.04 图形界面的安装

    阿里云ECS服务器环境搭建 xff08 1 xff09 ubuntu 16 04 图形界面的安装 1 背景 在我们购买阿里云ECS服务器之后 xff0c 默认的系统环境是很干净的 xff0c 我购买的是ubuntu16 04 xff0c 远
  • Python+pandas+每天股票涨了多少

    第一步 xff1a 得到某支股票历年来的交易数据 方法见 xff1a https blog csdn net zwy 0309 article details 108217342 在此 xff0c 我使用以下脚本得到股票 xff08 代码
  • 2021-06-02

    在ROS中仿真模型中添加gps传感器 获取gps传感器模型包为自己的机器人添加gps传感器将gps之中的经度纬度坐标转化为自己地图中的坐标 1 获取gps传感器模型包 link http wiki ros org hector gazebo
  • Dockerfile如何编写(指令详解)

    本文个人博客地址 xff1a https www leafage top posts detail 21525V8AP Dockerfile Dockerfile 描述了组装镜像的步骤 xff0c 其中每条指令都是单独执行的 除了FROM指
  • 关于Home Lab的搭建——硬件选择篇(迷你主机)(一)

    关于Home Lab 这个名词出自哪里 xff0c 我也不清楚 不过 xff0c 可以这样来理解Home Lab xff0c Home Lab是一台作为实验使用的电脑 xff0c 试验的内容多数是关于计算机网络的搭建 系统安装 测试 xff
  • Adaboost基本二分类算法

    最早类型的Adaboost是由Yoav Freund和Robert E Schapire提出的 xff0c 一种用于二分类的boosting集成学习方法 也是李航 统计学习方法 中所介绍的Adaboost 它将一系列弱分类器的线性组合 xf
  • Springboot 项目金蝶中间件AAS-9.0启动报错 javax.persistence.Table.indexes()[Ljavax/persistence/Index 问题解决

    Springboot 项目金蝶中间件AAS 9 0启动报错 java lang NoSuchMethodError javax persistence Table indexes Ljavax persistence Index问题解决方法
  • CMakeLists配置(常用的)

    一 xff1a 最小组成 cmake 最小版本需求 cmake minimum required VERSION 2 8 project 名字 project MyEsp32AllCode 可执行文件生成 add executable PR
  • SLAM学习笔记(四)定位

    原创博客 xff1a http blog csdn net renshengrumenglibing viewmode 61 contents 机器人定位的目的是为了知道 自己在什么地方 xff0c 目前 xff0c 机器人定位的方法可以分
  • SLAM实习岗位面经

    一 地平线 一面主要是在问关于SLAM岗位的技术问题 xff0c 然后还问了一些比较简单的C 43 43 基础知识 其实总体而言 xff0c 面试的问题都比较基础 xff0c 比较考验SLAM基本功 xff0c 如果只知道一些SLAM皮毛的
  • Canal数据库监听

    1 什么是canal canal是用java开发的基于数据库增量日志解析 xff0c 提供增量数据订阅 amp 消费的中间件 目前 xff0c canal主要支持了MySQL的binlog解析 xff0c 解析完成后才利用canal cli