基于Canal与Flink实现数据实时增量同步(一)

2023-11-17

点击上方蓝色字体,关注我

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

准备

配置MySQL的binlog

常见的binlog命令

# 是否启用binlog日志
show variables like 'log_bin';
# 查看binlog类型
show global variables like 'binlog_format';
# 查看详细的日志配置信息
show global variables like '%log%';
# mysql数据存储目录
show variables like '%dir%';
# 查看binlog的目录
show global variables like "%log_bin%";
# 查看当前服务器使用的biglog文件及大小
show binary logs;
# 查看最新一个binlog日志文件名称和Position
show master status;

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

部署canal

安装canal

  • 下载:https://github.com/alibaba/canal/releases

  • 解压缩

[kms@kms-1 softwares]$ tar -xzvf canal.deployer-1.1.4.tar.gz  -C /opt/modules/canal/
  • 目录结构

drwxr-xr-x 2 root root 4096 Mar  5 14:19 bin
drwxr-xr-x 5 root root 4096 Mar  5 13:54 conf
drwxr-xr-x 2 root root 4096 Mar  5 13:04 lib
drwxrwxrwx 4 root root 4096 Mar  5 14:19 logs

配置修改

  • 修改conf/example/instance.properties,修改内容如下:

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = kms-1.apache.com:3306
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
# mq config,kafka topic名称
canal.mq.topic=test
  • 修改conf/canal.properties,修改内容如下:

# 配置zookeeper地址
canal.zkServers =kms-2:2181,kms-3:2181,kms-4:2181
# 可选项: tcp(默认), kafka, RocketMQ,
canal.serverMode = kafka
# 配置kafka地址
canal.mq.servers = kms-2:9092,kms-3:9092,kms-4:9092

启动canal

sh bin/startup.sh

关闭canal

sh bin/stop.sh

部署Canal Admin(可选)

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

要求

canal-admin的限定依赖:

  • MySQL,用于存储配置和节点等相关数据

  • canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

安装canal-admin

  • 下载

    https://github.com/alibaba/canal/releases

  • 解压缩

[kms@kms-1 softwares]$ tar -xzvf canal.admin-1.1.4.tar.gz  -C /opt/modules/canal-admin/
  • 目录结构

drwxrwxr-x 2 kms kms 4096 Mar  6 11:25 bin
drwxrwxr-x 3 kms kms 4096 Mar  6 11:25 conf
drwxrwxr-x 2 kms kms 4096 Mar  6 11:25 lib
drwxrwxr-x 2 kms kms 4096 Sep  2  2019 logs
  • 配置修改

vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: kms-1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
  • 初始化原数据库

mysql -uroot -p
# 导入初始化SQL
#注:(1)初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
#    (2)canal_manager.sql默认会在conf目录下
> mysql> source /opt/modules/canal-admin/conf/canal_manager.sql
  • 启动canal-admin

sh bin/startup.sh
  • 访问

可以通过 http://kms-1:8089/ 访问,默认密码:admin/123456

  • canal-server端配置

使用canal_local.properties的配置覆盖canal.properties,将下面配置内容配置在canal_local.properties文件里面,就可以了。

# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
  • 启动canal-serve

sh bin/startup.sh  local

注意:先启canal-server,然后再启动canal-admin,之后登陆canal-admin就可以添加serve和instance了。

启动kafka控制台消费者测试

bin/kafka-console-consumer.sh --bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092  --topic test --from-beginning

此时MySQL数据表若有变化,会将row类型的log写进Kakfa,具体格式为JSON:

  • insert操作

