大数据技术之 Flink-CDC

2023-11-18

1 CDC简介

1.1 什么是 CDC

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 CDC 的种类 

CDC 主要分为基于查询基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:

基于查询的 CDC

基于 Binlog 的 CDC

开源产品

Sqoop、Kafka JDBC Source

Canal、Maxwell、Debezium

执行模式

Batch

Streaming

是否可以捕获所有数据变化

延迟性

高延迟

低延迟

是否增加数据库压力

1.3 Flink-CDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL等数据库直接读取全量数据增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors

 2  Flink CDC 案例实操

2.1 DataStream 方式的应用

2.1.1 导入依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2.1.2 编写代码

package com.atguigu;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1.获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启CK
//        env.enableCheckpointing(5000);
//        env.getCheckpointConfig().setCheckpointTimeout(10000);
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2.通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("cdc_test")
//不配置表的话,就会监听库下所有的表的数据
//                .tableList("cdc_test.user_info")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print();

        //4.启动任务
        env.execute("FlinkCDC");

    }

}

StartupOptions:
initial:
        第一次启动时 读取原表已有的历史数据, 操作类型为READ, 之后不断做检查点存储
        第二次启动时 一定要指明检查点文件的具体位置, 这样就可以断点续传; 即使Flink宕机了, 重启后是从上次offset开始读, 而不是latest
        检查点在打包部署后才有用, 因为那样才可以指明检查点的具体位置

earliest:
        从BinLog第一行数据开始读, 最好先给这个数据库加上BinLog后, 再去读取创建数据库

latest:
        读取最新变更数据, 从Flink程序启动后开始算

timestamp:
        可以从BinLog某一时刻的数据开始读

specificOffset:
        指明BinLog文件位置和从哪个offset开始读;

这个一般来说不怎么用, 因为本地没存offset的信息, 很难知道offset读到哪了

2.1.3 案例测试

2)开启 MySQL Binlog 并重启 MySQL

3)启动 Flink 集群

[atguigu@hadoop102 flink-standalone]$ bin/start-cluster.sh

4)启动 HDFS 集群

[atguigu@hadoop102 flink-standalone]$ start-dfs.sh

5)启动程序

[atguigu@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

6)在 MySQL 的 gmall-flink.z_user_info 表中添加、修改或者删除数据

7)给当前的 Flink 程序创建 Savepoint

[atguigu@hadoop102 flink-standalone]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save

8)关闭程序以后从 Savepoint 重启程序

[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

2.2 FlinkSQL 方式的应用

2.2.1 代码实现

package com.atguigu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLCDC {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.使用FLINKSQL DDL模式构建CDC 表
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING primary key, " +
                " name STRING, " +
                " sex STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = 'hadoop102', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '000000', " +
                " 'database-name' = 'cdc_test', " +
                " 'table-name' = 'user_info' " +
                ")");

        //3.查询数据并转换为流输出
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //4.启动
        env.execute("FlinkSQLCDC");

    }

}

2.3 自定义反序列化器

2.3.1 代码实现

