Flink CDC (Mysql为例)

2023-05-16

背景

业务中经常出现一些千万乃至亿级别的大表,此时可能考虑分库分表(Sharding-JDBC、MyCat等方案),也常同步数据进入ES中;同步数据这一业务场景中,Flink CDC是一个很不错的解决方案。

方案

如mysql、postgresql、sqlserver等,flink cdc通过读取binlog日志(注意:请先开启binlog日志),进行数据同步,实时性较好。

对数据的解析和消费进行了二次封装,使用者只需增加简单的配置,实现FlinkConsumerListener接口,关注编写业务代码即可。

代码

show coding

flink: flink cdc 暂时支持mysql

测试demo

创建一个springboot项目

依赖引入(引入上述工程打包后的依赖)

        <dependency>
            <groupId>com.kwin</groupId>
            <artifactId>flink</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

配置文件

flink:
pipeline-name: flinkCDCTest
mysqlDataSource:
- port: 3306
hostname: 127.0.0.1
databaseList:
- flinktest
tableList:
- flinktest.student
username: root
password: 123456

如上,针对flinktest数据库的student表进行binlog监听。

flinktest.student的消费者

student实体

import lombok.Data;

/**
 * @author kwin
 * @Date 2022/7/25 18:27
 **/
@Data
public class Student {
    private Long id;

    private String name;

    private Integer age;

    private Integer maxInx;
}

消费者

import com.kwin.demo.server.module.flink.test.entity.Student;
import com.kwin.flink.sink.FlinkConsumerListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author kwin
 * @Date 2022/7/25 18:29
 **/
@Slf4j
@Component
public class StudentConsumerListener implements FlinkConsumerListener<Student> {
    @Override
    public String getDBName() {
        return "flinktest";
    }

    @Override
    public String getTable() {
        return "student";
    }

    @Override
    public void insert(Student data) {
        System.out.println("insert: " + data);
    }

    @Override
    public void update(Student srcData, Student destData) {
        System.out.println("update: 
src:" + srcData + "
dest:" + destData);
    }

    @Override
    public void delete(Student data) {
        System.out.println("delete:"+data);
    }
}

启动项目

flinktest.student修改数据时:

flinktest.student插入数据时:

flinktest.student删除数据时:

如上,使用者只需实现FlinkConsumerListener接口,即可对指定表的数据进行消费和业务逻辑操作。

最后

深知大多数初中级Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则近万的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《Java开发全套学习资料》送给大家,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

小编已加密:aHR0cHM6Ly9kb2NzLnFxLmNvbS9kb2MvRFVrVm9aSGxQZUVsTlkwUnc==出于安全原因,我们把网站通过base64编码了,大家可以通过base64解码把网址获取下来。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink CDC (Mysql为例) 的相关文章

  • 使用 Python 2.7 和 MySQLdb 将二进制数据插入 MySQL 中的 blob 列时如何避免编码警告

    我在将二进制数据插入到longblob使用 Python 2 7 中的 MySQLdb 在 MySQL 中的列 但我收到一个编码警告 我不知道如何解决 test py 11 Warning Invalid utf8 character st
  • 语言翻译语法

    我正在尝试为我的项目添加另一种语言 我们知道语言可以表现出主语和谓语的差异 例如 英语 Mustafa和他的朋友去看电影ahmet today 土耳其 Mustafa布昆 阿卡达西ahmetile birlikte sinemaya git
  • 从多个表中选择 - 一对多关系

    我有这样的表 表产品 身份证 姓名 表格图像 产品 ID 网址 订单号 表价 产品 ID 组合 货币 价格 表数量 产品 ID 组合 数量 表 Product 与其他表是一对多关系 我需要查询表并得到类似这样的结果 伪数组 ProductI
  • PHP:使用输入和输出参数(不是“INOUT”)调用 MySQL 存储过程

    我想从 PHP 调用 MySQL 中的存储过程 该过程需要输入and输出参数 not INOUT 参数 举一个简单的例子 假设我在 MySQL 中有以下存储过程 DELIMITER DROP PROCEDURE IF EXISTS test
  • UNIX时间记录时区吗?

    我想问一下UNIX时间 UNIX时间是否记录时区 我将托管从美国芝加哥移至 JST 问题是我的整个 MySQL 数据库都有 UNIX 时间 芝加哥 美国时区 的记录 我有一个 PHP 代码来显示之前的时间 例如 3 天前 昨天等 当我搬到新
  • MySQL中是否有类似Oracle中“level”的函数[重复]

    这个问题在这里已经有答案了 我面临一个场景 如果输入是 10 我想要一个数字序列 1 2 3 10 在甲骨文中levelfunction 提供了该功能 我想知道如何在 MySQL 中执行相同的任务 谢谢 您可以在 mysql 中使用此查询
  • PDO::commit 之后使用 PDOStatement::rowCount 结果?

