kafka数据丢包原因及解决方案

2023-11-20

数据丢失是一件非常严重的事情事,针对数据丢失的问题我们需要有明确的思路来确定问题所在,针对这段时间的总结,我个人面对kafka 数据丢失问题的解决思路如下:

  • 是否真正的存在数据丢失问题,比如有很多时候可能是其他同事操作了测试环境,所以首先确保数据没有第三方干扰。
  • 理清你的业务流程,数据流向,数据到底是在什么地方丢失的数据,在kafka 之前的环节或者kafka之后的流程丢失?比如kafka的数据是由flume提供的,也许是flume丢失了数据,kafka 自然就没有这一部分数据。
  • 如何发现有数据丢失,又是如何验证的。从业务角度考虑,例如:教育行业,每年高考后数据量巨大,但是却反常的比高考前还少,或者源端数据量和目的端数据量不符
  • 定位数据是否在kafka之前就已经丢失还事消费端丢失数据的
  1. kafka支持数据的重新回放功能(换个消费group),清空目的端所有数据,重新消费。
  2. 如果是在消费端丢失数据,那么多次消费结果完全一模一样的几率很低。
  3. 如果是在写入端丢失数据,那么每次结果应该完全一样(在写入端没有问题的前提下)
  • kafka环节丢失数据,常见的kafka环节丢失数据的原因有:
  1. 如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。
  2. 网络负载很高或者磁盘很忙写入失败的情况下,没有自动重试重发消息。没有做限速处理,超出了网络带宽限速。kafka一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认1秒钟并不符合生产环境(网络中断时间有可能超过1秒)。
  3. 如果磁盘坏了,会丢失已经落盘的数据
  4. 单批数据的长度超过限制会丢失数据,报kafka.common.MessageSizeTooLargeException异常
    解决:
  5. Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
  6.  
  7. Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
  8.  
  9. Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
  10.  
  11. Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
  12. partition leader在未完成副本数follows的备份时就宕机的情况,即使选举出了新的leader但是已经push的数据因为未备份就丢失了!
    kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在PageCache里面,出现多个副本同时挂掉的概率比1个副本挂掉的概率就很小了。(官方推荐是通过副本来保证数据的完整性的)
  13. kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache上的数据就丢失了。
    可以通过log.flush.interval.messageslog.flush.interval.ms来配置flush间隔,interval大丢的数据多些,小会影响性能但在0.8版本,可以通过replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,kafka当前支持GZipSnappy压缩,来缓解这个问题 是否使用replica取决于在可靠性和资源代价之间的balance

同时kafka也提供了相关的配置参数,来让你在性能与可靠性之间权衡(一般默认):

当达到下面的消息数量时,会将数据flush到日志文件中。默认10000

log.flush.interval.messages=10000

当达到下面的时间(ms)时,执行一次强制的flush操作。interval.msinterval.messages无论哪个达到,都会flush。默认3000ms

log.flush.interval.ms=1000

检查是否需要将日志flush的时间间隔

log.flush.scheduler.interval.ms = 3000

Kafka的优化建议

producer端:

  • 设计上保证数据的可靠安全性,依据分区数做好数据备份,设立副本数等。
    push数据的方式:同步异步推送数据:权衡安全性和速度性的要求,选择相应的同步推送还是异步推送方式,当发现数据有问题时,可以改为同步来查找问题。
  • flushkafka的内部机制,kafka优先在内存中完成数据的交换,然后将数据持久化到磁盘.kafka首先会把数据缓存(缓存到内存中)起来再批量flush.
    可以通过log.flush.interval.messageslog.flush.interval.ms来配置flush间隔
  • 可以通过replica机制保证数据不丢.
    代价就是需要更多资源,尤其是磁盘资源,kafka当前支持GZipSnappy压缩,来缓解这个问题
    是否使用replica(副本)取决于在可靠性和资源代价之间的balance(平衡)
  • broker Consumer kafkaconsumer提供两种接口.
  1. high-level版本已经封装了对partitionoffset的管理,默认是会定期自动commit offset,这样可能会丢数据的
  2. low-level版本自己管理spout线程和partition之间的对应关系和每个partition上的已消费的offset(定期写到zk)
    并且只有当这个offsetack后,即成功处理后,才会被更新到zk,所以基本是可以保证数据不丢的即使spout线程crash(崩溃),重启后还是可以从zk中读到对应的offset
  • 异步要考虑到partition leader在未完成副本数follows的备份时就宕机的情况,即使选举出了新的leader但是已经push的数据因为未备份就丢失了!
  1. 不能让内存的缓冲池太满,如果满了内存溢出,也就是说数据写入过快,kafka的缓冲池数据落盘速度太慢,这时肯定会造成数据丢失。
  2. 尽量保证生产者端数据一直处于线程阻塞状态,这样一边写内存一边落盘。
  3. 异步写入的话还可以设置类似flume回滚类型的batch数,即按照累计的消息数量,累计的时间间隔,累计的数据大小设置batch大小。
  • 设置合适的方式,增大batch 大小来减小网络IO和磁盘IO的请求,这是对于kafka效率的思考。
  1. 不过异步写入丢失数据的情况还是难以控制
  2. 还是得稳定整体集群架构的运行,特别是zookeeper,当然正对异步数据丢失的情况尽量保证broker端的稳定运作吧