package com.atguigu.func;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {


    /**
     * {
     * "db":"",
     * "tableName":"",
     * "before":{"id":"1001","name":""...},
     * "after":{"id":"1001","name":""...},
     * "op":""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //创建JSON对象用于封装结果数据
        JSONObject result = new JSONObject();

        //获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);

        //获取before数据
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            //获取列信息
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        //获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            //获取列信息
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);

        //输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

 3  Flink-CDC 2.0

本章图片来源于北京站 Flink Meetup 分享的《详解 Flink-CDC》

痛点:

3.2 设计目标

3.3 设计实现

3.3.1 整体概览

在对于有主键的表做初始化模式,整体的流程主要分为 5 个阶段:

1.Chunk 切分;2.Chunk 分配;(实现并行读取数据&CheckPoint

3.Chunk 读取;(实现无锁读取)

4.Chunk 汇报;

5.Chunk 分配。

3.3.2 Chunk 切分

根据 Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。

3.3.3 Chunk 分配

将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数据,实现了并行读取的目标

同时在每个 Chunk 读取的时候可以单独 CheckPoint,某个 Chunk 读取失败只需要单独执行该 Chunk 的任务,而不需要像 1.x 中失败了只能从头读取。

若每个 SourceReader 保证了数据一致性,则全表就保证了数据一致性。

3.3.4 Chunk 读取

读取可以分为 5 个阶段

1)SourceReader 读取表数据之前先记录当前的 Binlog 位置信息记为低位点;

2)SourceReader 将自身区间内的数据查询出来并放置在 buffer 中;

3)查询完成之后记录当前的 Binlog 位置信息记为高位点;

4)在增量部分消费从低位点到高位点的 Binlog

5)根据主键,对 buffer 中的数据进行修正并输出。

通过以上 5 个阶段可以保证每个 Chunk 最终的输出就是在高位点时该 Chunk 中最新的数据,

但是目前只是做到了保证单个 Chunk 中的数据一致性。

3.3.5 Chunk 汇报

在 Snapshot Chunk 读取完成之后,有一个汇报的流程,如上图所示,即 SourceReader 需要将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。

3.3.6 Chunk 分配

FlinkCDC 是支持全量+增量数据同步的,在 SourceEnumerator 接收到所有的 Snapshot Chunk 完成信息之后,还有一个消费增量数据(Binlog的任务,此时是通过下发 Binlog Chunk给任意一个 SourceReader 进行单并发读取来实现的。

3.4 核心原理分析

3.4.1 Binlog Chunk 中开始读取位置源码

MySqlHybridSplitAssigner

private MySqlBinlogSplit createBinlogSplit(){
    
final List<MySqlSnapshotSplit> assignedSnapshotSplit=snapshotSplitAssigner.getAssignedSplits().values().stream().sorted(Comparator.comparing(MySqlSplit::splitId)).collect(Collectors.toList());
        Map<String, BinlogOffset> splitFinishedOffsets=snapshotSplitAssigner.getSplitFinishedOffsets();
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos=new ArrayList<>();

        final Map<TableId, TableChanges.TableChange>tableSchemas=new HashMap<>();
        BinlogOffset minBinlogOffset=BinlogOffset.INITIAL_OFFSET;for(MySqlSnapshotSplit split:assignedSnapshotSplit){
// find the min binlog offset

        BinlogOffset binlogOffset=splitFinishedOffsets.get(split.splitId());if(binlogOffset.compareTo(minBinlogOffset)< 0){
        minBinlogOffset=binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
        new FinishedSnapshotSplitInfo(
        split.getTableId(),
        split.splitId(),
        split.getSplitStart(),
        split.getSplitEnd(),
        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
        }
final MySqlSnapshotSplit lastSnapshotSplit=assignedSnapshotSplit.get(assignedSnapshotSplit.size()-1).asSnapshotSplit();
        return new MySqlBinlogSplit(
        BINLOG_SPLIT_ID,
        lastSnapshotSplit.getSplitKeyType(),
        minBinlogOffset,
        BinlogOffset.NO_STOPPING_OFFSET,
        finishedSnapshotSplitInfos,
        tableSchemas);
        }

3.4.2 读取低位点到高位点之间的 Binlog

BinlogSplitReader
/**
*Returns the record should emit or not.
*
*<p>The watermark signal algorithm is the binlog split reader only sends the binlog event
that
*belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
*since the offset is after its high watermark.
*
* <pre> E.g: the data input is :
*	snapshot-split-0 info : [0,	1024) highWatermark0
*snapshot-split-1 info : [1024, 2048) highWatermark1
*the data output is:
*only the binlog event belong to [0, 1024) and offset is after highWatermark0 should send,
*only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should
send.
*</pre>
*/
private boolean shouldEmit(SourceRecord sourceRecord) {
   if (isDataChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
BinlogOffset position = getBinlogPosition(sourceRecord);
//aligned, all snapshot splits of the table has reached max highWatermark if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
return true;
}
Object[] key = getSplitKey(
currentBinlogSplit.getSplitKeyType(),
sourceRecord,
statefulTaskContext.getSchemaNameAdjuster()); for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
        if (RecordUtils.splitKeyRangeContains(
             key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
            &&position.isAtOrBefore(splitInfo.getHighWatermark())) { return true;
          }
        }
//not in the monitored splits scope, do not emit return false;
    }
//always send the schema change event and signal event
//we need record them to state of Flink
return true;
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

大数据技术之 Flink-CDC 的相关文章

  • java 基础重学(一)-面向对象

    一 什么是面向过程 面向过程 是一种以事件为中心的编程思想 编程的时候把解决问题的步骤分析出来 然后用函数把这些步骤实现 在一步一步的具体步骤中再按顺序调用函数 自顶向下 逐步求精 模块化封装函数主张按功能把软件系统逐步细分 对每个涉及到的
  • 华为OD机试真题 Java 实现【整数对最小和】【2022Q4 100分】,附详细解题思路

    一 题目描述 给定两个整数数组array1 array2 数组元素按升序排列 假设从array1 array2中分别取出一个元素可构成一对元素 现在需要取出k对元素 并对取出的所有元素求和 计算和的最小值 注意 两对元素如果对应于array
  • 在win10, win11 家庭版中安装远程桌面服务

    win10 win11 家庭版中提供远程桌面服务 简介 在windows家庭版中 是不提供远程桌面服务的 你没有办法使用远程桌面连接到windows家庭版中 当然 你可用升级windows 版本到专业版 这样就可用享受到windows自带的

随机推荐

  • MySQL从5.6版本到5.7版本的升级过程

    MySQL从5 6版本到5 7版本的升级过程 二进制升级过程 1 介绍 此处因原有的版本就是5 6的 就不再赘述5 6的安装过程了 原有数据库5 6的目录情况 basedir usr local mysql base目录是做的软链 指向my
  • Matplotlib输出中文显示问题

    Matplotlib输出中文显示问题 试过觉得有用的办法 http www 360doc com content 14 0713 12 16740871 394080556 shtml
  • ggplot2读书笔记6:第四章 语法 基础理论

    碎碎念ing 终于结束了 ggplot2 的第一部分 Getting Started 今天开始看第二部分 语法 第四章 Mastering the Grammar 介绍了ggplot2的一些基础语法知识 大概是对前期内容在理论上做一个总结
  • x390拆机 升级内存和硬盘_战66拆机加内存折腾手记

    前言 今年6 18时入HP 战66二代AMD版时就在计划双十一时升级内存到16G 并顺带加个机械硬盘 结果用了4个多月后感觉目前硬盘空间尚无压力 所以只计划加内存 晒单 之前有朋友用过芝奇 说还不错 既有逼格也有保障 没有等到11 11当天
  • openGL增强表面细节----高度贴图

    openGL系列文章目录 文章目录 openGL系列文章目录 前言 一 代码 主程序c 效果 前言 现在我们扩展法线贴图的概念 从纹理图像用于扰动法向量到扰乱顶点位置本身 实 际上 以这种方式修改对象的几何体具有一定的优势 例如使表面特征沿
  • 简单的移动阵型补全

    需求 模拟企鹅群以一种三角形的阵型移动 并向目标发动冲击 冲击时候的企鹅不在跟随阵型移动 阵型内的企鹅跟随阵型移动 并且会自动补全阵型 企鹅群体大概是这样的阵型 o o o o o o 可以抽象成一个数组 fromation new int
  • SQL中partition关键字的使用

    最近在写后台语句时候 运用到了partition这样一个关键字 先大致说一下背景 有一种数据表 如下 现在需要取出 每一个人最近的一次打卡时间 思路是 先把数据按照人名分组 然后在每个组里面按照时间排倒叙 最后取出每组的第一条数据即可 pa
  • 【Hadoop学起来】Linux配置$HADOOP_HOME/etc/hadoop/hadoop-env.sh时找不到JAVA_HOME?

    正文之前 今天很气愤 想要学点东西 但是老是被环境所限制 Hadoop这个见鬼的环境 我只是运行单机模式 结果就是都不成功 好不容易磕磕盼盼的终于把啥缺的东西都找出来了结果最后还是失败了 暂时我真的不想去看失败记录 因为快要睡了明天再说吧
  • Premiere Pro 2022有哪些新增功能吸引了你

    Premiere Pro2022正式出现在大家的面前 那么如此大版本的更新 都有哪些变化呢 今天我们就来谈谈pr22版本的变化 安装 Premiere Pro 2022中文 图形和标题 Premiere Pro 中的图形和字幕工作流程具有多
  • 华为星闪联盟:引领无线通信技术创新的先锋

    星闪 NearLink 是由华为倡导并发起的新一代无线短距通信技术 它从零到一全新设计 是为了满足万物互联时代个性化 多样化的极致 创新体验需求而诞生的 这项技术汇聚了中国300多家头部企业和机构的集体智慧 华为更是其中的主要贡献方 在过去
  • 微信小程序scroll-view滚动到指定位置

    自动滚动到指定位置 在小程序开发过程中 为了满足一个特殊的要求 因此需要一个特殊的功能 即在打开小程序数据加载完成的时候 需要页面自动滚动到指定位置 或者页面的底部 在各种度度 狗狗之后 通过各种尝试 算是找到了一个比较完美的方案 微信扫描
  • JVM-类加载详解

    一 JVM类加载过程 JVM类加载过程如下图 JVM类加载过程分为 加载 链接 初始化 使用 卸载 这五个阶段 其中链接阶段又包括 验证 准备 解析 加载 通过类的完全限定名 查找此类的二进制字节码文件 通过该字节码文件创建Class对象
  • springboot 结合 ice(飞冰) 实现上传功能

    ice 前端代码 我用的是一个拖拽的模板
  • Disk /dev/sdb doesn‘t contain a valid partition table

    使用fdisk l grep Disk命令查看磁盘情况提示Disk dev sdb doesn t contain a valid partition table 解决方案如下 使用 命令fdisk dev sdb 然后按照下面的步骤操作即
  • 【蓝桥杯 和与乘积】

    题目描述 解题思路 首先想想可以组成答案的区间有什么性质 很直观可以想到排除长度为1的和长度为2的 构成答案的区间肯定是由几个非1的数加上一堆1构成的 那么可以很容易的想到区间长度k有下面这个等式 k mul sm tot mul为区间非1
  • [Ubuntu]GTest安装和测试

    1 Ubuntu直接通过控制台安装 sudo apt get install libgtest dev 2 编译链接库 2 1进入gtest文件夹 cd usr src gtest 2 2编译 没有安装Cmake的请先安装cmake sud
  • 计算机无法识别华为m3,华为8寸M3(非青春版)电脑连接问题报告

    设备是一上市时候就买了 wifi版32g 具体时间忘了 用到现在整体感觉都挺好 就在这个长假因为某些原因想连电脑 这是第一次连 结果就发现了问题 设备情况 win10和win7电脑各一台 均安装了华为助手 5根usb线 本机EMUI5 01
  • 数据结构(3)— 线性表之顺序存储详解介绍(含代码)

    1 博客代码在 数据结构代码 GitHub仓库 线性表介绍 线性表的基础概念 1 甲骨文表示 线性表是零个或多个数据元素的有限序列 2 线性表 顾名思义 就是说这个数据存储是线性的 而线性的东西具有什么特征呢 lt 1 gt 数据是一对一的
  • JSWebrtc.js +对应视频实例化

    HTML div class context div
  • 大数据技术之 Flink-CDC

    第1章 CDC简介 1 1 什么是 CDC CDC 是 Change Data Capture 变更数据获取 的简称 核心思想是 监测并捕获数据库的变动 包括数据或数据表的插入 更新以及删除等 将这些变更按发生的顺序完整记录下来 写入到消息