    在 MySQL 文档中 有一个关于使用的注释mysql affected rows事务提交后 http php net manual en function mysql affected rows php http php net manu
  • 为 Mariadb 安装连接器 C

    所以 我想使用 Mariadb 有一个连接器 C https downloads mariadb org connector c https downloads mariadb org connector c 我该如何安装它 坦白说 它的文
  • 是否可以使用“WHERE”子句来选择SQL语句中的所有记录?

    晚上好 我很好奇是否可以在 SQL 语句中创建一个 WHERE 子句来显示所有记录 下面一些解释 随机 SQL 语句 Java JSP示例 正常情况 String SqlStatement SELECT FROM table example
  • 执行许多插入重复键更新错误:未使用所有参数

    所以我一直在尝试使用 python 2 7 15 使用 mysql connector 执行此查询 但由于某种原因 它似乎不起作用并且总是返回错误 并非所有参数都被使用 表更新有一个主键 即 ID 这是我尝试运行此 SQL 的查询 sql
  • 在mysql中搜索“SanF”时获取旧金山的记录

    当我搜索 SanF 时获得 San Francisco 记录 SELECT FROM table WHERE col LIKE san Works SELECT FROM table WHERE col LIKE san F Works S
  • 从数据库生成 XML 时出现 PHP 编码错误 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我正在尝试获取一个简单的 PHP 服
  • 选择MySql表数据放入数组中

    我尝试从 mysql 捕获数据并将它们全部放入数组中 认为 users table id name code 1 gorge 2132 2 flix ksd02 3 jasmen skaod2 sql mysql query select
  • ejabberd 16.06 + mysql 5.5.50,消息历史记录不保存

    我使用ejabberd 16 06 mysql 5 5 50 消息历史记录没有保存 我的 ejabberd yml MySQL server odbc type mysql odbc server freldo odbc port 3306
  • 如何使用实体框架设置连接字符串

    我将 EF6 与 MySQL 结合使用 并有一个用于多个数据库的模型 我希望能够在我的表单中设置连接设置 如何以编程方式设置模型的连接字符串 你应该使用EntityConnectionFactory这就是您所需要的 public strin
  • PDO::commit() 成功或失败

    The PHP PDO 提交 http www php net manual en pdo commit php文档指出该方法成功时返回 TRUE 失败时返回 FALSE 这是指beginTransaction 和commit 之间的语句执
  • Sql:计算随时间的增长

    我几周前发布了这个问题 但我认为我没有清楚地提出这个问题 因为我得到的答案不是我想要的 我认为最好重新开始 我正在尝试查询数据库以检索一段时间内唯一条目的数量 数据看起来像这样 Day UserID 1 A 1 B 2 B 3 A 4 B
  • 使用MySQL计算单个表中借方和贷方的余额

    下面的 MySQL 表包含带有关联金额的借方或贷方 操作 如何选择具有非零 余额 的所有 CLIENT ID 我尝试将表连接到自身以计算所有借方和贷方总额 但有些东西无法正常工作 CLIENT ID ACTION TYPE ACTION A
  • 如何在Sequelize中从主模型同一级别的包含模型返回结果?

    这是我在项目中完成的代码和结果 我想获得包含模型的结果与主模型相同的结果 下面的代码是我所做的 序列化查询 User findAll include model Position attributes POSITION NAME then
  • 如何使用 PHP 从 MySQL 检索特定值?

    好吧 我已经厌倦了 过去一周我花了大部分空闲时间试图解决这个问题 我知道 SQL 中的查询已更改 但我无法弄清楚 我能找到的所有其他帖子似乎都已经过时了 如果有人能帮助我 我将非常感激 我想做的就是使用手动输入数据库的唯一 密码 来检索行的

随机推荐

  • 使用 Docker/LXC 迅速启动一个桌面系统

    原文出处 xff1a vpsee Docker是 dotCloud 的一个开源引擎 xff0c 旨在提供一种应用程序的自动化部署解决方案 xff0c 简单的说就是 xff0c 在 Linux 系统上迅速创建一个容器 xff08 类似虚拟机
  • [统计学笔记] 统计学计算题选讲(精华)

    统计学计算题选讲 第 1 题 某班级学生物理课程考试成绩分别为 68 89 88 84 86 87 75 73 72 68 75 82 97 58 81 54 79 76 95 76 71 60 90 65 76 72 76 85 89 9
  • Python爬取CSDN博客所有文章

    需求 Python爬取某个账号CSDN博客所有文章的标题 xff0c 类型 xff0c 创建时间 xff0c 阅读数量 xff0c 并将结果保存至Excel 分析 CSDN主页URL为 xff1a https blog csdn net s
  • 乐鑫ESP32-C3项目(8)- USB串口和JTAG控制器

