Consumer位移管理-Kafka从入门到精通(十一)

2023-10-30

上篇文章说了,sesstion.time.out 、max.poll.interval.ms、max.poll.records和auto.offset.reset等参数。

KafkaConsumer-Kafka从入门到精通(十)https://blog.csdn.net/ke1ying/article/details/126294017

订阅topic

订阅consumer直接:

Consumer.subscribe(Arrays.asList(“topic1”,“topic2”));

如果使用独立的consumer(standalone consumer),则可以手动订阅,

TopicPartition p1 = new TopicPartition(“topic-name”,0);

Consumer.assign(Arrays.asList(p1));

不管用哪种方法,consumer订阅都是延迟生效的,订阅的消息只有在下次poll调用的时候才会生效。

消息轮询

Poll原理

consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区消息。若要实现并行读取消息,一种方式使用多线程方式,为每个要读取的分区都要创建一个专有线程去消费(这就是旧版本cousumer采用的方式),另一种方法采用linuxI/O模型的poll或者select等,使用一个线程同时管理多个socket连接,即同时与多个broker通信实现并行读取。

一旦consumer订阅了topic,所有的消费逻辑包括coordinator的协调,消费者组的rebalance以及数据的获取会在主逻辑poll方法中一次调用中被执行,这样用户很容易使用一个线程来管理所有的cousumerIO。

对于问题,consumer到底是单线程还是多线程呢?

最新版的kafka是一个多线程或者双线程的java进程,创建kafkaConsumer的称为主线程,同时在后台创建一个心跳线程,该线程被称呼为后台心跳线程。

kafkaConsumer的poll方法在用户主线程中运行,这同时也表明:消费者组的rebalance、消息获取、coordinator管理、异步任务结果的处理、位移提交等操作这些都在主线程中的,因此仔细调优参数至关重要。

Poll使用方法

Consumer订阅topic之后通常以事件循环的方法来获取消息读取,poll方法根据当前consumer的消费位移返回消息集合。当poll首次被调用的时候,新的消费者组会根据位移重设策略(auto.offset.reset)来设定消费者组的位移,一旦consumer开始提交位移,后续的rebalance完成后会将位置设置为上次已提交的位移。传递给poll方法的超时设置参数用于控制consumer等待消息的最大阻塞时间,比如consumer至少需要1M的数据,那么此刻consumer会以阻塞不断累计数据到满足1M,如果不想让consumer一直阻塞,则可以给一个过期时间,一定时间内如果还没有满足,则返回。

  1. 要么等数据满了。
  2. 要么等待时间超过了指定时间。

前面我们说了consumer是单线程设计(其实还有一个心跳线程,辅助线程看主线程是否保持心跳,暂不考虑,不承担逻辑),因此consumer应该运行在他的专属线程中。新版本的java consumer不是线程安全的,如果没有显式的同步锁机制保护,kafka会抛出kafkaConsumer is not safe for multi-threaded access

的异常,如果看到了这样的报错,那么说明kafkaConsumer运用在多线程中,对于目前的kafka设计而言,是不被允许的。

我们可以在while条件指定一个布尔变量isRunning来标识是否需要退出consumer消费循环并且结束consumer应用。具体应该是将isRunning标识为volatile,然后其他线程用isRunning=false来控制线程结束。最后千万不要忘记关闭consumer,这不仅会清楚consumer创建的各种socker资源,还会通知消费者coordinator主动离开从而更快的rebalance。比较推荐在finally代码里显式关闭。

位移管理

Consumer位移

Consumer端要为每个它读取的分区保存消费进度,即分区中最新消费消息的位置,该位置就是offset。Consumer需要定期向kafka提交自己的位置信息,实际上,这个信息通常是下一条带消费消息的位置。假设consumer已经读取了某个分区第n条消息,那么他应该提交位移为N,因为位移是从0开始,位移n的位子是n+1条消息。这样conusmer重启时会从第n+1条开始消费。

Offset对于consumer非常重要,因为他们是实现消息交付语义保证(message delivery semantic)的基石,常见的3中消息交付语义保证。

  1. 最多一次(at most once)处理语义:消息可能丢失,但不会被重复处理。
  2. 最少一次(at least once)处理语义:消息不会丢失,但可能处理多次。
  3. 精确一次(exactly)处理语义:消息一定会被处理且只会处理一次。

