Flink与Kafka的爱恨情仇

2023-05-16

FlinkKafkaConsumer 源码剖析

FlinkKafkaConsumer 的继承关系如下图所示。
在这里插入图片描述

可以发现几个版本的 FlinkKafkaConsumer 都继承自 FlinkKafkaConsumerBase 抽象类,所以可知 FlinkKafkaConsumerBase 是最核心的类了。FlinkKafkaConsumerBase 实现了 CheckpointedFunction、CheckpointListener 接口,继承了 RichParallelSourceFunction 抽象类来读取 Kafka 数据。
在这里插入图片描述

在 FlinkKafkaConsumerBase 中的 open 方法中做了大量的配置初始化工作,然后在 run 方法里面是由 AbstractFetcher 来获取数据的,在 AbstractFetcher 中有用 List> 来存储着所有订阅分区的状态信息,包括了下面这些字段:

private final KafkaTopicPartition partition;    //分区
private final KPH kafkaPartitionHandle;
private volatile long offset;   //消费到的 offset
private volatile long committedOffset;  //提交的 offset

在 FlinkKafkaConsumerBase 中还有字段定义 Flink 自动发现 Kafka 主题和分区个数的时间,默认是不开启的(时间为 Long.MIN_VALUE),像如果传入的是正则表达式参数,那么动态的发现主题还是有意义的,如果配置的已经是固定的 Topic,那么完全就没有开启这个的必要,另外就是 Kafka 的分区个数的自动发现,像高峰流量的时期,如果 Kafka 的分区扩容了,但是在 Flink 这边没有配置这个参数那就会导致 Kafka 新分区中的数据不会被消费到,这个参数由 flink.partition-discovery.interval-millis 控制。

FlinkKafkaProducer 源码剖析
FlinkKafkaProducer 这个有些特殊,不同版本的类结构有些不一样,如 FlinkKafkaProducer011 是继承的 TwoPhaseCommitSinkFunction 抽象类,而 FlinkKafkaProducer010 和 FlinkKafkaProducer09 是基于 FlinkKafkaProducerBase 类来实现的。
在这里插入图片描述

在这里插入图片描述

在 Kafka 0.11.x 版本后支持了事务,这让 Flink 与 Kafka 的事务相结合从而实现端到端的 Exactly once 才有了可能。

数据 Sink 到下游的 Kafka,可你能会关心数据的分区策略,在 Flink 中自带了一种就是 FlinkFixedPartitioner,它使用的是 round-robin 策略进行下发到下游 Kafka Topic 的分区上的,当然也提供了 FlinkKafkaPartitioner 接口供你去实现自定义的分区策略。

使用 Flink-connector-kafka 可能会遇到的问题

如何消费多个 Kafka Topic

通常可能会有很多类型的数据全部发到 Kafka,但是发送的数据却不是在同一个 Topic 里面,然后在 Flink 处消费的时候,又要去同时消费这些多个 Topic,在 Flink 中除了支持可以消费单个 Topic 的数据,还支持传入多个 Topic,另外还支持 Topic 的正则表达式(因为有时候可能会事先不确定到底会有多少个 Topic,所以使用正则来处理会比较好,只要在 Kafka 建立的 Topic 名是有规律的就行),如下几种构造器可以传入不同参数来创建 FlinkKafkaConsumer 对象。

//单个 Topic
public FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
    this(Collections.singletonList(topic), valueDeserializer, props);
}

//多个 Topic
public FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
    this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}

//正则表达式 Topic
public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
    this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}

想要获取数据的元数据信息

在消费 Kafka 数据的时候,有时候想获取到数据是从哪个 Topic、哪个分区里面过来的,这条数据的 offset 值是多少。这些元数据信息在有的场景真的需要,那么这种场景下该如何获取呢?其实在获取数据进行反序列化的时候使用 KafkaDeserializationSchema 就行。

public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    boolean isEndOfStream(T nextElement);

    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}

在 KafkaDeserializationSchema 接口中的 deserialize 方法里面的 ConsumerRecord 类中是包含了数据的元数据信息。