    摘录自参考手册之 23 USB串口 JTAG控制器 可用于烧录芯片外部flash 读取程序输出的数据 JTAG调试 仅占用2个管脚接电脑USB即可 xff0c 无需其他转换器 包含CDC ACM xff08 通信设备类抽象控制模型 xff0
  • launch 文件解析

    roslaunch工具是ros中python实现的程序启动工具 xff0c 通过读取launch文件中的参数配置 属性配置等来启动一系列节点 xff1b 很多ROS包或源码包中都有launch文件 xff0c 一般为该程序包能够运行起来的基
  • git 下载特定分支

    1 esp32上有2个分支 现在要下载特定的分支 git clone b esp32 homekit ble ssh liuyuhai 64 gerrit yeedev com 29418 yeelink firmware esp32 2
  • Docker: Ubuntu使用VNC运行基于Docker容器里的桌面系统

    https hub docker com r dorowu ubuntu desktop lxde vnc https github com fcwu docker ubuntu vnc desktop docker ubuntu vnc
  • JavaScript学习--splice()函数详解

    splice 函数详解 splice 方法向 从数组中添加 删除项目 xff0c 然后返回被删除的项目 注释 xff1a 该方法会改变原始数组 参数 xff1a index 必需 整数 xff0c 规定添加 删除项目的位置 xff0c 使用
  • docker查看容器IP的方法

    1 进入容器内部后 cat etc hosts 会显示自己以及 link 软连接的容器IP 2 使用命令 docker inspect docker inspect f 39 range NetworkSettings Networks I
  • java下载需要oracle账户

    https www oracle com java technologies javase downloads html 目前在官网下载低于jdk1 8的java jdk的时候需要登陆 xff0c 这边分享一个账号 xff0c 方便下载 账
  • uni-app网络请求封装(完整版)

    目录结构 根目录开始 api 所有请求 user js 用户请求api store 全局store管理 modules api模块 user js 用户请求模块 index js 状态管理初始化 utils 全局公用方法
  • vue+flv.js+SpringBoot+websocket实现视频监控与回放

    vue 43 flv js 43 SpringBoot 43 websocket实现视频监控与回放 需求 vue 43 springboot的项目 需要在页面展示出海康的硬盘录像机连接的摄像头的实时监控画面以及回放功能 之前项目里是纯前端实
  • ERROR 1064 (42000) You have an error in your SQL syntax; check the manual that corresponds to your

    在MySQL中导入 sql文件时 通过 use data source C info sql use data 数据库名称为data source C info sql 提前把 sql文件放入一个不太复杂的文件夹 xff0c 路径中最好不要
  • vue中实现路由跳转的三种方式(超详细整理)

    vue中实现路由跳转的三种方式 一 使用vue router vue router 本质是一个第三方的包 用的时候需要下载 步骤 xff08 7步法 xff09 xff1a 下载vue router模块到当前工程 yarn add vue
  • docker删除镜像、容器命令

    所有镜像和容器都删除的命令 docker system prune a 查看镜像 docker images 删除单个镜像 docker rmi f lt 镜像id gt 删除所有镜像 xff0c 不删除容器 docker rmi dock
  • Linux更改文件名

    使用mv a b 就可以重命名了 mv move移动文件 xff08 延伸功能 xff1a 重命名 linux系统没有专门的重命名命名 xff09 基本格式 mv filename newname 转载至 xff1a https blog
  • Docker如何对镜像进行命名

    1 在创建镜像时直接给镜像加上名称 xff0c 如下命令 xff1a docker build t mydemo f DockerFile txt 2 当使用创建命令忘记加上镜像名称了 xff0c 此时使用 docker imges 查看镜
  • Linux 下三种方式设置环境变量

    1 在Windows 系统下 xff0c 很多软件安装都需要配置环境变量 xff0c 比如 安装 jdk xff0c 如果不配置环境变量 xff0c 在非软件安装的目录下运行javac 命令 xff0c 将会报告找不到文件 xff0c 类似
  • nvm详解(mac环境nvm安装步骤及踩坑问题)

    1 定义 nvm xff0c 全称 Node Version Manager xff0c 也就是node版本控制 xff1b 它是一个命令行应用 xff0c 可以协助您快速地 更新 安装 使用 卸载 本机的全局 node js 版本 有时候
  • Flink CDC (Mysql为例)

    背景 业务中经常出现一些千万乃至亿级别的大表 xff0c 此时可能考虑分库分表 xff08 Sharding JDBC MyCat等方案 xff09 xff0c 也常同步数据进入ES中 xff1b 同步数据这一业务场景中 xff0c Fli