{
    "data":[
        {
            "id":"338",
            "city":"成都",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583394964000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "city":12,
        "province":12
    },
    "table":"code_city",
    "ts":1583394964361,
    "type":"INSERT"
}
  • update操作

{
    "data":[
        {
            "id":"338",
            "city":"绵阳市",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583395177000,
    "id":3,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":[
        {
            "city":"成都"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "city":12,
        "province":12
    },
    "table":"code_city",
    "ts":1583395177408,
    "type":"UPDATE"
    }
  • delete操作

{
    "data":[
        {
            "id":"338",
            "city":"绵阳市",
            "province":"四川省"
        }
    ],
    "database":"qfbap_ods",
    "es":1583395333000,
    "id":4,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "city":"varchar(256)",
        "province":"varchar(256)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "city":12,
        "province":12
    },
    "table":"code_city",
    "ts":1583395333208,
    "type":"DELETE"
}


JSON日志格式解释

  • data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据

  • database:数据库名称

  • es:事件时间,13位的时间戳

  • id:事件操作的序列号,1,2,3...

  • isDdl:是否是DDL操作

  • mysqlType:字段类型

  • old:旧数据

  • pkNames:主键名称

  • sql:SQL语句

  • sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal

  • table:表名

  • ts:日志时间

  • type:操作类型,比如DELETE,UPDATE,INSERT

小结

本文首先介绍了MySQL binlog日志的配置以及Canal的搭建,然后描述了通过canal数据传输到Kafka的配置,最后对canal解析之后的JSON数据进行了详细解释。本文是基于Canal与Flink实现数据实时增量同步的第一篇,在下一篇介绍如何使用Flink实现实时增量数据同步。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于Canal与Flink实现数据实时增量同步(一) 的相关文章

  • 如何知道内存中是否已经存在类的实例?

    如何知道内存中是否已经存在类的实例 我的问题是 如果存在类实例 则不想读取方法 这是我的代码 private void jButton java awt event ActionEvent evt PNLSpcMaster pnlSpc n
  • 图像在 3D 空间中绕 Y 轴旋转

    我有一个 BufferedImage 我想用 theta 角而不是仿射变换绕 Java 中的 Y 轴旋转图像 图片 旋转将如下图所示 矩形将是图像 我可以通过旋转图像的每个像素并绘制图像来做到这一点 因为我必须旋转很多图像 所以我认为这不是
  • MySQL Workbench 深色主题

    我刚刚开始学习 SQL 课程 并且一直在尝试不同的 GUI 我喜欢使用 MySQL Workbench 但白色背景刺瞎了我的眼睛 我已经搜索并找到了一些其他讨论编辑 xml 文件的相关帖子 我尝试用几种不同的方式对其进行编辑 但无济于事 我
  • 带嵌入式 tomcat 的 spring-boot 不会将请求分派到控制器

    我有一个使用 spring boot 和嵌入式 Tomcat 容器的应用程序 据我所知 我的代码与 spring boot 相同示例项目 https github com spring projects spring boot tree m
  • 新行分隔符不适用于 group_concat 函数

    我有一根绳子 name lastname name2 lastname2 包含数据库表中的值 我想显示它 喜欢 name lastname name2 lastname2 我使用 group concat 函数 它适用于逗号分隔符 但我需要
  • 使用 Copy.CopyIntoItems Web 服务将文件上传到 SharePoint 2010 时收到 400 错误请求

    SharePoint 新手 我尝试使用 Java 的 CopyIntoItems Web 服务方法将文档上传到 SharePoint 但不断收到 400 错误请求 我使用 Java 的 wsimport 从 wsdl 文件生成类文件 这是我
  • 如何使用Gson将JSONArray转换为List?

    在我的 Android 项目中 我试图将收到的 JSONArray 转换为列表 在 的帮助下这个答案 https stackoverflow com questions 8371274 how to parse json array in
  • 相对重力

    我最近开始使用jMonkey引擎 这非常好 但我在尝试实现相对重力时陷入了困境 我想让行星彼此围绕轨道运行 不一定是完美的圆形轨道 取决于速度 所以每个对象都应该影响其他对象 我现在拥有的 关闭全球重力 bulletAppState get
  • java多线程中“私有最终对象”锁定有什么用?

    java多线程中 私有最终对象 锁定有什么用 据我的理解 我认为要使一个类成为线程安全的 我们应该使用内部锁定 将所有方法标记为同步并使用 this 将它们锁定在对象的监视器上 或者我们可以用方法中的私有最终对象锁替换类的 this 上标记
  • 错误包括 bouncycastle 提供商

    我需要使用bouncycastle provider我的项目中的库 我已将其包含在 gradle 项目中 apply plugin application sourceCompatibility 1 6 version 1 0 0 main
  • 检测 MySQL 中的 utf8 损坏字符

    我有一个数据库 其中有一堆损坏的 utf8 字符分散在多个表中 字符列表不是很广泛 AFAIK 修复给定的表非常简单 update orderItem set itemName replace itemName 但我无法找到检测损坏字符的方
  • Mysql使用触发器建表

    我尝试在 Mysql 触发器内创建表 但没有创建 如何使用触发器创建表 这里传递的表的名称是动态的 据我所知 在触发器内创建表是不可能的 看这里 http forums mysql com read php 99 121849 122609
  • SFTP Java - 管道关闭 Jsch 异常

    我正在研究一种 java 方法 将文件从一个位置复制到另一个远程位置 我的代码如下 我尝试使用jsch 0 1 42 0 1 50 0 1 54 public static void processFiles ArrayList
  • Java 日期和 MySQL 时间戳时区

    我正在编辑一段代码 其基本功能是 timestamp new Date 然后坚持下去timestamp中的变量TIMESTAMPMySQL 表列 然而 通过调试我看到Date显示在正确时区的对象 GMT 1 当持久化在数据库上时 它是GMT
  • Spring Boot 中的服务限流能力

    有什么办法可以实现Spring中其余服务的服务限制能力 特别是Spring boot 这里的期望是 我的服务暴露于外界 目前每秒 分钟的服务调用数量没有限制 我们希望通过设置限制来控制这一点 我有一个替代选项 通过跟踪并发哈希映射或任何缓存
  • 在 Bluemix 中激活 PHP 扩展

    这纯粹是 Bluemix 问题 我的代码在本地主机上顺利运行 但是当我将其迁移到 Bluemix 时 我的数据库连接失败了 检查日志 我发现问题 调用未定义的函数 mysqli init HTTP 响应 500 我发现扩展已被禁用以使其更小
  • 致命异常:OkHttp 调度程序

    我在 Android 应用程序中使用 OkHttp 库向天气 API 发出 Web 请求 我已经实现了我的代码 但在执行请求时遇到了致命异常 我也已经在我的清单中添加了互联网权限 MainActivity java private Curr
  • InnoDB如何存储字符列?

    这个问题仅解决 短 的问题CHAR and VARCHAR列存储在 InnoDB 表中 Does a CHAR 10 列正好占用 10 个字节吗 尾随空格会发生什么情况 对于每个字符需要超过 1 个字节的字符集怎么办 如何VARCHAR 1
  • 如何查找列表/集合是否包含在另一个列表中

    我有一个产品 ID 列表 我想找出哪些订单包含所有这些产品 订单表的结构如下 order id product id 1 222 1 555 2 333 显然我可以通过 PHP 中的一些循环来做到这一点 但我想知道是否有一种优雅的方法可以纯
  • 与手动搜索列表相比,Collections.binarySearch 的性能如何?

    我想知道该使用哪一个 我有一份学生名单 我想用他的名字搜索一个学生 到目前为止 我是通过迭代列表手动完成的 如下所示 for int i 0 i lt list size i Student student list get i if st

随机推荐

  • IP首部报文字段

    一 IP首部报文字段 字段如下图所示 二 每个字段的含义 版本 表示 IP 协议的版本 通信双方使用的 IP 协议版本必须一致 目前广泛使用的IP协议版本号为 4 即 IPv4 首部长度 这个字段所表示数的单位是 32 位字长 1 个 32
  • postgreSQL中无法更改数据的问题

    增对这个bug 参考博客 解决Navicat修改Mysql数据后刷新恢复原样的问题 无法提交事务 Studiouss的博客 CSDN博客 发现我的问题是解决了 因为我确实没有设置主键 或者是设置主键没有保存造成的 这样就解决了 点击刷新 表
  • 自动化测试面试题及答案大全(2)

    问题1 Selenium是什么 流行的版本有哪些 是一个开源的web自动化测试的框架 支持多种编程语言 支持跨浏览器平台进行测试 Selenium 1 0或Selenium RC Selenium 2 0或Selenium Webdrive
  • VisualStudio神级插件——JetBrains Resharper 2018.2.3 Ultimate完美破解版教程

    VisualStudio神级插件 JetBrains Resharper 2018 2 3 Ultimate完美破解版 教程 ReSharper是一个JetBrains公司出品的著名的代码生成工具 是Visual Studio里面的一个插件
  • 中职本科计算机大学课程设置,中职学校计算机专业课程设置问题与对策研究——以湖南省五所中职学校为例...

    摘要 随着我国市场经济的发展 产业结构和劳动力结构不断调整 因此对劳动者的素质和结构都提出了新的要求 形成了对技能型人才需求的调整增长态势 技能型人才的紧缺 结构性失业问题已成为制约我国经济增长的瓶颈 中职教育作为目前职业教育的主体 它承担
  • 文件服务器 安全,文件服务器 安全

    文件服务器 安全 内容精选 换一换 云堡垒机支持批量导出资源信息 用于本地备份资源配置 以及便于快速管理资源基本信息 为加强资源信息安全管理 支持加密导出资源信息 导出的主机资源文件中包含主机基本信息 主机下所有资源账户信息 主机资源账户明
  • linuxmake没有指明目标并且找不到makefile_Makefile笔记

    一般来说 无论是C C 还是pas 首先要把源文件编译成中间代码文件 在Windows下也就是 obj 文件 UNIX下是 o 文件 即 Object File 这个动作叫做编译 compile 然后再把大量的Object File合成执行
  • ssh 连接报错:Unable to negotiate with 192.168.xx.xx port 22: no matching key exchange method found.

    用 ssh 连接 Linux 服务器时 很偶然的情况下出现了如下报错 Unable to negotiate with xx xx xx xx port 22 no matching key exchange method found Th
  • LeetCode题目笔记--12.整数转罗马数字

    题目描述 题目跟前面13题描述一样 就是问题变为整数转成罗马数字 思路 上一道题罗马数字转整数比较简单 因为不存在罗马数字表示冲突的问题 即不存在一个罗马数字对应多个整数 而这个问题中 就要考虑一下这个问题了 因为如果不加以约束的话 一个整
  • 【设计模式】用Java实现状态模式

    一 状态模式介绍与使用场景 状态模式是一种行为设计模式 它允许对象在内部状态发生改变时改变其行为 该模式将对象的行为包装在不同的状态类中 使得对象的行为可以根据其当前状态动态改变 状态模式通常由以下几个角色组成 环境类 Context 环境
  • c++中的时间处理(1)localtime、localtime_r和localtime_s

    c 中对时间的处理有好几个函数 很多C 程序员可能用过 但不一定完全搞得清楚 这里 我先讲解下 localtime localtime r和localtime s的使用 1 localtime localtime用来获取系统时间 精度为秒
  • Python 3.4安装pandas库时遇到的问题:no matching distribution found for numpy==1.9.3

    Window XP 其实已经安装了numpy10 0 1 但在cmd中pip install pandas时提示 no matching distribution found for numpy 1 9 3 然后卸载了之前的numpy 又使
  • selenium3和selenium4的区别

    1 初始化浏览器对象 在初始化driver对象的时候 selenium4多了一个Service类 用来管理驱动程序的启动 停止 service Service r D python39 chromedriver exe driver web
  • 手写字符识别

    一 手写字符识别原理 以下来源网上 手写数字识别 可以采用图像识别的方法 左边的x是手写之后的图像 右边的y是对应的数字 对于图像信息 计算机是用数值来进行表示的 机器学习让计算机具备智能 实际上是训练出数值模型w对于新的输入x 可以通过与
  • STL容器总结

    1 Vector 本质是动态数组 拥有一段连续的内存空间 并且起始地址不变 能非常好的支持随机存取 即 操作符 但由于它的内存空间是连续的 所以在中间进行插入和删除会造成内存块的拷贝 如果空间不够 则另外分配新的两倍大小的空间 然后把旧空间
  • mysql不等于的写法_mysql 不等于 符号写法

    经过测试发现mysql中用 lt gt 与 都是可以的 但sqlserver中不识别 所以建议用 lt gt selece from jb51 where id lt gt 45 sql 里 符号 lt gt 于 的区别 lt gt 与 都
  • 打印机驱动安装教程

    工作中 尤其是从事半文秘工作的人 不是全文秘 没有安装打印机驱动经验 这里就来说说如何安装佳能打印机驱动 准备安装资料 佳能打印驱动 安装步骤 1 我这里是压缩文件 解压后 点击Setup exe开始安装 2 要同意才能下一步安装 3 根据
  • jenkins运行Linux后台命令

    这里是指广义上的后台 不管是shell命令nohub或者其他 只要是需要常驻linux后台的命令或者程序 如果通过Jenkins启动 当任务结束时Jenkins都会清理掉此次任务中的所有相关进程 现象就是明明运行成功了但是实际找不到进程 解
  • 安装CPU版本的pytorch和torchvision(Win10)

    前言 在使用以下方法之前 我是用了网上说的搭建清华镜像进行下载 虽然pytorch下载成功了 但是在下载torchvision的时候就一直成功不了 在网络的大千世界中 我终于悟到先本地下载再安装的方法 第一步 找好对应的版本 第二步 下载本
  • 基于Canal与Flink实现数据实时增量同步(一)

    点击上方蓝色字体 关注我 canal是阿里巴巴旗下的一款开源项目 纯Java开发 基于数据库增量日志解析 提供增量数据订阅 消费 目前主要支持了MySQL 也支持mariaDB 准备 配置MySQL的binlog 常见的binlog命令 是