public class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final long checksum;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final K key;
    private final V value;
}

所在在使用 FlinkKafkaConsumer011 构造对象的的时候可以传入实现 KafkaDeserializationSchema 接口后的参数对象。

//单个 Topic
public FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
    this(Collections.singletonList(topic), deserializer, props);
}

//多个 Topic
public FlinkKafkaConsumer011(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
    super(topics, deserializer, props);
}

//正则表达式 Topic
public FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
    super(subscriptionPattern, deserializer, props);
}

多种数据类型

因为在 Kafka 的数据的类型可能会有很多种类型,比如是纯 String、String 类型的 JSON、Avro、Protobuf。那么源数据类型不同,在消费 Kafka 的时候反序列化也是会有一定的不同,但最终还是依赖前面的 KafkaDeserializationSchema 或者 DeserializationSchema (反序列化的 Schema),数据经过处理后的结果再次发到 Kafka 数据类型也是会有多种,它依赖的是 SerializationSchema(序列化的 Schema)。

序列化失败

因为数据是从 Kafka 过来的,难以避免的是 Kafka 中的数据可能会出现 null 或者不符合预期规范的数据,然后在反序列化的时候如果作业里面没有做异常处理的话,就会导致作业失败重启,这样情况可以在反序列化处做异常处理,保证作业的健壮性。

Kafka 消费 Offset 的选择
因为在 Flink Kafka Consumer 中是支持配置如何确定从 Kafka 分区开始消费的起始位置的。

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer0111<>(...);
consumer.setStartFromEarliest();     //从最早的数据开始消费
consumer.setStartFromLatest();       //从最新的数据开始消费
consumer.setStartFromTimestamp(...); //从根据指定的时间戳(ms)处开始消费
consumer.setStartFromGroupOffsets(); //默认从提交的 offset 开始消费

另外还支持根据分区指定的 offset 去消费 Topic 数据,示例如下:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

注意:这种情况下如果该分区中不存在指定的 Offset 了,则会使用默认的 setStartFromGroupOffsets 来消费分区中的数据。如果作业是从 Checkpoint 或者 Savepoint 还原的,那么上面这些配置无效,作业会根据状态中存储的 Offset 为准,然后开始消费。

每个 Kafka 分区的时间戳

当以 Kafka 来作为数据源的时候,通常每个 Kafka 分区的数据时间戳是递增的(事件是有序的),但是当你作业设置多个并行度的时候,Flink 去消费 Kafka 数据流是并行的,那么并行的去消费 Kafka 分区的数据就会导致打乱原每个分区的数据时间戳的顺序。在这种情况下,你可以使用 Flink 中的 Kafka-partition-aware 特性来生成水印,使用该特性后,水印会在 Kafka 消费端生成,然后每个 Kafka 分区和每个分区上的水印最后的合并方式和水印在数据流 shuffle 过程中的合并方式一致。

如果事件时间戳严格按照每个 Kafka 分区升序,则可以使用前面提到的 AscendingTimestampExtractor 水印生成器来为每个分区生成水印。下面代码如何使用 per-Kafka-partition 来生成水印。

FlinkKafkaConsumer011<Event> kafkaSource = new FlinkKafkaConsumer011<>("topic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {

    @Override
    public long extractAscendingTimestamp(Event event) {
        return event.eventTimestamp();
    }
});

DataStream<Event> stream = env.addSource(kafkaSource);

