Kafka消息阻塞

2023-11-17

转自:http://jis117.iteye.com/blog/2279519

  1. hi all:  
  2.       大家都很关心kafka消息阻塞的情况(感谢RoctetMQ给我们的教训)。Kafka上线也有一段时间了,确实有出现过消息阻塞的情况,虽然不影响业务而且用临时办法解决了,但是我觉得可以跟大家总结一下。为了不引起大家的恐慌,我决定先把结论写出来:comsumer 非正常的rebalancing(重新分配分区)才会导致消费阻塞,如果不出现rebalancing,消息是不是重复消费或阻塞。  
  3.   
  4.       以下是这两个BUG的描述,这可能需要一些Kafka的知识,我会说得通俗一点,同也会留下一些参考给有兴趣的童鞋进一步了解。  
  5.      1. 消费者处理过慢可能会导致重复消费  
  6.      线上场景:BPM会订阅消费然后把流程信息一条一条索引到ElasticSearch,当索引处理较慢(30s)的时候会出现。  
  7.      重现步骤:https://gist.github.com/richard2011/23b563e6ee5bad4e9d56 ,很普通的代码,消费代码段加上sleep(30s)。  
  8.      产生原因:Kafka是使用poll()(长轮询)拉取消息,流程可以简单理解为: 拉取消息->向kafka broker发送心跳->提交 offset。当消费者处理过慢(session timeout为30s)没有向kafka broker发送心跳,而且没有提交 offset,broker就会发起rebalancing,这个分区就会分配给其他消费者重复消费。最坏的情况是一直在rebalancing,新的消息都不会被消费。  
  9.      临时办法:排查发现线上业务线正常场景没有消费处理过慢的场景,processor组件维护了一个线程池,每条消息都用一个线程处理。tasker-center会不断地把消息放在java的ArrayBlockingQueue。其他周边后端服务(如存储)这种处理能力难以评估的,可以尝试一下先把消息缓存到本地队列再做批量插入的操作,其实很多日志类或大数据类使用Kafka都是这样做的,如Flume。  
  10.      如果确实有这样的场景,请联系我,可以通过通过两个参数减小这问题发生,但都不是完美解决问题。1) 增加session time的时间,但同时也会增加客户端失败的时间。2)减小分区拉取值(max.partition.fetch.bytes默认为1M), 但会影响吞量,而且以bytes为单位也无法评估消息的数量。  
  11.      修复计划:Kafka社区专门针对这个问题写了篇WIKI https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61333789 。这是他们的改进计划,内容有目的(Motivation), 计划修改(Proposed Change), 新增或修改的公共接口(New or Changed Public Interfaces),升级计划和兼容性(Migration Plan and Compatibility)和已放弃方案(Rejected Alternatives)。@听风,可以参考一下他的文档模板,用于轩辕组件的改进计划模板挺好的。这个BUG暂定为随着Kafka版本的升级来修复。  
  12.   
  13.       2. 多分区多consumer时rebalancing可能会导致某个分区阻塞。  
  14.      线上场景:发生在在cart-processor,每个topic有18个分区,每个cart-processor有两个consumer(不同groupId),8个cart-processor节点。当cart-processor发版(节点增加或删除)会引起rebalancing,这可能导致个topic的分区阻塞。  
  15.      重现步骤:代码https://gist.github.com/richard2011/d92caaa4af50331b0953,创建18个分区的topic, 通过不断地增加或删除,同时通过kafka-consumer-groups.sh命令查看分区情况,直到出现分区阻塞。  
  16.      产生原因:kafka client的BUG( https://issues.apache.org/jira/browse/KAFKA-2978 ),Kafka github主分支已修复,但是还没有release版,越多分区和consumer越容易出现这个问题。  
  17.      临时办法:每次使用kafka的程序发版时,用kafka-consumer-groups.sh命令查看分区情况,发现分区阻塞则重启对应的机器。同时也写了小工具,使comsumer再次rebalancing。  
  18.      修复计划:观察线上出现的频率,如果频繁出现,将会修复kafka client代码,出现不频繁则随Kafka升级修复。  
  19.      

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka消息阻塞 的相关文章

  • Zookeeper的常见面试题

    1 Zookeeper 1 1 Zookeeper基本概念 Zookeeper作为一个优秀高效且可靠的分布式协调框架 ZooKeeper 在解决分布式数据一致性问题时并没有直接使用Paxos算法 而是专门定制了一致性协议叫做 ZAB Zoo
  • 20道常见的kafka面试题以及答案

    JAVA面试宝典 搞定JAVA面试 不再是难题 系列文章传送地址 请点击本链接 目录 1 kafka的消费者是pull 拉 还是push 推 模式 这种模式有什么好处 2 kafka维护消息状态的跟踪方法 3 zookeeper对于kafk
  • MQ如何保证消息不丢失

    如何保证消息不丢失 哪些环节会造成消息丢失 其实主要就是跨网络的环境中需要考虑消息的丢失 主要是有以下几个方面 生产者往MQ发送消息 MQ的Broker是集群有主从的 主节点把消息同步到从节点时也需要考虑消息丢失问题 消息从内存持久化到硬盘
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • Kafka:主题创建、分区修改查看、生产者、消费者

    文章目录 Kafka后台操作 1 主题 2 分区 3 生产者 4 消费者组 Kafka后台操作 1 主题 1 创建主题 bin kafka topics sh create bootstrap server hadoop102 9092 r
  • 如何更好地使用Kafka?

    引言 要确保Kafka在使用过程中的稳定性 需要从kafka在业务中的使用周期进行依次保障 主要可以分为 事先预防 通过规范的使用 开发 预防问题产生 运行时监控 保障集群稳定 出问题能及时发现 故障时解决 有完整的应急预案 这三阶段 事先
  • Linux 下搭建 Kafka 环境

    安装步骤 准备软件目录 mkdir datalake 上传之前下载好的安装包到 datalake 目录下 jdk 8u181 linux x64 gz kafka 2 11 2 1 0 tgz zookeeper 3 4 5 tar gz
  • 第十四章 kafka专题之日志数据删除策略

    日志数据清理 为了控制磁盘的容量 需要对过去的消息进行清理 1 内部定时任务检测删除日志 默认是5分钟 2 日志清理参数配置 支持配置策略对数据进行清理 以segment为基本单位进行定期清理 当前正在使用的segment不会被清理 启用c
  • Kafka一致性

    一 存在的一致性问题 1 生产者和Kafka存储一致性的问题 即生产了多少条消息 就要成功保存多少条消息 不能丢失 不能重复 更重要的是不丢失 其实就是要确保消息写入成功 这可以通过acks 1来保证 保证所有ISR的副本都是一致的 即一条
  • Flink设置Source数据源使用kafka获取数据

    流处理说明 有边界的流bounded stream 批数据 无边界的流unbounded stream 真正的流数据 Source 基于集合 package com pzb source import org apache flink ap
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用
  • springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    springboot集成kafka实战项目 kafka生产者 消费者 创建topic 指定消费分区 前言 本项目代码可直接集成到你现有的springboot项目中 功能包括 1 kafka生产者配置 2 kafka消费者配置 指定分区消费
  • Kafka : KafkaProducer Closing the kafka producer with timeoutMillis

    1 美图 2 背景 一段kafka写入程序 不晓得为啥突然发现很多奇怪的日志 kafka 多线程发送数据 然后在本地是可以的 在服务器上是偶现的 我写了一个本地程序多线程生产数据 发现是没有问题的 Test public void mult
  • explain查看sql语句执行计划

    explain sql 执行结果字段描述 id select唯一标识 select type select类型 table 表名称 type 连接类型 possible keys 可能的索引选择 key 实际用到的索引 key len 实际
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • 公司实战 ElasticSearch+Kafka+Redis+MySQL

    一 需求 前一段时间公司要进行数据转移 将我们ES数据库中的数据转移到客户的服务器上 并且使用定时将新增的数据同步 在这过程中学到了很多 在此记录一下 二 技术栈 Mysql Redis ElasticSearch Kafka 三 方案 为
  • 在windows系统下使用IDEA对kafka源码进行编译环境搭建以及配置

    目录 一 前期准备工作 step1 安装JDK1 8 step2 安装zookeeper单机版 step3 安装Gradle 5 4 step4 安装scala 2 11 12 二 将kafka源代码部署到编辑器IDEA并测试 step1
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的

随机推荐

  • linux虚拟机重启后,运行nmtui提示NetworkManaer 未运行

    环境 centOS 8 虚拟机重启后 输入ifconfig 发现网卡丢失 1 重启NetworkManaer systemctl start NetworkManager 2 输入nmtui nmtui 编辑连接 笔者网络小白 只会用自动
  • 雅特力AT32F403A, 国产芯片PIN TO PIN 替代STM32F103

    中美贸易摩擦日渐加剧 美国从各个方面到处打压中国 半导体行业也收到一定冲击 逼迫国内企业不得不准备产品国产化方案 自从华为被美国制裁之后 国内的很多手机厂商明白了一个道理 爹有娘有 不如自己有 于是各大厂商纷纷走上了芯片国产化的道路 意法半
  • java Hashtable及其子类Properties 源码分析(通俗易懂)

    目录 一 前言 二 Hashtable详解 1 简介 2 特点 3 底层实现 4 HashMap VS Hashtable 三 Properties详解 1 简介 2 特点 3 具体使用 可以不看 四 完结撒 一 前言 大家好 本篇博文是对
  • 【已解决--2021报错】is not a supported wheel on this platform-解决安装simplejson失败的问题

    已解决 2021报错 is not a supported wheel on thisplatform 解决安装simplejson失败的问题 1 问题描述 直接在pycharm中pip安装simplejson失败 然后网上找了很多教程 但
  • 知网的爬取 很简单

    对于知网能爬出来的东西 首先说一下 论文的题目 时间 作者 摘要等信息 本文主要对搜索界面进行爬取 对于知网的爬虫可以说挺简单的 其难点在于有一个二次请求 通过断点分析youfiddler分析有两个要注意的url一个是红色的一个是橘色的 先
  • Javacv+Nginx实现rtsp转rtmp实现web端直播方式

    前言 前面的文章中使用websocket的方案在web端实现rtsp播放 因为各种原因 现需要重新写一套方案 不废话 上才艺 补充 项目中需求可能要同时观看多个摄像头 将本项目放开限制使用多个摄像头时 就会发现相机之间的切换加载时间及视频流
  • Android:实现一种浮动选择菜单的效果

    前几天更新了一下我手机上的百阅软件 上面的浮动对话框选择很好看 就模仿了一下 先看一下运行效果 主要原理是在dialog里扔进一个GridView 可以作为一个组件使用 源码如下 对话框使用的layout grid dialog xml
  • Lumen 5.2 中配置邮件

    本文转自 https laravel china org topics 1974 Lumen中的邮件配置好了之后还是很简单的 但是配置过程官方文档省略了太多 先来扒一扒遇到的坑 Class mailer does not exist 这个是
  • Mysql内置函数全解析——Mysql初级(三)

    一 前言 在关系型数据库使用的过程中 我们总会对DB里面的数据做各种不同形式的转换 字符串处理等基本操作 本文将会比较系统的学习总结Mysql中的各种内置函数 这是一个系列的文章 感兴趣的小伙伴可以关注一下哦 本文的行文思路是这样的 因为M
  • 微信小程序——获取绑定事件元素的ID

    小程序list数据带值跳转 一般直接通过设置item的id来标识或者通过设置键值data xxxx的方式标识 如下图所示 解析出来的结果如下图 我们看到它在元素上绑定了一个checkSchoolLogin事件 触发这个事件时需要获取该元素的
  • 关于单链表结构体定义结点时 LNode *LinkList的理解

    typedef struct LNode ElemType data 数据域 struct LNode next 指针域 LNode LinkList 先说结论 这个就可以直接理解为 第一个是便于定义变量的类型为LNode 如果没有使用ty
  • springboot配置来连接多个mysql数据库

    1 在yml里配置多数据库 spring datasource app1 jdbc url jdbc mysql localhost 3306 data1 useUnicode true characterEncoding UTF 8 se
  • leetcode-143-重排链表

    题意描述 给定一个单链表 L 的头节点 head 单链表 L 表示为 L0 L1 Ln 1 Ln 请将其重新排列后变为 L0 Ln L1 Ln 1 L2 Ln 2 不能只是单纯的改变节点内部的值 而是需要实际的进行节点交换 解题思路 Ali
  • 【机器学习】为什么需要对数值型的特征做归一化(Normalization)?

    目录 为什么需要对数值型的特征做归一化 一 概念定义 二 标准化 归一化的原因 用途 2 1 原因 三 数据归一化的影响 四 常用的3种归一化方法 4 1 归一化公式 4 1 1 线性归一化 Min Max Scaling 即我们一般指的归
  • SpringBoot 整合Activiti(二)——流程审批完整示例

    前两天做了一个SpringBoot整合Activiti的完整示例 功能包括 退回 通过 节点条件 指定办理人 生成流程图 高亮显示已办节点 查询任务列表 办理人 等 下面先简单记录 含完整代码 十六上班后再详细补充 1 画流程图 高亮生成的
  • 《视觉slam十四讲》之第7讲-特征提取与匹配

    特征 特征为图像中具有代表性的区域 可以为角点 边缘和区块等 特征是图像信息的另一种数字表达形式 特征具有以下性质 可重复性 Repeatability 相同的 区域 可以在不同的图像中被找到 可区别性 Distinctiveness 不同
  • Linux运行shell脚本时报错"syntax error near unexpected token `$'\r''

    Linux运行shell脚本时 常常会发生syntax error near unexpected token r 或者syntax error unexpected end of file等 诸如此类的报错信息出现的原因是因为在编写脚本内
  • Shell编程中脱字符(^)的用法

    cat configs signatures tmp 将configs signatures tmp文件内容作为grep命令的输入 grep v v是grep排除的参数 将configs signatures tmp除去空行的内容作为sor
  • 蓝桥杯在哪下载准考证

    点击自己的头像 gt 我的大赛 gt 会出现如下 gt 点击Java软件开发 根据自己报的方向 gt 可以看到考试信息 gt 下载准考证即可 转载于 https blog 51cto com 13534640 2090954
  • Kafka消息阻塞

    转自 http jis117 iteye com blog 2279519 hi all 大家都很关心kafka消息阻塞的情况 感谢RoctetMQ给我们的教训 Kafka上线也有一段时间了 确实有出现过消息阻塞的情况 虽然不影响业务而且用