使用TCP方式拉取Canal数据

2023-11-11

1 Canal对接Kafka联调

1.1 配置修改

canal.properties

修改 zk:

canal.zkServers = 10.51.50.219:2181

instance.properties

开启配置项:

canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

  • test_javaedge_01 是kafka 的 topic
  • test_db.users 要监控的数据库、表
  • test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users

开启一个消费者

[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01

datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据

现在 serverMode 改回tcp。重启

javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %

canal 同步程序

package com.javaedge.canal;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

public class CanalClientApp {
    public static void main(String[] args) throws Exception {

        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111),
                "example",
                null, null);

        while (true) {
            connector.connect();
            connector.subscribe("test_db.users");
            Message message = connector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size()>0) {
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName();

                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                    CanalEntry.EventType eventType = rowChange.getEventType();

                    if (eventType == CanalEntry.EventType.INSERT) {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            HashMap<Object, Object> map = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                map.put(key, column.getValue());
                            }
                            System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));
                        }
                    }
                }

            }

        }

    }
}

运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!

数据链路

MySQL -》canal server(tcp)-》canal client-》kafka。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用TCP方式拉取Canal数据 的相关文章

  • 图的拓扑序列

    拓扑序列 拓扑序是按照点的先后顺序排列的 拓扑序列满足以下两点 1 每个顶点在序列中出现且只出现一次 2 若存在一条从顶点 A 到顶点 B 的路径 那么在序列中顶点 A 出现在顶点 B 的前面 拓扑序列只存在于有向无环图中 可以理解成一个将
  • replace和replaceAll区别

    1 replace的参数是char和CharSequence 即可以支持字符的替换 也支持字符串的替换 CharSequence即字符串序列的意思 说白了也就是字符串 2 replaceAll的参数是regex 即基于规则表达式的替换 比如
  • 浅谈一下Python的现状、发展前景以及Python的就业岗位(含趋势)

    在面对职业选择时我们难免纠结 徘徊 不知道去哪能走多远 我们没有办法只考虑当下 无视未来 当下Python工程师正处于人才需求旺盛 供应短缺的时期 工资一路上涨 假如 即便选择了目前火热的Python编程 那你需要先来了解一下Python的
  • 知识产权贯标补贴政策查询大全及怎么申请,奖励20万

    华夏泰科小编认为知识产权贯标旨在促进企业知识产权创造 运用 保护能力提高 使管理工作制度化 规范化 常规化 整体推进全省企业知识产权管理水平有效提升 申请后可以提升企业无形资产价值 巩固企业市场地位 可向科技主管部门申请战略推进项目 专利实

