Kafka 丢数据问题

2023-11-01

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

Kafka主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • 支持在线水平扩展

1、Kafka如何防止数据丢失

1)消费端弄丢数据

​ 消费者在消费完消息之后需要执行消费位移的提交,该消费位移表示下一条需要拉取的消息的位置。Kafka默认位移提交方式是自动提交,但它不是在你每消费一次数据之后就提交一次位移,而是每隔5秒将拉取到的每个分区中的最大的消费位移进行提交。自动位移提交在正常情况下不会发生消息丢失或重复消费的现象,唯一可能的情况,你拉取到消息后,消费者那边刚好进行了位移提交,Kafka那边以为你已经消费了这条消息,其实你刚开始准备对这条消息进行业务处理,但你还没处理完,然后因为某些原因,自己挂掉了,当你服务恢复后再去消费,那就是消费下一条消息了,那么这条未处理的消息就相当于丢失了。所以,很多时候并不是说拉取到消息就算消费完成,而是将消息写入数据库或缓存中,或者是更加复杂的业务处理,在这些情况下,所有的业务处理完成才能认为消息被成功消费。Kafka也提供了对位移提交进行手动提交的方式,开启手动提交的前提是消费者客户端参数enable.auto.commit配置为false,

 

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

​ 消费者端手动提交方式提供了两种,commitSync()同步提交方式和commitAsync()异步提交方式。commitSync()同步提交方式在调用时Consumer程序会处于阻塞状态,直到远端的broker返回提交结果,这个状态才会结束,这样会对消费者的性能有一定的影响。commitAsync()异步提交方式在执行后会立刻返回,不会被阻塞,但是它也有相应的问题产生,如果异步提交失败后,它虽然也有重试,但是重试提交的位移值可能早已经“过期”或者不是最新的值了,因此异步提交的重试其实没有意义。这里我们可以把同步提交和异步提交相结合,以达到最理想的效果。

 

 try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // 处理消息 record
                }
                consumer.commitAsync();
            }
        } catch (Exception e){
            // 处理异常
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

2)Kafka端弄丢数据

​ 如下图,副本A为leader副本,副本B为follower副本,它们的HW和LEO都为4。

 

 

image

​ 此时,A中写入一条消息,它的LEO更新为5,B从A中同步了这条数据,自己的LEO也更新为5

image

​ 之后B再向A发起请求以拉取数据,该FetchRequest请求中带上了B中的LEO信息,A在收到该请求后根据B的LEO值更新了自己的HW为5,A中虽然没有更多的消息,但还是在延时一段时间之后返回FetchRresponse,其中也包含了HW信息,最后B根据返回的HW信息更新自己的HW为5。

image

​ 可以看到整个过程中两者之间的HW同步有一个间隙,B在同步A中的消息之后需要再一轮的FetchRequest/FetchResponse才能更新自身的HW为5。如果在更新HW之前,B宕机了,那么B在重启之后会根据之前HW位置进行日志截断,这样便会将4这条消息截断,然后再向A发送请求拉取消息。此时若A再宕机,那么B就会被选举为新的leader。B恢复之后会成为follower,由于follower副本的HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为4。这样一来4这条数据就丢失了(就算A不能恢复,这条数据也同样丢失了)。

image

​ 对于这种情况,一般要求起码设置如下4个参数:

1)给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本

2)在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower

3)在producer端设置acks=all或-1:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了

4)在producer端设置retries为很大的一个值:这个是要求一旦写入失败,就无限重试,它默认为0,即在发生异常之后不进行任何重试。

​ 当然,设置了acks等于all或-1之后,会影响一定的性能。Kafka从0.11.0.0 开始引入了leader epoch的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。leader epoch代表leader的纪元信息,初始值为0,每当leader变更一次,leader epoch的值就会加1,相当于为leader增设了一个版本号。引入leader epoch很好的解决了前面所说的数据丢失问题,也就不需要去设置acks=all了。

3)生产者端不会丢失数据

​ 如果你配置了上面场景的参数,就是当数据写入leader副本和所有follower副本成功后才返回响应给生产者,如果写入不成功,生产者会不断重试。

2、Kafka 怎么防止重复消费