显然,若consumer在消费之前就提交位移,那么多在位移提交完成之后,消费还未消费就崩溃了,这时候consumer重启,则会从新的位移开始消费,则这个已提交的位移会丢失。相反的,若consumer在消费之后再提交,则可以实现at least once。好消息是这个出现多次处理的情况,已经在kafka0.11.0.0版本得到解决。

上次提交位移(last committed offset):consumer最近一次提交的offset值。

当前位置(current position):consumer已读取但尚未提交的位置。

水位(watermark):也被称为高水位(high watermark),严格来说他不属于conusmer管理范围,而属于分区日志概念,consumer可以读取水位之下的所有消息,水位之上的则不可以读取。

日志终端位移(log end offset,leo):日志的最新位移,同样不属于consumer范畴,而属于分区日志管辖。它表示了某个分区副本当前保存消息对应最大的位移值。值得注意的是,正常情况下leo不会比水位值小。事实上,只有分区所有副本都保存某条消息,该分区leader副本才会向上移动水位值。

版本版consumer位移管理

consumer会在kafka集群所有broker里选一个broker作为consumer group的coordinator,用于实现组成员管理,消费分配方案,位移提交等。和普通的kafka topic相同,该topic配置多个分区,每个分区有多个副本。位移存在的目的就是保存consumer提交的位移。

当消费者组首次启动时,由于没有初识位移信息,coordinator必须为其确定初始位移值,这就是consumer参数auto.offset.reset的作用。通常consumer要么从最早位移开始读取,要么从最新位移开始读取。

Consumer提交位移主要机制通过向所属coordinator发送位移提交请求实现的,每个位移提交都会往_consumer_offsets对应分区上追加一条消息。消息的key是groupid、topic等,而value就是位移值,如果consumer为同一个group的同一个topic分区提交多次位移,那么就会存在多条key相同但value不同的消息,显然我们只关心最新一条。

自动提交和手动提交

位移提交策略对提供消费交付语义至关重要,默认情况下consumer自动提交间隔是5s、这就是说若不做特定设置,consumer可以通过参数auto.commit.interval.ms参数可以控制自动提交间隔。

自动提交位移的优势是降低用户开发成本使得用户不比处理位移提交,劣势用户不能细颗粒度的处理位移提交,特别是强调精确一次处理语义时,这种情况下,用户可以手动位移提交。

典型的consumer场景,用户需要对poll方法返回的消息集合中消息执行业务处理,用户想要保证只有消息被真正处理才去提交位移,如果自动提交则无法保证这种位移时序性,因此这种情况下必须手动提交位移。在构建kafkaConsumer时设置enable.auto.commit=false,然后调用conmmitSync或commitAsync方法即可。

自动提交:默认配置,enable.auto.commit=true。开发简单,无法实现精确控制,位移提交失败后不易处理。可能造成数据丢失,最多实现“最少一次”处理语义。能容忍一定消息丢失。

自动提交:设置enable.autocommit=false。手动调用consumer.commitSync或

consumer.commitAsync位移提交,可以实现“最少一次”处理,依赖外部可以实现“精确一次”处理语义。

手动提交分为异步commitAsync和同步commitSync,如果调用commitSync,用户程序会等待位移提交结束才执行下一条语句。若调用commitAsync则是一个异步阻塞调用,comsumer会在后续poll轮询该位移结果。这里的异步不是指consumer单独的线程进行位移提交,实际上consumer依然会在主线程poll方法中不断轮询这次异步提交结果,只是该提交不会让这个方法阻塞。