随机推荐

  • win10下设置maven环境变量

    一 先去maven官网 http maven apache org download cgi 下载压缩包 下拉页面可以看到好多版本 注意下载的版本为红色标注版本 apache maven 3 5 0 bin zip 点击下载即可 二 将下载
  • JSR303(一) 简介

    1 简介 数据校验是任何一个应用程序都会用到的功能 无论是显示层还是持久层 通常 相同的校验逻辑会分散在各个层中 这样 不仅浪费了时间还会导致错误的发生 译注 重复代码 为了避免重复 开发人员经常会把这些校验逻辑直接写在领域模型里面 但是这
  • 关于$'\r': command not found错误的一点体会

    今天运行一个其他组开发的jar包 这个jar包由于运行参数是通过命令行的方式输入的 所以需要运行一个shell脚本来启动 启动脚本类似这样 bin bash jdbc driverClassName com mysql jdbc Drive
  • 力扣-->#剑指Offer 897 . 递增顺序搜索树(E)

    class Solution TreeNode curr public TreeNode increasingBST TreeNode root TreeNode first new TreeNode 1 用first 来记录curr的初始
  • 途客圈创业记--读书笔记

    一 初创公司股权结构 2011年6月公司创立 自筹启动资金50万 1 陈天和Alex 每人出资25万元 总计50万元 作为启动资金 2 陈天 CTO兼董事长 股份60 因为是想法的发起人 且在实现这个想法的过程中 Alex CEO 股份40
  • 【C++学习第三讲】C++语句

    目录 C 语句 一 引入 二 声明语句和变量 1 为什么变量必须声明 三 赋值语句 四 cout新花样 五 cout和printf 六 其他C 语句 1 使用cin 2 使用cout进行拼接 七 类简介 一 引入 C 程序是一组函数 而每个
  • Qt::FramelessWindowHint无边框化,移动,大小调整

    QT工作笔记壹 导读 开发环境 QT界面无边框 方法1 方法2 引用 导读 最近工作一个项目需要用QT设计一个UI 看了一下目前主流商业化UI 例如扣扣 微信 网易云音乐 结合个人审美 本人非美术专业 但游戏经验丰富 对人机交互界面有个人看
  • 为什么有了uwsgi 还要 nginx 服务器

    相信每一个使用nginx uwsgi django部署过的人 都感到非常复杂 到底为什么一个项目的发布要经过这么多层级 他们每一层有什么理由存在 这就带大家宏观地看待一下 首先nginx 是对外的服务接口 外部浏览器通过url访问nginx
  • MyBatis 的执行流程

    前言 MyBatis可能很多人都一直在用 但是MyBatis的SQL执行流程可能并不是所有人都清楚了 那么既然进来了 通读本文你将收获如下 1 Mapper接口和映射文件是如何进行绑定的 2 MyBatis中SQL语句的执行流程 3 自定义
  • Js Vue 获取当月第一天、最后一天、当天

    new Date 效果 2023 06 12 注 本文示例以获取当天为例 一 new Date 在vue中使用new Date 获取当月第一天 最后一天 当天 二 使用步骤 1 定义方法 代码如下 示例 param type 0 第一天 1
  • 如何在Gephi中可视化多层网络

    要借助一个插件实现 Isometric Layout 感谢插件作者对gephi社群提供的贡献 作者网页的图片 下载 url https gephi org plugins plugin isometric layout 点击这儿下载再在Ge
  • centos8.5.111安装mysql8.0

    修改网络源 Connecting to 192 168 182 154 22 Connection established To escape to local shell press Ctrl Alt Activate the web c
  • 关于CSS3:justify-self,justify-items和justify-content之间的区别

    这篇文章应该能帮到你 https www codenong com 48535585 总结 flex布局 这三个属性中 只能用justify content属性 1 justify content 2 justify items 3 jus
  • LeetCode: 高频树结构题目总结 - Python

    LeetCode 高频树结构题目总结 问题描述 LeetCode 98 验证二叉搜索树 根据中序遍历 判断大小 LeetCode 99 恢复二叉搜索树 搜索二叉树有两个节点搞错了 恢复好 LeetCode 100 相同的树 LeetCode
  • MyCobot六轴机械臂(六)--Myblockly模块简介

    1 Logic模块 如图3 11所示 表示if 条件 do 程序1 else 程序2 若满足条件则执行程 序1 否则执行程序2 所表示方法的详细讲解可查看图1 2下方的文字讲解 所表示的逻辑判断 返回值为true或者false 可以点击 中
  • spring源码研究之IoC容器在web容器中初始化过程

    前段时间在公司做了一个项目 项目用了spring框架实现 WEB容器是Tomct 5 虽然说把项目做完了 但是一直对spring的IoC容器在web容器如何启动和起作用的并不清楚 所以就抽时间看一下spring的源代码 借此了解它的原理 我
  • 经典SQL题目-求第N高的薪水的解法汇总及知识点复习

    这几天在看Leetcode的时候逐步开始留意SQL题目 不做不知道 一做才感觉自己的SQL太弱了 因此将一道经典题目 求第N高的薪水的解法进行汇总 MySQL 相关解法的原文链接已标注在文末 题目的链接为 第N高的薪水 一 题干 第N高的薪
  • [云原生专题-45]:Kubesphere云治理-基于Kubernetes 构建的企业级容器平台简介与总体架构

    作者主页 文火冰糖的硅基工坊 文火冰糖 王文兵 的博客 文火冰糖的硅基工坊 CSDN博客 本文网址 https blog csdn net HiWangWenBing article details 122905834 目录 前言 第1章
  • pip换源+更改默认安装位置

    本文档创建于2023年3月9日 本文记录了pip换源和更改默认安装位置的操作 主要用于pip的一些配置 方便下载和文件管理 pip换源 使用pip安装库时 如果用默认的库经常会遇到连接不上或下载慢的问题 更换为国内的库下载会更快 临时换源
  • 使用TCP方式拉取Canal数据

    1 Canal对接Kafka联调 1 1 配置修改 canal properties 修改 zk canal zkServers 10 51 50 219 2181 instance properties 开启配置项 canal mq dy