​ 消费者的自动位移提交方式会带来重复消费的问题。假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动位移提交之前,消费者崩了,那么等消费者恢复再来消费消息的时候又得从上一次位移提交的地方重新开始,这样便发生了重复消费的现象。

​ 其实这里可以类似上面消费端丢失数据的情况,很多时候并不是说拉取到消息就算消费完成,而是将消息写入数据库或缓存中,或者是更加复杂的业务处理,重复消费也同样如此,重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。这里防止重复消费,你可以像上面一样把自动提交改为手动提交,或者是保证消息消费的幂等性。

保证消费消息幂等性

1)如果你是要插入postgresql中,可以对其设置唯一键,插入重复的数据只会插入报错,不会有重复数据产生

2)如果你是要写入redis中,每次都是set操作,可以保证幂等性

​ 如何保证消息消费是幂等性的,需要结合具体的业务来看。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 丢数据问题 的相关文章

  • Hive - 通过聚合跨组的值来创建映射列类型

    我有一个看起来像这样的表 customer category room date 1 A aa d1 1 A bb d2 1 B cc d3 1 C aa d1 1 C bb d2 2 A aa d3 2 A bb d4 2 C bb d4
  • Hive如何存储数据,什么是SerDe?

    当查询表时 SerDe 将将文件中的字节中的一行数据反序列化为 Hive 内部使用的对象来操作该行数据 执行 INSERT 或 CTAS 时 请参阅第 441 页上的 导入数据 表的 SerDe 将将 Hive 的一行数据的内部表示序列化为
  • 无法使用 PDI 步骤连接到 HDFS

    我已经配置成功了Hadoop 2 4 in an Ubuntu 14 04 虚拟机 from a 视窗8系统 Hadoop 安装工作绝对正常 而且我还可以从 Windows 浏览器查看 Namenode 附图如下 所以 我的主机名是 ubu
  • 一个目录下可以有两个oozieworkflow.xml文件吗?

    一个目录下可以有两个oozieworkflow xml文件吗 如果是这样 我如何指示 oozie runner 运行哪一个 您可以有两个工作流程文件 只需为它们指定唯一的名称 然后您可以通过设置oozie wf application pa
  • Hadoop:处理大型序列化对象

    我正在开发一个应用程序来使用 Hadoop 框架处理 和合并 几个大型 java 序列化对象 顺序 GB 大小 Hadoop 存储将文件块分布在不同的主机上 但由于反序列化需要所有块都存在于单个主机上 因此它会极大地影响性能 我该如何处理这
  • java.io.IOException:无法获取 LocationBlock 的块长度

    我正在使用 HDP 2 1 对于集群 我遇到了以下异常 并且 MapReduce 作业因此失败 实际上 我们定期使用 Flume 版本的数据创建表 1 4 我检查了映射器尝试读取的数据文件 但我找不到任何内容 2014 11 28 00 0
  • Hadoop 安全模式恢复 - 花费太长时间!

    我有一个包含 18 个数据节点的 Hadoop 集群 我在两个多小时前重新启动了名称节点 并且名称节点仍处于安全模式 我一直在寻找为什么这可能花费太长时间 但找不到好的答案 发帖在这里 Hadoop 安全模式恢复 花费大量时间 https
  • HDP 3.1.0.0-78 升级后无法使用 ResourceManager UI 终止 YARN 应用程序

    我最近将 HDP 从 2 6 5 升级到 3 1 0 它运行 YARN 3 1 0 并且我无法再使用旧的 8088 cluster apps 或新的 8088 从 YARN ResourceManager UI 终止应用程序 ui2 ind
  • Hive 中字符串数据类型是否有最大大小?

    谷歌了很多 但没有在任何地方找到它 或者这是否意味着只要允许集群 Hive 就可以支持任意大字符串数据类型 如果是这样 我在哪里可以找到我的集群可以支持的最大字符串数据类型大小 提前致谢 Hive 列表的当前文档STRING作为有效的数据类
  • Spark超时可能是由于HDFS中文件超过100万个的binary Files()

    我正在通过以下方式读取数百万个 xml 文件 val xmls sc binaryFiles xmlDir 该操作在本地运行良好 但在纱线上失败并显示 client token N A diagnostics Application app
  • 如果 HBase 不是运行在分布式环境中,它还有意义吗?

    我正在构建数据索引 这将需要以形式存储大量三元组 document term weight 我将存储多达几百万个这样的行 目前我正在 MySQL 中将其作为一个简单的表来执行 我将文档和术语标识符存储为字符串值 而不是其他表的外键 我正在重
  • Namenode高可用客户端请求

    谁能告诉我 如果我使用java应用程序请求一些文件上传 下载操作到带有Namenode HA设置的HDFS 这个请求首先去哪里 我的意思是客户端如何知道哪个名称节点处于活动状态 如果您提供一些工作流程类型图或详细解释请求步骤 从开始到结束
  • hive创建表的多个转义字符

    我正在尝试将带有管道分隔符的 csv 加载到配置单元外部表 数据值包含单引号 双引号 括号等 使用 Open CSV 版本 2 3 测试文件 csv id name phone 1 Rahul 123 2 Kumar s 456 3 Nee
  • 在 Amazon EMR 上使用 java 中的 hbase 时遇到问题

    因此 我尝试使用作为 MapReduce 步骤启动的自定义 jar 来查询 Amazon ec2 上的 hbase 集群 我的 jar 在地图函数内 我这样调用 Hbase public void map Text key BytesWri
  • 猪参考

    我正在学习 Hadoop Pig 并且我总是坚持引用元素 请查找下面的示例 groupwordcount group chararray words bag of tokenTuples from line token chararray
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 纱线上的火花,连接到资源管理器 /0.0.0.0:8032

    我正在我的开发机器 Mac 上编写 Spark 程序 hadoop的版本是2 6 spark的版本是1 6 2 hadoop集群有3个节点 当然都在linux机器上 我在idea IDE中以spark独立模式运行spark程序 它运行成功
  • 猪的组连接等效吗?

    试图在 Pig 上完成这个任务 寻找 MySQL 的 group concat 等效项 例如 在我的表中 我有以下内容 3fields userid clickcount pagenumber 155 2 12 155 3 133 155
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • 如何解决使用 Spark 从 S3 重新分区大量数据时从内存中逐出缓存的表分区元数据的问题?

    在尝试从 S3 重新分区数据帧时 我收到一个一般错误 Caused by org apache spark SparkException Job aborted due to stage failure Task 33 in stage 1