kafka不像hadoop更致力于处理大量级数据,kafka的消息队列更擅长于处理小数据。针对具体业务而言,若是源源不断的push大量的数据(eg:网络爬虫),可以考虑消息压缩。但是这也一定程度上对CPU造成了压力,还是得结合业务数据进行测试选择

  • 结合上游的producer架构,

broker端:

topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数。分区是kafka进行并行读写的单位,是提升kafka速度的关键。

  1. broker能接收消息的最大字节数的设置一定要比消费端能消费的最大字节数要小,否则broker就会因为消费端无法使用这个消息而挂起。
  2. broker可赋值的消息的最大字节数设置一定要比能接受的最大字节数大,否则broker就会因为数据量的问题无法复制副本,导致数据丢失

comsumer端:

关闭自动更新offset,等到数据被处理后再手动跟新offset
在消费前做验证前拿取的数据是否是接着上回消费的数据,不正确则return先行处理排错。
一般来说zookeeper只要稳定的情况下记录的offset是没有问题,除非是多个consumer group 同时消费一个分区的数据,其中一个先提交了,另一个就丢失了。


问题:
kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache上的数据就丢失了。

这个是总结出的到目前为止没有发生丢失数据的情况

//producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好

     props.put("compression.type", "gzip");

     //增加延迟

     props.put("linger.ms", "50");

     //这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。,

     props.put("acks", "all");

     //无限重试,直到你意识到出现了问题,设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。

     props.put("retries ", MAX_VALUE);

     props.put("reconnect.backoff.ms ", 20000);

     props.put("retry.backoff.ms", 20000);

    

     //关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失

     props.put("unclean.leader.election.enable", false);

     //关闭自动提交offset

     props.put("enable.auto.commit", false);

     限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序

     props.put("max.in.flight.requests.per.connection", 1);

Kafka重复消费原因

强行kill线程,导致消费后的数据,offset没有提交,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafkasession timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
如果在close之前调用了consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费

kafka数据重复 kafka设计的时候是设计了(at-least once)至少一次的逻辑,这样就决定了数据可能是重复的,kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除
kafka的数据重复一般情况下应该在消费者端,这时log.cleanup.policy = delete使用定期删除机制

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka数据丢包原因及解决方案 的相关文章