当这个无参数的时候,conmmitSync和commitAsync在调用的时候,都会为他订阅的所有分区进行位移提交。其实他还带另外两个参数的重载方式,用户调用这个方法的时候,需要显式指定一个map告诉kafka哪些分区做提交更为合理。实际使用中这种更加合适,因为consumer只对他所拥有的分区进行提交更为合理。


    ConsumerRecords<String, String> records= consumer.poll(lOOO); 
    for (TopicParti tion partition : records.partitions()) { 
    List<ConsumerRecord<String, String partit onRecords = 
                    records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
            System.out.println(record.offset ()+”:”+ record.value());
        }
    long lastOffset = parti tionRecords. get (partitionRecords. 
        size() - 1) . offset() ; 
        consumer.commitSync(Collections singletonMap(part tion, new 
        OffsetAndMetadata(lastOffset + 1)));

这里特别需要注意的是,提交的位移一定是consumer下一条待读取消息的位移,这也就是为什么offset+1的原因。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Consumer位移管理-Kafka从入门到精通(十一) 的相关文章

  • 在 Java 中捕获(捕获)窗口中的鼠标光标

    我正在寻找一种方法 在鼠标进入窗口后捕获或捕获该窗口中的鼠标 就像鼠标被捕获在虚拟机窗口中一样 直到用户按 CTRL ALT DEL 或以其他方式释放鼠标 我如何在 Java 中实现这一点 全屏显示不是一个选择 EDIT 这里有一些 SSC
  • 如果您不在 Java 中进行克隆,那么您会做什么以及如何称呼它?

    有没有人对 Java 中的复制构造函数 工厂方法等有任何建议或已建立的最佳实践和命名约定 特别是 假设我有一堂课Thing我想要一个返回新值的方法Thing与 a 具有相同的值Thing传入 如果是实例方法 则作为实例 您会将其作为构造函数
  • 在 String 值之后打印 int 值

    我有以下示例代码 int pay 80 int bonus 65 System out println pay bonus bonus pay 有人可以向我解释一下为什么我得到以下输出 145 6580 您的代码正在从左到右解释表达式 pa
  • JAVA 中的 Composer 相当于什么? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我目前从 PHP 转向 java 有没有类似的工具composer https getcomposer org 在 PHP 中用于 JAV
  • Selenium 和 TestNG 同时使用“dependsOn”和“priority =”问题

    我正在努力在 GUI 自动化测试中实现更好的工作流程控制 我首先从dependsOn开始 但很快发现缺点是如果一个测试失败 则套件的整个其余部分都不会运行 所以我改用 priority 但看到了意外的行为 一个例子 Test priorit
  • 如何避免Eclipse在将类名放在注释中时导入类,以便checkstyle稍后不会抱怨?

    有时我将类名放在方法或类的注释中只是为了引用 但是 Eclipse 会自动执行导入并在文件中留下导入语句 这会导致稍后出现 未使用的导入 检查样式错误 当我在注释中输入类名时 是否可以更改一些配置以避免 Eclipse 自动导入 人们不同意
  • 使用 Jena 查询维基数据

    目前 Wikidata 有一个 SPARQL 端点 https query wikidata org https query wikidata org 我想使用 Jena 3 0 1 查询此网站 我使用以下代码 但收到错误消息 端点返回的
  • 在拇指上方显示修改后的 JSlider 值

    有没有一种简单的方法可以在使用某些 外观和感觉 的同时更改 JSlider 上方标签中显示的值 为了清楚起见 我正在谈论这个值 具体来说 我想显示除以 1000 的值而不是值本身 我知道如果我显示它们 我可以为刻度设置标签 但用户将不得不猜
  • for循环中更新JLabel的问题

    我的程序的想法是从之前在其他 JFrame 中保存的列表中选择一个名称 我想在标签中一个接一个地打印所有名称 它们之间有很小的延迟 然后停在其中一个名称上 问题是lbl setText String 如果有多个则不起作用setText co
  • 如何在 Eclipse 中获得完全限定的类名?

    有没有一种快速方法可以在 Eclipse 中单击 Java 类并获取其完全限定名称 或将其复制到剪贴板 2016年6月29日编辑 正如 Jeff 所指出的 您只需要执行以下第二步 1 Double click on the class na
  • 是否可以手动检查 LocateRegistry 是否存在?

    I 已经发现 https stackoverflow com a 8338852 897090一种安全的方式获得LocateRegistry 即使注册表尚不存在 Registry registry null try registry Loc
  • ActiveMQ JNDI 查找问题

    尝试使用 JNDI 运行以下 ActiveMQ http activemq apache org jndi support html http ActiveMQ 20JNDI 并且我的 jboss server node lib 文件夹中有
  • 接口是否像对象一样对待?

    为什么下面的代码可以工作 interface I class A implements I public String toString return in a class B extends A public String toStrin
  • 如何在 spring-data 中强制使用 CrudRepository 进行预加载?

    我有一个实体 其中包含List就是这样lazy默认加载 interface MyEntityRepository extends CrudRepository
  • Hibernate 标准接受 %% 值

    我正在使用下面的 Hibernate 代码来过滤workFlowName crt add Restrictions like workFlowName workFlow MatchMode ANYWHERE crt is the crite
  • 无法连接到docker中的elasticsearch容器

    我正在尝试使用 docker 的官方 elasticsearch 镜像 我遵循了本指南 https www elastic co guide en elasticsearch reference current docker html但是当
  • 使用 Java 8 Spring 4 + MyBatis 集成问题

    使用 Java 8 1 8 0 60 Spring 4 2 1 和 MyBatis 3 3 0 时遇到以下异常 Sep 29 2015 11 02 58 AM org springframework context annotation A
  • 在java中执行匿名pl/sql块并获取结果集

    我想执行匿名 PL SQL 并需要获取结果集对象 我得到了可以通过在 PL SQL 块内使用游标来完成的代码 但 PL SQL 块本身将以文本形式来自数据库 所以我无法编辑该 PL SQL 块 并且它只会返回两个值 其列名始终相同 它将返回
  • 如何在J2ME中获取数字的幂[重复]

    这个问题在这里已经有答案了 可能的重复 J2ME power double double 数学函数实现 https stackoverflow com questions 2076913 j2me powerdouble double ma
  • Unicode(希腊语)字符存储在数据库中,例如“??????”

    数据库中的希腊字符就像问号 我找不到解决办法 我使用 Java Swing 开发了一个应用程序 但是当我在 MySQL 中插入希腊字母时 就像问号一样 我将数据库排序规则更改为 utf8 并将列也更改为 utf8 我的项目编码设置为UTF