随机推荐

  • 新手小白必看 Python爬虫学习路线全面指导

    爬虫是大家公认的入门Python最好方式 没有之一 虽然Python有很多应用的方向 但爬虫对于新手小白而言更友好 原理也更简单 几行代码就能实现基本的爬虫 零基础也能快速入门 让新手小白体会更大的成就感 因此小编整理了新手小白必看的Pyt
  • 微信小程序之别踩白块游戏

    微信小程序项目实例 别踩白块游戏 项目成果展示 项目功能具体 核心代码展示 项目成果展示 微信小程序 别踩白块游戏演示 项目功能具体 该项目是一个别踩白块的小游戏 会拥有无限模式 限时模式 极速模式等游戏模式 并且可以记录最高得分和最长时长
  • STM32学习笔记—串口数据的基本收发(基于HAL库)

    在STM32中 串口主要使用的是异步串行通信 由于我现在学习的是HAL库 所以我只留意HAL库里面的有关串口的发送和接受函数 数据的接收和发送主要分阻塞式和非阻塞式 由于阻塞式是通过延时来实现的 也就是说在发送和接收的时候 整个系统都在都停
  • R语言保存EXCEL小技巧

    R语言中将数据框保存为EXCEL的方法 文章目录 前言 一 小tip 前言 创建名为df的数据框 一 小tip 使用readr包和openxlsx包
  • C++ sort函数对class类排序

    sort是stl中一个经常用到的排序函数 可以对数组或类似数组 例如vector 的结构进行排序 默认为升序排序 例如下面的代码对vec进行升序排序 sort vec begin vec end 若想降序排序 则只需加greater即可 s
  • Linux基础网络设置和Samba文件共享服务

    作者 小刘在C站 个人主页 小刘主页 每天分享云计算网络运维课堂笔记 努力不一定有收获 但一定会有收获加油 一起努力 共赴美好人生 夕阳下 是最美的绽放 树高千尺 落叶归根人生不易 人间真情 目录 一 Linux基础网络设置 1 服务突然中
  • BUG :failed with repodata from current_repodata.json, will retry with next repoda

    在anaconda里面再次冲洗进行安装pytorch 时 具体步骤可见安装笔记 报错 failed with repodata from current repodata json will retry with next repoda 应
  • 欧拉定理(降幂)

    欧拉定理 定理 感觉这个定理降幂的时候用的多一点 题1 题面 思路 对于每一个数字ai 出现的次数为 A i C n
  • 基于阿里移动端积木框架Tangram自定义首页卡片

    Tangram 移动端框架介绍点击打开链接 项目背景 公司app首页需要完全做到可配置组合提高运营可维护性 由移动端选择了Tangram作为卡片布局框架 此框架有最大的一个好处是 同一份JSON格式数据渲染出来的界面安卓和IOS风格一直 对
  • Unable to open debugger port (127.0.0.1:50573): java.net.SocketException

    现象 IDEA tomcat启动项目报Unable to open debugger port 127 0 0 1 50573 java net SocketException 的错误 导致无法完成启动 解决方法 1 打开cmd 2 net
  • 摘:为什么老毛桃初始化后有了两个分区?

    为什么老毛桃初始化后有了两个分区 答案 制作出来的启动盘支持uefi启动就要有uefi分区 同时也是为了支持两种启动方式 传统mbr和uefi 提高兼容性 原文 http ask zol com cn x 5938014 html 转载于
  • mmsegmentation中如何输出mask

    在mmaegmentation中默认的输出结果是将mask与原图重叠在一起 有时候我们需要输出只有mask的图 具体修改代码如下 在tools test py中进行如下设置 如果没有标签的测试集中 在 eval参数中设置为None即可 pa
  • c++学习笔记-----this指针、构造函数、析构函数和友元函数

    一 this指针 1 概念理解 说起this指针 我个人的理解就是假如我们生产了同一种型号的两个杯子 当张三要买的时候 我们就用一个工具 this指针 给该杯子底部刻上张三的名字用来识别是张三 当李四要买 我们就给杯子刻上李四的名字 这样虽
  • 自顶向下、逐步求精

    自顶向下 逐步求精 自顶向下 逐步求精是计算机编程里面常用的思路 自顶向下 将大问题分解为各个小问题 再逐步求解 逐步求精 对于每个问题逐次细化 不断完善 例子 就如对一个洗衣机程序 先将他的洗衣这个大问题分为多个小问题 选择 洗衣模式 输
  • 修复Ubuntu18.04与Windows 10双系统丢失grub引导界面

    修复Ubuntu18 04与Windows 10双系统丢失grub引导界面 问题描述 最近因为学习Linux而装了Windows10 ubuntu的双系统 安装后的前几天还一切正常 每次开机grub都会让人性化的让我选择想要进入的系统 好景
  • IOS开发 返回到指定的视图控制器

    for UIViewController controller in self navigationController viewControllers BOOL isKindOfClass controller isKindOfClass
  • 关于v-if判断用法

    2019独角兽企业重金招聘Python工程师标准 gt gt gt
  • Qt调用主界面ui

    一 适用情景 在其他类中使用主界面ui this 控件操作等 二 步骤 首先 子类需要包含主界面的头文件和ui xx h文件 声明主界面类 然后使用构造函数把主界面的指针传递给子类 子类头文件 ifndef ONEUI H define O
  • 为Linux系统增加中文字体支持:解决显示问题的三种方法

    Linux 增加中文字体支持 在使用 Linux 操作系统的过程中 用户经常会遇到中文字体显示问题 导致一些界面上的文字不能正确显示 为了解决这个问题 我们可以通过以下方法来增加中文字体支持 一 安装中文字体 要安装中文字体 可以使用 XF
  • Kafka 丢数据问题

    Kafka是最初由Linkedin公司开发 是一个分布式 分区的 多副本的 多订阅者 基于zookeeper协调的分布式日志系统 也可以当做MQ系统 常见可以用于web nginx日志 访问日志 消息服务等等 Linkedin于2010年贡