上面这几种策略是支持可以配置的,需要在作业中指定,具体选择哪种是需要根据作业的业务需求来判断的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink与Kafka的爱恨情仇 的相关文章

  • VMware虚拟机安装CentOS7

    VMware虚拟机安装CentOS7 2021 3 18 23 42 58 软件环境 虚拟机 xff1a VMware Workstation Linux xff1a CentOS 7 x86 64 DVD 1708 iso镜像文件 虚拟机
  • Linux-ContentOs关闭防火墙

    Centos 永久关闭防火墙 2021 3 19 22 22 43 1 打开虚拟机 xff0c 然后输入命令 systemctl status firewalld service 并按下回车键 2 出现上图中的active xff08 ru
  • Linux一键安装部署环境

    安装Java环境 yum install y java 1 8 0 openjdk devel x86 64 安装mariadb 1 vi install mariadb sh 创建文件并打开 2 输入 i 进入编辑 复制一下内容粘贴 bi
  • ubantu安装搜狗常见问题

    安装详情 参考步骤 语言设置里已经设置为fcitx xff0c 也装了这个 xff0c 但按照教程 xff0c 重启后 xff0c 点击键盘图标 xff0c 配置语言时 xff0c 却弹出来了这样的对话框 xff08 您正在图形界面下运行
  • ubuntu下virtualbox的安装、卸载

    一 添加VirtualBox的源并安装5 1版本 virtualbox官网 xff1a https www virtualbox org wiki Download Old Builds 虽然也可以直接安装deb包 xff08 例如 xff
  • CoreData的简单使用<二、CoreData两张数据库表的关联操作>

    两张数据库表的关联操作 在实际的数据结构中难免会遇到两张数据表需要进行关联 xff0c 比如通用的例子 xff0c 公司有两个员工张三和李四 xff0c 分别属于iOS和android部门 xff0c 如果所有的员工属于一张表 xff0c
  • 瑞士移位赛赛制解释

    这种比赛是用瑞士轮转法实施的积分编排制比赛 当参赛队数较多 xff0c 而比赛时间又不很充裕时 xff0c 瑞士移位法是理想的比赛方法 比赛分开闭室 各队都给予编号 xff0c 在整个比赛过程中各队的编号不动 第1轮对阵一般安排强队对弱队
  • Springboot:Spring Assistant创建项目框架

    Springboot Spring Assistant创建项目框架 Springboot自动配置项目流程步骤如下所示 第一步 第二步 第三步 第四步
  • vnc server干什么用的,简单介绍vnc server是干什么用的

    VNC Server 是一般 Linux 发行版都会附带的 vnc服务器软件 vncserver 是一个为了满足分布式用户共享服务器上面的资源 xff0c 而在服务器上开启的一项服务 xff0c 对应的客户端软件为vncviewer xff
  • GITHUB下载慢解决办法-插件解决

    在油猴上搜索此插件 或者直接导入插件代码如下 61 61 UserScript 61 61 64 name Github 增强 高速下载 64 name zh CN Github 增强 高速下载 64 name zh TW Github 增
  • OSPF路由协议配置实验

    实验目的 xff1a 理解OSPF xff1b 掌握OSPF的配置方法 xff1b 掌握查看OSPF协议的相关信息 实验器材 xff1a 路由器及PC机 xff0c 双绞线 实验内容 xff1a 本实验通过在路由器上配置OSPF协议 xff
  • linux 22.04版本ubuntu换源

    1 备份原始源文件 执行下面命令 xff0c 将原来的源文件备份保存 sudo cp etc apt sources list etc apt sources list bak 2 修改源文件sources list 打开源文件 xff0c
  • jquery dialog 弹出 ajax加载数据

    我之前就是想实现一个弹框 xff0c 在弹框的时候用ajax 动态加载要展示的内容 xff0c 这个地方实现起来并不难 xff0c 但是在实现的时候遇到了一个坑 页面 xff1a lt div id 61 34 logDiv 34 styl
  • 解决STM32新增加函数出现Undefined symbol HAL_ADC_Init (referred from main.o). 问题

    最近在开发STM32L0xx系列产品的ADC采集项目 xff0c 在老工程上添加库函数 xff0c stm32l0xx hal adc h stm32l0xx hal adc c函数 进行ADC初始化 xff0c 编译不通过出现 xff0c
  • GET,POST,PUT,DELETE,OPTIONS等请求方式简单总结

    之前做的java web项目 xff0c 基本上只使用get和post的请求方式 xff0c 但是现在新项目额外增加了put xff0c delete xff0c 查了点资料 xff0c 做个简单的总结 1 GET get请求是用来获取数据
  • IOS TableView Cell重用机制及TableView常用Code

    写的太好了 xff0c 多谢楼主的无私分享 文章来自 xff1a http heidianfeng blog 163 com blog static 6184345620121114104552518 创建UITableViewContro
  • 编译Linux驱动程序

    基于Ubuntu 12 10 xff0c 编译Linux驱动程序 1 准备linux内核源码 此时 xff0c 我要编译的驱动是基于Ubuntu 12 10内核的 xff0c 所以我下载的是其对应的内核源码包 xff1a linux 3 5
  • 卸载windows10子系统卸载linux

    参考地址 xff1a https docs microsoft com en us windows wsl wsl config 查看所有已经安装的分发版本 xff1a wsl list all 查看正在运行的分发版本 xff1a wsl
  • github镜像站

    github镜像站 xff1a https hub fastgit org GitHub 在国内经常会出现无法访问的情况 xff0c 下面分享几个 GitHub 镜像站供大家使用 xff01 全局加速 可直接访问站点 xff0c 查看代码等
  • vnc连接linux失败,vnc连接linux失败解决办法

    在日常工作学习中 xff0c 经常会使用到vnc连接 xff0c 那有小伙伴知道如何进行vnc连接linux吗 xff1f 当vnc连接linux失败又该如何解决呢 xff1f 之前有简单介绍过如何实现vnc连接linux 那接下来让我们一

