大数据技术Canal总结和详细案例

2023-05-16

0 Canal介绍

Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)

1 MySQL 的 Binlog

1.1 什么是 Binlog

MySQL 的二进制日志,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。二进制有两个最重要的使用场景:

① MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves来达到 Master-Slave 数据一致的目的。

② 数据恢复,通过使用 MySQL Binlog 工具来使恢复数据。二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件

1.2 Binlog分类

MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配
置 binlog_format= statement|mixed|row。三种格式的区别
1)statement
语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空
间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

优点:节省空间。
缺点:有可能造成数据不一致。

2)row
行级, binlog 会记录每次操作后每行记录的变化。

优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,只记录执行后的效果。
缺点:占用较大空间。

3)mixed
statement 的升级版,一定程度上解决了因为一些情况而造成的 statement
模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理

优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。

综合上面对比,Canal 想做监控分析,选择 row 格式比较合适

1.3 Mysql主从复制

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库
在这里插入图片描述
Canal原理很简单,即伪装成mysql的slave节点,假装从Master拷贝数据。使用场景如下:
在这里插入图片描述

1.4 修改mysql配置文件

修改/etc/my.cnf文件如下内容
在这里插入图片描述

#开启binlog
log_bin = mysql-bin
#binlog日志类型
binlog_format = row
#MySQL服务器唯一id
server_id = 1
#设置需要同步的库
binlog-do-db=canal

binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog。修改完成后重启mysql生效

service mysql restart

1.5 测试binlog是否生效

建表

create table student (
	id varchar(20)
	,name varchar(20)
	,age int
	,sex varchar(5)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci;

插入数据

insert into student values('1001','zhangsan',18,'male');

对比查看未插入数据前binlog
在这里插入图片描述
可以看到binlog发生了变化,说明配置生效了。

1.6 Mysql中新建canal用户

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2 安装Canal

2.1 下载canal

进入官网下载canal安装包,这里以1.1.2版本为例

https://github.com/alibaba/canal/releases

2.2 创建canal文件夹并解压

这里的canal文件包解压后是很多文件,因此建议建立一个独立的文件夹用于存放解压后的文件。

mkdir canal
tar -zxvf canal.deployer-1.1.2.tar.gz -C ./canal

2.3 修改canal.properties 的配置

说明:
① 这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认 tcp,改为输出到 kafka.
② 多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。
在这里插入图片描述

2.4 修改 instance.properties

我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下

vim instance.properties
  1. 配置Mysql服务器地址
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=10
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=wavehouse-3:3306

在这里插入图片描述

注意:这里的canal.instance.mysql.slaveId=10,只要和my.cnf中的server_id不一样即可,因为在这里canal是伪装成mysql的slave,因此id要不一样。

2)配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =canal
# enable druid Decrypt database password
canal.instance.enableDruid=false

在这里插入图片描述

2.5 启动canal

./startup.sh

在这里插入图片描述

3 实时监控

3.1 创建maven项目

3.2 添加依赖

<dependencies>
 <dependency>
 <groupId>com.alibaba.otter</groupId>
 <artifactId>canal.client</artifactId>
 <version>1.1.2</version>
</dependency>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>2.4.1</version>
 </dependency>
</dependencies>

3.3 根据Canal的架构写代码

在这里插入图片描述

package com.chen.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        //1.获取Canal连接对象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.2.202", 11111),
                "example", "", "");
        while (true){
            //2.获取连接
            canalConnector.connect();
            //3.指定需要监控的数据库
            canalConnector.subscribe("canal.*");
            //4.获取message
            Message message = canalConnector.get(100);
            //4.1获取entries
            List<CanalEntry.Entry> entries = message.getEntries();
            //4.2 判断是否有数据
            if(entries.size() <= 0){
                System.out.println("当前没有数据,休息一下~~~~");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                //如果有数据,则进行遍历
                for (CanalEntry.Entry entry : entries) {
                    //5.获取表名
                    String tableName = entry.getHeader().getTableName();
                    //6.获取entryType
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    //7.判断entryType类型是否未ROWDATA
                    if(CanalEntry.EntryType.ROWDATA.equals(entryType)){
                        //7.1如果是则序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        //8.反序列化
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        //9.获取事件类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        //10.获取具体的数据
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        //11.遍历打印数据
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            //11.1获取before的数据
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            //11.2新建JSON存放before数据
                            JSONObject beforeData = new JSONObject();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(),column.getValue());
                            }
                            //11.3获取after数据
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            //11.4新建JSON存放after数据
                            JSONObject afterData = new JSONObject();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(),column.getValue());
                            }
                            //12.打印
                            System.out.println("TableName: "+tableName+" ,EvenType: "+eventType+
                                    " ,Before: "+beforeData+",After: "+afterData);

                        }
                    }
                }
                
            }
        }
    }
}

