Canal实时监控案例

2023-05-16

Canal实时监控案例

文章目录

  • Canal实时监控案例
    • 0. 写在前面
    • 1. TCP 模式测试
      • 1.1 IDEA创建项目canal-module
    • 1.2 通用监视类——CanalClient
      • 1.2.1 Canal 封装的数据结构
      • 1.2.2 在 canal-module 模块下创建 cn.canal 包,并在该包下创建 CanalClient.java文件
    • 2. Kafka 模式测试


0. 写在前面

  • Canal版本:Canal-1.1.5
  • Kafka版本:Kafka-2.4.1
  • Zookeeper版本:Zookeeper-3.5.7

解压安装canal的tar.gz包之前,提前创建一个目录canal-x.x.x作为canal的安装目录,因为canal解压后是分散

1. TCP 模式测试

1.1 IDEA创建项目canal-module

编辑pom.xml文件:添加以下依赖

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.2</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

V1.1.5版本需要多添加canal.protocol这个依赖,如果是V1.1.2就不需要

1.2 通用监视类——CanalClient

1.2.1 Canal 封装的数据结构

Message:一次canal从日志中抓取的信息,一个message可以包含多个sql执行的结果

在这里插入图片描述

1.2.2 在 canal-module 模块下创建 cn.canal 包,并在该包下创建 CanalClient.java文件

代码如下:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;


public class CanalClient {

    public static void main(String[] args) throws InvalidProtocolBufferException, InterruptedException {
        // TODO 获取连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("node01", 11111),"example", "", "");

        while (true) {
            // TODO 连接
            canalConnector.connect();
            // TODO 订阅数据库test_canal
            canalConnector.subscribe("test_canal.*");
            // TODO 获取指定数量的数据
            Message message = canalConnector.get(100);
            // TODO 获取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            //TODO 判断集合是否为空,如果为空,则等待一会继续拉取数据
            if (entries.size() <= 0) {
                System.out.println("当次抓取没有数据,休息一会----------------");
                Thread.sleep(1000);
            } else {
                // TODO 遍历entries,单条解析
                for (CanalEntry.Entry entry : entries) {
                    // 1.获取表名
                    String tableName = entry.getHeader().getTableName();
                    // 2.获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    / /3.获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();

                    //4.判断当前entryType类型是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6.获取当前事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7.获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                        // 8.遍历rowDataList,并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {
                            // 之前的数据
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            // 之后的数据
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            // 数据打印(控制台|Kafka)
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                    } else {
                        System.out.println("当前操作类型为:" + entryType);
                    }
                }
            }
        }
    }
}

开启canal,运行CanalClient查程序,对订阅的数据库canal_test下的表进行增删改操作,同时观察控制台的输出情况

  • 增加数据

单词插入一条数据

insert into user_info values('1001', 'zss', 'male');

在这里插入图片描述

一条sql影响多行

insert into user_info values('1002', 'lisi', 'female'),('1001', 'zss', 'male');

在这里插入图片描述

  • 修改数据

在这里插入图片描述

  • 删除数据

在这里插入图片描述

2. Kafka 模式测试

  • 修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到kafka
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
  • 修改 Kafka 集群的地址
##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
  • 修改 instance.properties 输出到 Kafka 的主题(canal_test)以及分区数
# mq config
canal.mq.topic=canal_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6

注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序, 如果要提高并行度, 首先设置 kafka 的分区数>1, 然后设置 canal.mq.partitionHash 属性

  • 启动Canal
[zhangsan@node01 example]$ cd /opt/module/canal/ 
[zhangsan@node01 example]$  bin/startup.sh
  • 看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