随机推荐

  • mysql navicat 修改默认值_怎么在navicat中为datetime设置默认值

    在使用navicat设计表的字段时 是可以给字段设置默认值的 但是datetime类型类型的字段缺设置不了 下面我们就为大家详细解读一下这个问题 由于MySQL目前字段的默认值不支持函数 所以以create time datetime de
  • 2023面试问答-计算机网络

    OSI 的七层模型分别是 各自的功能是什么 简要概括 物理层 底层数据传输 如网线 网卡标准 数据链路层 定义数据的基本格式 如何传输 如何标识 如网卡MAC地址 网络层 定义IP编址 定义路由功能 如不同设备的数据转发 传输层 端到端传输
  • 【ES实战】ES中关于segment的小结

    文章目录 ES中关于segment的小结 ES中segment相关的原理 在Lucene中的产生segment的过程 Lucene commit过程 ES为了实现近实时可查询做了哪些 缩短数据可被搜索的等待时长 增加数据的可靠性 优化seg
  • mysql更新一张表的字段来自另一张表的某个字段

    UPDATE tba a LEFT JOIN tbb b on a id b id set a xxx b xxxx where a id b id
  • 对于opencv摄像头调用与现实方向相反的问题怎么解决?

    可以对原始图像进行水平翻转 使用opencv自带的flip函数 例如 读取图像帧 ret frame cap read 水平翻转图像 frame cv2 flip frame 1 这样就可以了 后面的参数1代表水平翻转图像 而0代表垂直翻转
  • node.js与elasticsearch交互

    参考elasticsearch 以下简称es 官方javascript的API https www elastic co guide en elasticsearch client javascript api 6 x api refere
  • Sqli-Labs靶场(6--10)题详解

    目录 六 Less 6 GET Double Injection Double Quotes string GET 双重注入 双引号 字符串 七 Less 7 GET Dump into outfile string GET 导出文件 字符
  • Altium designer自动布线设置GND或其他网络不布线的方法

    1 在导航栏里面找到设计栏 找到类选项打开2 在Net Classes选项下 右击鼠标 找到添加类选项 会创建一个New Class 3 设置好需要布线的网络 以及不需要布线的网络 如下图 4 找到自动布线菜单栏下的网络类 点击进去如下图
  • Android下自定义的jar库文件编译和调用

    主要为了解决如下问题 项目中使用了Android未公开的API 在Eclipse下会有红叉显示 不同的项目抽出相同部分的代码共用 必需的前提条件 需要有Android源代码 编译的库文件主要是封装未公开API或者共用代码 工程1 Java库
  • h5单页面埋点问题(undefine)

    需求 商城页面里调用第三方资源埋点 代码实现 主要解决资源未加载就被调用问题 备注 把调用函数作为参数传进去 控制保证在资源加载完成后调用 let COLLECTURL http collect trc com index js 动态创建j
  • java的特点

    一 简单易学 1 java的风格类似于c 因而许多c c程序员初次接触java语言时会感到熟悉 从某种意义来说c 语言是从c语言继承而来 java语言是c 语言的一个变种 因此 学过c或c 的程序员可以更快速的掌握java编程技术 附图 编
  • 【mysql timeStamp默认值0000-00-00 00:00:00 报错:Invalid default value for ‘end_time’】

    mysql timeStamp默认值0000 00 00 00 00 00 报错 Invalid default value for end time 运行其中的sql文件时报错 nvalid default value for end t
  • python猜拳游戏编程代码_用python实现“猜拳"游戏

    原标题 用python实现 猜拳 游戏 用python实现 猜拳 游戏 先来练习一道用python编写的小程序 这道题是用for in 循环实现输入10个数并求和 这里用到了append 方法 append 方法 是一个很重要的方法 它是向
  • 计算机翻译的汉字,计算机系外文翻译(中英对照3000汉字左右).doc

    文档介绍 毕业设计 论文 外文资料翻译系别计算机信息与技术系专业计算机科学与技术班级姓名学号外文出处附件1 原文 2 putingMainarticle puter wasrecordedin1613 referringtoapersonw
  • 拓扑排序,广度优先

    使用一个队列来进行广度优先搜索 初始时 所有入度为 0 的节点都被放入队列中 它们就是可以作为拓扑排序最前面的节点 并且它们之间的相对顺序是无关紧要的 在广度优先搜索的每一步中 取出队首的节点 u 将 u 放入答案中 移除 u 的所有出边
  • hample滤波器的原理及其Python实现

    hample滤波器 1 作用及原理 2 Python实现 1 作用及原理 功能 检测并删除异常值 用一个一维向量 x x 1
  • 利用云原生数仓 Databend 构建 MySQL 的归档分析服务

    MySQL 常用 OLTP 业务环境 一般会使用比较好的硬件资源来提供对外服务 现在 MySQL 数据对外提供的数据动不动好几个 T 也是正常的 在很多业务中 数据有较强的生命周期 在线一段时间后 可能就是失去业务意义 如 某个业务下线 业
  • C语言通讯录

    主要知识 结构体 枚举 指针 递归 冒泡排序等 文章目录 一 前言 1 菜单 2 结构体创建 3 初始化通讯录 4 增加联系人 4 删除联系人 5 修改联系人信息 6 搜索联系人 7 显示联系人 8 联系人排序 三 代码展示 contect
  • 单片机FLASH操作

    FLASH 操作 查看程序已经占用的FLASH的扇区 剩余的扇区就是可以操作而不会使程序发生错乱的区域 找到listing文件夹下面的 map文件 搜索Memory Map of the image 查看占用的内存 起始地址是 0x8000
  • kafka数据丢包原因及解决方案

    数据丢失是一件非常严重的事情事 针对数据丢失的问题我们需要有明确的思路来确定问题所在 针对这段时间的总结 我个人面对kafka 数据丢失问题的解决思路如下 是否真正的存在数据丢失问题 比如有很多时候可能是其他同事操作了测试环境 所以首先确保