3.3.1 插入数据

insert into student values('1002','lisi',28,'fe');

插入数据后我们可以发现是After更新,Before不更新,因为Before之前没有数据为空
在这里插入图片描述

3.3.2 插入多行数据

insert into student values('1003','wangwu',29,'fe'),('1004','zhaoliu',38,'male'),('1005','zhuqi',8,'male')

在这里插入图片描述

3.3.3 更新数据

update student set age=22 where id=1002;

在这里插入图片描述

更新数据会在before和after中均有数据,before是修改前数据,after则是修改后数据

3.3. 删除数据

delete from student where id=1003;

在这里插入图片描述
删除后数据不存在,所以before中有数据,而after中没有数据

3.4 Kafka模式测试

1)修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka
在这里插入图片描述

2)修改 Kafka 集群的地址
在这里插入图片描述

3)修改 instance.properties 输出到 Kafka 的主题以及分区数
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序 , 如 果 要 提 高 并 行 度 , 首 先 设 置 kafka 的 分 区 数 >1, 然 后 设 置canal.mq.partitionHash 属性
在这里插入图片描述
4)启动 Canal
启动Canal之前,先启动kafka,启动kafka需要先启动zookeeper

bin/zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties

在这里插入图片描述
5)看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
在这里插入图片描述

6)启动 Kafka 消费客户端测试,查看消费情况

bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.200:9092 --topic canal_test

7)向 MySQL 中插入数据后查看消费者控制台

insert into student values('1008','zhuba',29,'fe'),('1009','yangjiu',38,'male');

在这里插入图片描述
kafka是将插入后的多条数据写在json数组中

  1. 更新MySQL数据查看消费者控制台
update student set name='zhuba2' where id=1008;

在这里插入图片描述
更新后Kafka将old字段展示为旧的数据,data字段展示为新的数据
9)删除MySQL数据查看消费控制台

delete from student where id=1005;

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

大数据技术Canal总结和详细案例 的相关文章