[zhangsan@node01 example]$ jps 
2269 Jps
2253 CanalLauncher
  • 启动 Kafka 消费客户端测试,查看消费情况
	[zhangsan@node01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic canal_test
  • 向 MySQL 中插入|修改|删除数据后查看消费者控制台

Kafka 消费者控制台

  • 增加数据

单词插入一条数据

insert into user_info values('1001', 'zss', 'male');

在这里插入图片描述

一条sql影响多行

insert into user_info values('1002', 'lisi', 'female'),('1001', 'zss', 'male');
update user_info 

在这里插入图片描述

  • 修改数据

tp

  • 删除数据

在这里插入图片描述

结束!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Canal实时监控案例 的相关文章

  • 校园论坛(Java)—— 校园周边模块

    校园论坛 Java 校园周边模块 文章目录 校园论坛 Java 校园周边模块 1 写在前面 2 系统结构设计 2 1 各个页面之间的调用关系 2 2 校园周边页面设计 3 校园周边模块设计 3 1 校园周边主界面的实现 3 2 增加附近的交
  • 校园论坛(Java)—— 结束篇

    校园论坛 Java 结束篇 文章目录 校园论坛 Java 结束篇 1 写在前面 2 系统总体设计 2 1 设计流程 2 2 各个页面之间的调用关系 3 系统实现的可行性 4 系统制作的局限性 5 总结 6 项目代码 1 写在前面 Windo
  • Windows远程连接Redis(Linux)

    Windows远程连接Redis xff08 Linux xff09 文章目录 Windows远程连接Redis xff08 Linux xff09 1 写在前面2 配置redis conf3 启动Redis3 1 开启redis服务3 2
  • 批量数据导入Neo4j的方式

    批量数据导入Neo4j的方式 文章目录 批量数据导入Neo4j的方式1 写在前面2 前置芝士3 CSV数据导入Neo4j3 1 LOAD CSV Cypher命令3 2 neo4j admin命令3 3 Kettle导入工具 4 数据导入失
  • Neo4j的Java API操作

    Neo4j的Java API操作 文章目录 Neo4j的Java API操作0 写在前面1 前置芝士2 准备工作2 1 为项目引入Neo4j依赖2 2 启动和停止 3 Java操作Neo4j4 参考资料 0 写在前面 Linux版本 xff
  • NoSQL数据库原理与应用综合项目——起始篇

    NoSQL数据库原理与应用综合项目 起始篇 文章目录 NoSQL数据库原理与应用综合项目 起始篇 0 写在前面 1 项目说明 1 1 项目背景 1 2 项目功能 2 数据集和数据预处理 2 1 数据集 2 2 数据预处理 2 2 1 图书出
  • android -- 蓝牙 bluetooth (三)搜索蓝牙

    接上篇打开蓝牙继续 xff0c 来一起看下蓝牙搜索的流程 xff0c 触发蓝牙搜索的条件形式上有两种 xff0c 一是在蓝牙设置界面开启蓝牙会直接开始搜索 xff0c 另一个是先打开蓝牙开关在进入蓝牙设置界面也会触发搜索 xff0c 也可能
  • 单源最短路径问题——分支限界法(Java)

    单源最短路径问题 分支限界法 xff08 Java xff09 文章目录 单源最短路径问题 分支限界法 xff08 Java xff09 1 前置芝士1 1 分支限界法求解目标1 2 分支限界法引言1 3 分支限界法基本思想1 4 两种典型
  • 符号三角形问题(Java)

    符号三角形问题 xff08 Java xff09 文章目录 符号三角形问题 xff08 Java xff09 1 前置介绍2 算法设计3 程序代码4 算法效率5 参考资料 1 前置介绍 符号三角形定义 如下图所示 xff0c 符号三角形是由
  • 装载问题 ——分支限界法(Java)

    装载问题 分支限界法 xff08 Java xff09 文章目录 装载问题 分支限界法 xff08 Java xff09 1 问题描述2 算法设计3 算法的改进4 程序代码5 参考资料 1 问题描述 有一批共n个集装箱要装上2艘载重量分别为
  • 装载问题 ——回溯法(Java)

    装载问题 回溯法 xff08 Java xff09 文章目录 装载问题 回溯法 xff08 Java xff09 1 问题描述1 1 装载问题1 2 转换问题 2 算法设计2 1 可行性约束函数2 2 上界函数2 3 解空间树2 4 剪枝函
  • 上传项目代码到Github|Gitee

    上传项目代码到Github Gitee 文章目录 上传项目代码到Github Gitee1 前置准备1 1 Git 安装1 2 在 Git 中设置用户名1 2 1 为计算机上的每个存储库设置 Git 用户名1 2 2 为一个仓库设置 Git
  • NoSQL数据库原理与应用综合项目——HBase篇

    NoSQL数据库原理与应用综合项目 HBase篇 文章目录 NoSQL数据库原理与应用综合项目 HBase篇 0 写在前面 1 本地数据或HDFS数据导入到HBase 2 Hbase数据库表操作 2 1 Java API 连接HBase 2
  • NoSQL数据库原理与应用综合项目——MongoDB篇

    NoSQL数据库原理与应用综合项目 MongoDB篇 文章目录 NoSQL数据库原理与应用综合项目 MongoDB篇 0 写在前面 1 本地数据或HDFS数据导入到MongoDB 2 MongoDB数据库表操作 2 1 Java API 连
  • NoSQL数据库原理与应用综合项目——Redis篇

    NoSQL数据库原理与应用综合项目 Redis篇 文章目录 NoSQL数据库原理与应用综合项目 Redis篇 0 写在前面 1 本地数据或HDFS数据导入到Redis 2 Redis数据库表操作 2 1 Java API 连接Redis 2
  • NoSQL数据库原理与应用综合项目——Neo4j篇

    NoSQL数据库原理与应用综合项目 Neo4j篇 文章目录 NoSQL数据库原理与应用综合项目 Neo4j篇 0 写在前面 1 本地数据或HDFS数据导入到Neo4j 2 Neo4j数据库表操作 2 1 使用Python连接Neo4j 2
  • Hadoop综合项目——二手房统计分析(起始篇)

    Hadoop综合项目 二手房统计分析 起始篇 文章目录 Hadoop综合项目 二手房统计分析 起始篇 0 写在前面 1 项目背景与功能 1 1 项目背景 1 2 项目功能 2 数据集和数据预处理 2 1 数据集 2 2 数据预处理 2 2
  • android -- 蓝牙 bluetooth (四)OPP文件传输

    在前面android 蓝牙 bluetooth xff08 一 xff09 入门文章结尾中提到了会按四个方面来写这系列的文章 xff0c 前面已写了蓝牙打开和蓝牙搜索 xff0c 这次一起来看下蓝牙文件分享的流程 xff0c 也就是蓝牙应用
  • Hadoop综合项目——二手房统计分析(MapReduce篇)

    Hadoop综合项目 二手房统计分析 MapReduce篇 文章目录 Hadoop综合项目 二手房统计分析 MapReduce篇 0 写在前面 1 MapReduce统计分析 1 1 统计四大一线城市房价的最值 1 2 按照城市分区统计二手
  • Hadoop综合项目——二手房统计分析(Hive篇)

    Hadoop综合项目 二手房统计分析 Hive篇 文章目录 Hadoop综合项目 二手房统计分析 Hive篇 0 写在前面 1 Hive统计分析 1 1 本地数据 HDFS数据导入到Hive 1 2 楼龄超过20年的二手房比例 1 3 四大

随机推荐

  • Hadoop综合项目——二手房统计分析(可视化篇)

    Hadoop综合项目 二手房统计分析 可视化篇 文章目录 Hadoop综合项目 二手房统计分析 可视化篇 0 写在前面 1 数据可视化 1 1 二手房四大一线城市总价Top5 1 2 统计各个楼龄段的二手房比例 1 3 统计各个城市二手房标
  • Git Bash Here和RStudio软件的问题解决

    Git Bash Here和RStudio软件的问题解决 文章目录 Git Bash Here和RStudio软件的问题解决0 写在前面1 Git软件在任务栏图标空白2 RStudio软件2 1 警告信息InormalizePath pat
  • 算法的复杂性分析

    算法的复杂性分析 文章目录 算法的复杂性分析0 算法评价的基本原则1 影响程序运行时间的因素2 算法复杂度2 1 算法的时间复杂度2 2 渐进表示法2 2 1 运行时间的上界2 2 运行时间的下界2 2 3 运行时间的准确界 3 总结4 参
  • 整数划分问题(Java递归)

    整数划分问题 xff08 Java递归 xff09 文章目录 整数划分问题 xff08 Java递归 xff09 0 问题描述1 递归式2 代码3 参考 0 问题描述 整数划分问题 将正整数n表示成一系列正整数之和 xff1a n 61 n
  • 快速排序(Java分治法)

    快速排序 xff08 Java分治法 xff09 文章目录 快速排序 xff08 Java分治法 xff09 0 分治策略1 思路步骤2 代码3 复杂度分析3 1 最好情况3 2 最坏情况3 3 平均情况3 4 性能影响因素 4 合并排序V
  • 动态规划算法

    动态规划算法 文章目录 动态规划算法0 动态规划的思想方法1 动态规划法的设计思想2 动态规划基本步骤3 动态规划算法设计步骤3 1 动态规划算法的基本要素 4 两种实现的比较5 备忘录方法6 备忘录方法与动态规划比较7 参考 0 动态规划
  • 最长公共子序列(LCS)

    最长公共子序列 xff08 LCS xff09 文章目录 最长公共子序列 xff08 LCS xff09 0 写在前面1 问题描述2 最长公共子序列的结构3 子问题的递归结构4 计算最优值5 算法的改进6 参考 0 写在前面 本文文字大都来
  • 贪心算法(Java)

    贪心算法 文章目录 贪心算法0 写在前面1 贪心算法的基本要素1 1 贪心选择性质1 2 最优子结构性质1 3 贪心算法与动态规划算法的差异 2 贪心算法的特点3 贪心法的正确性证明4 活动安排问题4 1 问题描述4 2 贪心法的设计思想4
  • android -- 蓝牙 bluetooth (五)接电话与听音乐

    前段时间似乎所有的事情都赶在一起 xff0c 回家 集体出游 出差 xff0c 折腾了近一个月 xff0c 终于算暂时清静了 xff0c 但清静只是暂时 xff0c 估计马上又要出差了 xff0c 所以赶紧把蓝牙这一部分的文章了结下 xff
  • 保研/考研复试-数据结构

    数据结构 1 时间复杂度 xff1a 是一个描述算法性能的函数 xff0c 可以定性的描述算法的运行时间 是评价算法优劣的重要指标 2 快速找到第K个数 1 xff09 可以使用快排的思想 xff0c 因为快排每次可以找到一个第i位置的数
  • Github高效搜索方式

    Github高效搜索方式 文章目录 Github高效搜索方式0 写在前面1 常用的搜索功能1 1 直接搜索1 2 寻找指定用户 大小的仓库1 3 搜索仓库1 4 查找特定star范围的仓库1 5 查找指定主题1 6 查找仓库语言1 7 搜索
  • 电子游戏销售之缺失值检测与处理

    电子游戏销售之缺失值检测与处理 文章目录 电子游戏销售之缺失值检测与处理 0 写在前面 1 数据缺失值预处理 1 1 表的形状 1 2 原始数据每个特征缺失和非缺失的数目 1 3 每个特征缺失的率 1 4 处理后各特征缺失值的数目 1 5
  • 电子游戏销售之回归模型与数据可视化

    电子游戏销售之回归模型与数据可视化 文章目录 电子游戏销售之回归模型与数据可视化0 写在前面1 回归模型1 1 模型建立准备1 2 建立模型1 3 模型分析 2 数据可视化3 参考资料 0 写在前面 该篇文章的任务包括以下3个方面 检测与处
  • 基于R的Bilibili视频数据建模及分析——预处理篇

    基于R的Bilibili视频数据建模及分析 预处理篇 文章目录 基于R的Bilibili视频数据建模及分析 预处理篇0 写在前面1 项目介绍1 1 项目背景1 2 数据来源1 3 数据集展示 2 数据预处理2 1 删除空数据2 2 增加id
  • 基于R的Bilibili视频数据建模及分析——变量相关性分析篇

    基于R的Bilibili视频数据建模及分析 变量相关性分析篇 文章目录 基于R的Bilibili视频数据建模及分析 变量相关性分析篇 0 写在前面 1 数据分析 1 1 变量相关性分析 1 2 单元数据直观展示 1 3 多元数据直观展示 2
  • 基于R的Bilibili视频数据建模及分析——聚类分析篇

    基于R的Bilibili视频数据建模及分析 聚类分析篇 文章目录 基于R的Bilibili视频数据建模及分析 聚类分析篇 0 写在前面 1 数据分析 1 1 聚类分析 1 2 聚类统计 1 3 系统聚类 1 4 Kmeans与主成分分析 2
  • 基于R的Bilibili视频数据建模及分析——建模-因子分析篇

    基于R的Bilibili视频数据建模及分析 建模 因子分析篇 文章目录 基于R的Bilibili视频数据建模及分析 建模 因子分析篇 0 写在前面 1 数据分析 1 1 建模 因子分析 1 2 对数线性模型 1 3 主成分分析 1 4 因子
  • 大数据技术之Maxwell基础知识

    大数据技术之Maxwell基础知识 文章目录 大数据技术之Maxwell基础知识0 写在前面1 Maxwell 概述1 1 Maxwell 定义1 2 Maxwell 工作原理1 2 1 MySQL 主从复制过程1 2 2 Maxwell
  • 大数据技术之Maxwell入门案例学习

    大数据技术之Maxwell入门案例学习 文章目录 大数据技术之Maxwell入门案例学习1 写在前面2 Maxwell 使用2 1 Maxwell 安装部署2 2 Maxwell 入门案例2 2 1 监控 Mysql 数据并在控制台打印2
  • Canal实时监控案例

    Canal实时监控案例 文章目录 Canal实时监控案例0 写在前面1 TCP 模式测试1 1 IDEA创建项目canal module 1 2 通用监视类 CanalClient1 2 1 Canal 封装的数据结构1 2 2 在 can