随机推荐

  • WSL2 引起的 VirtualBox 启动问题

    罪魁祸首 https docs microsoft com zh cn windows wsl wsl2 install 尝试启用 WSL2 执行了 Enable WindowsOptionalFeature Online FeatureN
  • 9、数据类型

    1 布尔型布尔型的值只可以是常量 true 或者 false 初始化默认false 一个简单的例子 var b bool true 2 数字类型整型 int 和浮点型 float32 float64 Go 语言支持整型和浮点型数字 并且支持
  • 正则化- logistics回归

    正则化是一种常见的机器学习技术 它是在模型训练过程中为了防止过拟合而引入的一种约束方法 它通过在模型的损失函数中增加正则项来实现 正则项通常是模型参数的范数 L1 或 L2 范数 通过限制参数的大小来使得模型更加简单 从而减少过拟合的风险
  • pyinstaller 打包流程大体说明(linux)

    一 在文件中配置好 spec build sh start sh stop sh 文件后 后面的打包过程就很简单了 1 其中 spec文件是打包的流程 与pyinstaller有关 可以参考pyinstaller的spec文件编写规范 它能
  • SpringBoot 发布webservice接口,实现接口如何调用业务层代码

    如果直接按照业务层方式 在webservice实现是不可行的 Autowired无法自动注入 还会报空指针的错误 因为在webservice的自动注入不是在spring容器中找bean对象 所以按照service层方式是无法取得对象 所以我
  • OpenCL编程入门(一)

    OpenCL简介 开放计算语言 Open Computing Language OpenCL 是非盈利技术联盟Khronos Group管理的异构编程框架 该框架充分利用了CPU DSP FPGA GPU的计算能力 OpenCL支持多层次的
  • Java如何处理PermGen内存泄漏问题

    PermGen和内存泄漏问题 在Java早期版本中 永久代 PermGen 是Java虚拟机 JVM 中的一个内存区域 用于存储类的元数据 静态变量 常量等 PermGen的大小是固定的 如果PermGen空间不足 会抛出OutOfMemo
  • 舵机使用基础(SG90模拟舵机和MG90S数字舵机为例)(附驱动程序)

    工作环境 蓝色粗体字为特别注意内容 1 系统环境 SG90模拟舵机 MG90S数字舵机 12C5A60S2单片机 2 参考文献 很多不是航模或者机器人爱好者的同学可能舵机了解比较少 笔者也一样 只是单纯的单片机爱好者 只是有时候需要用舵机来
  • 【舰船数据集格式转换】AIR-SARShip-1.0数据集VOC转COCO

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 sar舰船图像检测数据集 二 使用步骤 1 原始数据集 2 xml2json AIR SARShip 1 0 3 json文件 总结 前言 最近一直在做s
  • Qt Charts简介

    文章目录 一 图标类型Charts分类 1 折线图和样条曲线图 2 面积图和散点图 3 条形图 4 饼图 5 误差棒图 6 烛台图 7 极坐标图 二 坐标轴Axes类型分类 三 图例 四 图表的互动 五 图表样式主题 一 图标类型Chart
  • Linux软链接硬链接的区别

    ln是linux中又一个非常重要命令 它的功能是为某一个文件在另外一个位置建立一个同步的链接 当我们需要在不同的目录 用到相同的文件时 我们不需要在每一个需要的目录下都放一个必须相同的文件 我们只要在某个固定的目录 放上该文件 然后在 其它
  • vscode链接AutoDL,并使用xtfp7传输文件

    1 AutoDL简介 AutoDL是一个GPU租赁平台 便宜好用 点击下面的链接注册即可 学生邮箱认证有优惠 4090 3090显卡都有还有3060 3080 TITAN Xp等等 AutoDL 品质GPU租用平台 租GPU就上AutoDL
  • 深度图拼接

    度量变换 图像拼接 检测深度图 国科大图像处理实验 度量变换 图像拼接 检测深度图 国科大图像处理实验 Root dobby的博客 CSDN博客 问题描述 目录hw3下有立体视觉对应的两幅图像view1 png和view5 png 图像来源
  • 家用电脑可以用做服务器吗

    家用电脑的结构与服务器的结构是相同的 家用电脑是可以用来搭建服务器使用 但使用家用电脑做服务器在稳定性会比服务器差很多 1 家用电脑没有公网IP 网络运营商分配的IP重启路由之后是会变化 不固定 服务器运行是需要有固定IP让人连接访问 使用
  • Ngui 五种点击事件实现方式

    ngui作为unity界面插件之一中 无疑是最好用 使用最多的了从自学unity到现在界面一直使用它 由于它的持续更新 我在此不得不说 确实很为开发者作想 为什么这么讲呢 大概在去年吧 当时用的那个版本已经不记得了 反正就是有个需求 要实现
  • HTC相关开发所需SDK等工具都在这里了

    HTC相关开发所需SDK等工具都在这里了 转 OpenVR SDKhttps github com ValveSoftware openvr OpenVR SDK是由原本的SteamWorks SDK更新而来 新增对HTC VIVE开发者版
  • 时间格式转换LongToString

    import java util Calendar import java util Date import org apache commons lang3 StringUtils import org apache commons la
  • 3.Qt消息机制和事件

    9 Qt消息机制和事件 好文来自https www cnblogs com weizhixiang p 5824345 html 一 事件 鼠标 敲下键盘 或者是窗口需要重新绘制的时候 都会发出一个相应的事件 Qt 程序需要在main 函数
  • 人工智能在游戏开发中的应用:你目前所需的 6 大 AI 工具

    游戏体量越大 质量越高 所要求的标准就越严格 尤其是在 AAA 级游戏市场 任何失误都可能导致你陷入极其棘手的境地 影响玩家体验 进而招致恶评 随着对游戏的需求和预期不断攀升 游戏开发人员比以往任何时候都需要更多帮助 那么 他们如何才能紧跟
  • Consumer位移管理-Kafka从入门到精通(十一)

    上篇文章说了 sesstion time out max poll interval ms max poll records和auto offset reset等参数 KafkaConsumer Kafka从入门到精通 十 https bl