随机推荐

  • 查看容器暴露的端口

    docker ps 查看运行中的容器 docker port c5b 查看容器c5b的端口状态 27017是容器端口 xff1b 0 0 0 0 37017是27017映射到宿主机上的端口
  • python元组实例

    脚本 xff1a usr bin python coding utf 8 names 61 34 kevin 34 34 liqunxing 34 34 chenxianan 34 print names 0 names 1 names 2
  • 机房温度告警

    https blog csdn net ot512csdn article details 80175323
  • k8s部署教程

    https mp weixin qq com s biz 61 MzI5MjA5Mjg5OA 61 61 amp mid 61 2247484395 amp idx 61 1 amp sn 61 0767cc24ec99ce818e41f7
  • Win10笔记本用雷电3接口外接显卡加速tensorflow深度学习步骤

    简介 xff1a 最近入手了一块rtx3060 xff0c 但自己的主力设备是笔记本 xff0c 于是萌生了通过外接显卡来加速深度学习的想法 xff0c 配置过程中遇到一些小问题 xff0c 经过调试最后解决了 xff0c 现在简单把整个过
  • 使用GitHub和DockerHub自动构建并发布镜像

    要使用自动构建 xff0c 必须在 Docker Hub 和GitHub上拥有一个帐户 首先登录您的Docker Hub账号 xff0c 创建一个Repository xff0c 创建类似如下页面 点击创建的Repository xff0c
  • 三 机器人仿真软件Gazebo介绍

    ROS教程 这是小弟的学习笔记 xff0c 有错求请拍 xff0c 多指教 xff0c 谢谢 三 机器人仿真软件Gazebo介绍 Gazebo功能 1 构建机器人运动仿真模型 在Gazebo里 xff0c 提供了最基础的三个物体 xff0c
  • Protobuf生成Go代码指南

    这个教程中将会描述protocol buffer编译器通过给定的 proto会编译生成什么Go代码 教程针对的是proto3版本的protobuf 在阅读之前确保你已经阅读过Protobuf语言指南 编译器调用 Protobuf核心的工具集
  • ROS sensor_msgs/LaserScan Message简单说明

    std msgs Header header float32 angle min 开始扫描角度 float32 angle max 结束扫描角度 float32 angle increment 每次扫描增加的角度 xff08 角度分辨率 x
  • 双系统Ubuntu分区

    假设整个空闲空间有200G xff0c 主要分4个区 xff1a 1 给系统分区EFI xff1a 在唯一的一个空闲分区上添加 xff0c 大小200M xff0c 逻辑分区 xff0c 空间起始位置 xff0c 用于efi xff1b 这
  • 2 用D435i运行VINS-fusion

    文章目录 1 VINS fusion的安装1 1 环境和依赖的安装1 2 编译VINS Fusion1 3 编译错误解决方法 2 VINS Fusion跑数据集3 用相机运行VINS Fusion 环境 xff1a Ubuntu20 04
  • Python每日一个小程序

    前几天上网 xff0c 收集了20多道Python练习题 这些练习题还是很有价值的 xff0c 正好最近忙着复习准备校招 xff0c 可以用来练手 我会把每道题都写一篇博客详细阐述解题思路和源代码 xff0c 在每道题目后面附上博客地址 希
  • 数分下(第1讲):一阶微分方程的三类模型求解

    第1 1讲 xff1a 一阶微分方程的解法 第一周第一讲将用3个小时时间 xff0c 完整讲解一阶微分方程y 61 f x y 的三种典型模型求解方法 掌握以下知识点 xff0c 并熟练做题训练 对应同济高数教材第七章1 4节 知识点脑图如
  • 非常详细的 Linux C/C++ 学习路线总结!助我拿下腾讯offer

    点击关注上方 五分钟学算法 xff0c 设为 置顶或星标 xff0c 第一时间送达干货 转自后端技术学堂 正文 我的另一篇文章 腾讯 C 43 43 后台开发面试笔试知识点参考笔记 整理了 C 43 43 后台开发知识点 xff0c 本文尝
  • 一线互联网公司程序员技术面试的流程以及注意事项

    先来了解面试的流程是什么 xff0c 然后再一一做准备 xff01 企业一般通过几轮技术面试来考察大家的各项能力 xff0c 一般流程如下 xff1a 一面机试 xff1a 一般会考选择题和编程题 二面基础算法面 xff1a 就是基础的算法
  • 为什么C++永不过时?

    Linus曾说过 xff1a C 43 43 是一门很恐怖的语言 xff0c 而比它更恐怖的是很多不合格的程序员在使用着它 xff01 这足以说明C 43 43 有多难 xff01 不过 xff0c 你也要明白 难度越高意味着含金量与竞争力
  • STM32 USB学习笔记6

    主机环境 xff1a Windows 7 SP1 开发环境 xff1a MDK5 14 目标板 xff1a STM32F103C8T6 开发库 xff1a STM32F1Cube库和STM32 USB Device Library 现在来分
  • Invalid bound statement (not found)

    目录 一 遇到的问题 二 分析思路 1 映射文件 2 测试类 三 解决方案 一 遇到的问题 前几日 xff0c 有个工作不久的同事找我帮他解决一个 Mybatis 的问题 他写了一个增删改查 xff0c 但是在启动程序的时候报错 xff1a
  • ThinkPHP6 解决小程序调用接口返回错误是网页的尴尬

    背景 早在开始了解ThinkPHP时就一直记得一段话 xff1a 在一开始无知的我以为出现错误后能在调试阶段优雅的了解错误信息 xff0c 但结果大家试一下便知道 xff0c 十分尴尬 尤其是当在小程序里请求api xff0c 在过程中发生
  • 大数据技术Canal总结和详细案例

    0 Canal介绍 Canal 是用 Java 开发的基于数据库增量日志解析 xff0c 提供增量数据订阅 amp 消费的中间件 目前Canal 主要支持了 MySQL 的 Binlog 解析 xff0c 解析完成后才利用 Canal Cl