随机推荐

  • 域名是如何关联到CDN的

    用户在访问一个域名的时候 xff0c 网络中是怎么知道这个域名到底是配置在哪一个CDN厂商的呢 xff1f 笔者以test1 huiziguoxueshe com为例 xff0c 来描述下具体的过程 xff0c 如下所示 xff1a ste
  • 第五章:软件详细设计

    真是应了那句 xff1a 白天游四方 晚上点灯补裤裆 捂脸 xff09 xff0c 孩子睡了夜深人静了 我才是开始我的小笔记整理工作 详细设计是软件设计的第二阶段 xff0c 这一阶段的工作 xff0c 就是要对系统中的每个模块给出足够详细
  • 第7章 软件测试(1)

    今天是个开心的日子 xff0c 具体为啥开心我知道你懂得 继续我的龟速学习小笔记 它来了它来了 xff0c 你说它很简单 xff0c 当我看到此章的一部分内容后发现了原来学霸和学渣差别就是如此大 xff08 捂脸 xff09 xff0c 今
  • 第7章 软件测试(3)

    一晃3天没有学习了 xff0c 昨天的阅读量创立了一个新高 xff0c 内心还是很欢喜的 7 4 2黑盒技术 黑盒技术着重测试软件功能 xff0c 需重点研究需求说明和总体设计中有关程序功能输入 输出之间的关系等信息 xff0c 从而与测试
  • 第七章 软件测试(此章完结)

    春乏秋困 xff0c 一个早上哈气连天 脖子酸 腰痛 xff08 捂脸 xff09 近期叫醒我的不是闹钟也不是梦想 xff0c 而是凌晨4点和6点广播大喇叭喊居民做核酸的声音 xff0c 还是别的小区的 xff08 再次捂脸 xff09 也
  • 第十章:面向对象分析(2)

    3 泛化关系 泛化关系和类找那个的泛化概念是一样的 xff0c 于用例继承父用例的行为和含义 xff0c 还可以增加或覆盖父用例的行为 xff0c 子用例可以出现在任何父用例出现的位置 xff08 父和子均有具体的实例 xff09 也可以重
  • 第十章:面向对象分析(此章完结)

    10 4 4建立活动图 活动具体表现为由一系列动作组成的执行过程 xff0c 将各种活动及不同活动之间的转换 xff0c 用图形进行表示就构成了活动图 xff0c 作用是对系统的行为建模 1 活动图与流程图 活动图描述系统使用的活动 xff
  • 第十五章 软件工程新技术

    俺家老大说这一章我不需要仔细看 xff0c 快快过一遍就行 xff08 可能是觉得以我的能力一时半会也用不到吧 xff08 捂脸 xff09 xff09 那么我就抄一段本章小结吧 xff0c 后面如有需要我在重新认真学习 xff08 奸笑
  • 第四章 软件测试方法(2)

    上周学习了白盒 xff0c 本周开始学习黑盒测试 4 3黑盒测试 黑盒测试 xff08 Black Box Testing xff09 也称功能测试 xff0c 主要测试每个功能是否正常使用 是软件测试使用中最广泛的一类测试 在黑盒测试中
  • vnc viewer手机中文版,超好用的5款vnc viewer手机中文版

    在平时工作中 xff0c 经常会用到vnc viewer软件 当软件打开都是英文介绍 xff0c 真的让人很头痛 在各种各样的vnc viewer手机中文版软件中 xff0c 要想找到那款让你使用方便的软件 xff0c 真的很不容易 xff
  • 第九章 APP项目测试(4) 测试工具

    接上面一篇 继续 xff08 7 xff09 kill process after error 参数说明 xff1a 用于指定当应用程序发生错误时 xff0c 是否停止运行 如果指定此参数 xff0c 当应用程序发生错误时 xff0c 应用
  • 第九章 APP项目测试(此章完结)

    9 4 5 Fiddler 是一个HTTP的调试代理工具 xff0c 它以代码服务器的方式 xff0c 监听系统的HTTP网络数据 xff0c 俗称抓包工具 可直接去官网下载安装 1 Fiddler工具介绍 启动Fiddler后 xff0c
  • 软硬件基础知识学习--小日记(1)

    终于看完了软件工程和软件测试技术指南两本书 xff0c 因为是自学总觉得前学后忘 有时候找老公不耻下问 xff0c 他总是很完美的把我问的哑口无言 昨天意外翻到黑马程序的的视频 xff0c 觉得非常适合我这0基础的小白 然后就有了今天的小日
  • Qt for Windows版本下编译QtDBus模块

    转载时请注明出处和作者联系方式 作者联系方 式 xff1a Lutx lt 80437 at zj dot com gt Qt中已经包含了QtDBus模块 但此模块只能在Unix系统下使用 却不支持Windows系统 这里介绍的是Windo
  • 智安网络丨一行代码,揭开CPU执行原理!

    计算机如何执行你写的代码 xff1f 知乎上有人提问 xff1a 电脑怎样执行编程语言的 xff1f 图片 很多刚刚入坑的小白可能对此完全没有概念 xff0c 或者模模糊糊知道个大概 xff0c 我们写下的一行行代码 xff0c 计算机到底
  • 推荐7个冷门手机APP,每一个都让我相见恨晚

    推荐7个让我相见恨晚的手机APP 1 Smart Kit 360 Smart Kit 360是一个全能的工具箱软件 xff0c 只有10M的大小 xff0c 却提供了40多个实用工具 xff0c 有了它 xff0c 就不需要下载这么多软件了
  • 推荐8款有趣实用的软件,建议你先收藏,总有一天你会用到

    推荐8个非常好用的软件 xff0c 每一个都能给人带来惊喜 xff0c 软件的实用性非常强 xff0c 千万不要错过了 1 央视频 央视频是中央广播电视总台出品的高质量视频社交软件 xff0c 内容丰富 xff0c 功能强大 强大的电视直播
  • 中国天气网 API

    中国天气网 API 真正的中国天气api接口xml xff0c json 详解 前言 某天想写个天气软件 xff0c 于是上网找找有没有免费的天气 API 发现许多的API不是收费 xff0c 就是不能用了 xff08 心塞塞 xff09
  • 【Linux-Ubuntu】apt-get update软件更新的时候经常出错

    1 网络问题 将电脑连接的WIFI改成手机热点连接 2 镜像源问题 使用最新的镜像源进行下载更新 xff1a 可以参考下面方式获取 xff1a 然后选择手动替换 xff0c 或者命令替换 xff0c 一般你直接复制原来的 list文件 xf
  • Flink与Kafka的爱恨情仇

    FlinkKafkaConsumer 源码剖析 FlinkKafkaConsumer 的继承关系如下图所示 可以发现几个版本的 FlinkKafkaConsumer 都继承自 FlinkKafkaConsumerBase 抽象类 xff0c