Consumer.endOffsets 在 Kafka 中如何工作?

2024-04-03

假设我有一个无限期运行的计时器任务,它会迭代 kafka 集群中的所有消费者组,并输出每个组的所有分区的延迟、提交偏移量和结束偏移量。与 Kafka 控制台消费者组脚本的工作方式类似,只不过它适用于所有组。

就像是

单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}

多个消费者 - 工作

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer();
        consumer.endOffsets(topicParitions); This works!!!
   }
 }

版本:Kafka-Client 2.0.0

我是否错误地使用了消费者 api?理想情况下,我想使用单一消费者。

如果您需要更多详细信息,请告诉我。


我想你已经快到了。首先收集all您感兴趣的主题分区,以及then发出一个consumer.endOffsets命令。

请记住,我还没有尝试运行它,但类似这样的东西应该可以工作:

run() { 
   Consumer consumer = createConsumer();
   List<String> groupIds = getConsumerGroups();
   List<TopicPartition> topicPartitions = new ArrayList<>();

   for (String groupId: groupIds) {
        topicPartitions.addAll(getTopicPartitions(groupId));
   }

   consumer.endOffsets(topicPartitions); 
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Consumer.endOffsets 在 Kafka 中如何工作? 的相关文章

  • 无法解析类型为 xxx 的任何 bean;限定符:[@javax.enterprise.inject.Any()]

    我有一个 LoginProvider 接口 public interface LoginProvider boolean login String username String password 以及两种不同的实现 public clas
  • 如何使用 SimpleDateFormat 解析多种格式的日期

    我正在尝试解析文档中的一些日期 用户似乎以类似但不完全相同的格式输入了这些日期 以下是格式 9 09 9 2009 09 2009 9 1 2009 9 1 2009 尝试解析所有这些内容的最佳方法是什么 这些似乎是最常见的 但我想让我困扰
  • 对话框上的 EditText 不返回任何文本

    我太累了 找不到错误 我没有发现任何错误 但我没有从 editText 收到任何文本 请看下面的代码 活动密码 xml
  • Grails 2.3.0 自动重新加载不起作用

    我最近将我们的项目升级到 grails 2 3 0 一切工作正常 除了每当我更改代码时自动重新加载都无法工作的问题 这包括所有项目工件 控制器 域 服务 gsps css 和 javascript 文件 我的旧版本 grails 可以正常工
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Java:使用 HttpURLConnection 的 HTTP PUT

    如何执行 HTTP PUT 我正在使用的类似乎认为它正在执行 PUT 但端点将其视为我执行了 GET 我做错了什么吗 URL url new URL https HttpURLConnection conn HttpURLConnectio
  • 如何在 JSP 中导入类?

    我是一个完全的JSP初学者 我正在尝试使用java util List在 JSP 页面中 我需要做什么才能使用除以下类之外的类java lang 使用以下导入语句进行导入java util List 顺便说一句 要导入多个类 请使用以下格式
  • Firestore - RecycleView - 图像持有者

    我不知道如何编写图像的支架 我已经设置了 2 个文本 但我不知道图像的支架应该是什么样子 你能帮我告诉我图像的文字应该是什么样子才能正确显示吗 holder artistImage setImageResource model getArt
  • Java:正则表达式排除空值

    在问题中here https stackoverflow com questions 51359056 java regexp for a separated group of digits 我得到了正则表达式来匹配 1 到 99 之间的一
  • 列表应该如何转换为具体的实现?

    假设我正在使用一个我不知道源代码的库 它有一个返回列表的方法 如下所示 public List
  • RSA OAEP、Golang 加密、Java 解密 -BadPaddingException:解密错误

    我正在尝试解密使用 RSA OAEP 在 Golang 中加密的字符串 但出现 BadPaddingException 解密错误 很难弄清楚我错过了什么 这是Golang加密方法 func encryptString rootPEM io
  • Java 收集返回顶级项目的映射的嵌套流

    我有以下模型 class Item String name List
  • Dispatcher-servlet 无法映射到 websocket 请求

    我正在开发一个以Spring为主要框架的Java web应用程序 特别使用Spring core Spring mvc Spring security Spring data Spring websocket 像这样在 Spring 上下文
  • 尝试使用等于“是”或“否”的字符串变量重新启动 do-while 循环

    计算行程距离的非常简单的程序 一周前刚刚开始 我有这个循环用于解决真或假问题 但我希望它适用于简单的 是 或 否 我为此分配的字符串是答案 public class Main public static void main String a
  • 如何通过 Inno Setup for NetBeans 使用自定义 .iss 文件

    我将 Inno Setup 5 与 NetBeans 8 一起使用 并且我已经能够创建一个安装程序来安装该应用程序C users username local appname 但是我希望将其安装在C Programfiles 我如何在 Ne
  • JVM:是否可以操作帧堆栈?

    假设我需要执行N同一线程中的任务 这些任务有时可能需要来自外部存储的一些值 我事先不知道哪个任务可能需要这样的值以及何时 获取速度要快得多M价值观是一次性的而不是相同的M值在M查询外部存储 注意我不能指望任务本身进行合作 它们只不过是 ja
  • JSON 到 hashmap (杰克逊)

    我想将 JSON 转换为 HashMapJackson http jackson codehaus org 这是我的 JSON String json Opleidingen name Bijz trajecten zorg en welz
  • Hibernate 和可序列化实体

    有谁知道是否有一个框架能够从实体类中剥离 Hibernate 集合以使它们可序列化 我查看了 BeanLib 但它似乎只进行实体的深层复制 而不允许我为实体类中的集合类型指定实现映射 BeanLib 目前不适用于 Hibernate 3 5
  • 在android中跟踪FTP上传数据?

    我有一个运行 Android 的 FTP 系统 但我希望能够在上传时跟踪字节 这样我就可以在上传过程中更新进度条 安卓可以实现这个功能吗 现在 我正在使用org apache common net ftp我正在使用的代码如下 另外 我在 A
  • 嵌入式 Jetty - 以编程方式添加基于表单的身份验证

    有没有一种方法可以按如下方式以编程方式添加基于表单的身份验证 我用的是我自己的LdapLoginModule 最初我使用基本身份验证并且工作正常 但现在我想在登录页面上进行更多控制 例如显示徽标等 有没有好的样品 我正在使用嵌入式 jett

随机推荐

  • 中包含子包的语法是什么?

    我正在使用Spring并且我有一个很长的子包列表 我是否必须在包中一一指定它们
  • Android上批量获取大量位图资源

    我有一长串图形 icon1 0 png icon1 1 png icon1 2 png icon12 0 png icon12 1 png icon12 2 png 我想将它们打包到我的android应用程序中 理想情况下 我认为我应该能够
  • 如何从 C++ DLL 中的 C# 简单函数调用

    我在 C 中有一个简单的函数 不是类的方法 declspec dllexport extern C void stdcall TestFunc 我尝试从 C 调用它 DllImport ImportTest dll public stati
  • 将语料库转换为R中的data.frame

    我正在使用 tm 包来应用词干提取 并且需要将结果数据转换为数据框 可以在这里找到解决方案R tm包vcorpus 将语料库转换为数据帧时出错 https stackoverflow com questions 24703920 r tm
  • 检查数组中是否存在元素

    PHP中有一个函数叫做isset http php net isset检查某些内容 例如数组索引 是否存在并且具有值 Python 怎么样 我需要在数组上使用它 因为有时我会收到 IndexError 列表索引超出范围 我想我could使用
  • 删除“搜索”选项,但保留“搜索列”选项

    我想从我的应用程序中删除 全局搜索 选项 但保留 列搜索 选项 有任何想法吗 我尝试过不同的参数 例如searching FALSE filtering none 这些都不能正常工作 My code server R library shi
  • MVC中VIEWDATA和VIEWBAG存储在哪里?

    我对 MVC 非常陌生 在 ASP Net 中 存在状态管理技术 其中视图状态或 cookie 存储在客户端中 会话存储在服务器中 类似地 我们在 MVC 中有 Viewbag ViewData 和 TempData cookie 和会话也
  • iOS setValue withCompletionBlock 未调用

    在设置值时 我遇到了一些在 iOS 模拟器和设备 上未调用的completionBlocks 例如 void addShortUserPlaylistUrl NSString playlistId playlistName NSString
  • Plotly.py:在行之间填充,正/负不同颜色

    使用 Plotly 我可以轻松绘制两条线并填充它们之间的区域 import plotly graph objects as go fig go Figure fig add trace go Scatter x 1 2 3 4 y 1 2
  • 如何将 Git 子模块指针恢复到存储在包含存储库中的提交?

    我的主 git 存储库中有一个 git 子模块 据我了解 主存储库存储一个 SHA 值 某处 指向它 链接到 的子模块的特定提交 我进入我的子模块并输入git checkout some other branch 我不知道我来自哪个提交 我
  • 使用 Python 更新媒体 wiki 文章?

    你好 我有一个 cron 作业 它收集有关服务的一些统计信息 我需要 cron 作业以编程方式更新媒体 wiki 页面 附加到页面 我在 cron 中使用 python 那么我最好的选择是什么 是否有 mediawiki python 库的
  • 数据绑定后如何隐藏gridview列?

    我使用以下链接中的解决方案隐藏我的列 如何隐藏 GridView 中的 TemplateField 列 https stackoverflow com questions 4954871 how to hide a templatefiel
  • 消除重复的 try/catch 代码

    编写必须一次又一次处理相同异常的代码总是很无聊 有没有一种方法可以在不使用try catch的情况下编写代码 并向方法添加属性来捕获 并处理 可能发生的各种异常 这听起来像 AOP Postsharp 这会是理想的解决方案吗 因此 我想编写
  • 如何在protobuf消息中添加int数组

    我必须编写一个 protobuf 消息 它应该有 1 个整数变量和一个整数数组 package protobuf message myProto optional uint32 message id 1 optional int updat
  • 无法读取 Angular 4 中 null 的属性“outlets”

    我有 Angular 4 3 6 项目 其中模板片段产生此错误 模板块 a article title a 错误堆栈跟踪 ArticleSpComponent html 26 ERROR TypeError Cannot read prop
  • Android - 在开发和生产 Web 服务之间切换

    我想让我的应用程序在开发和生产 Web 服务之间切换 而不需要对代码进行太多更改 并且相对简单 现在我的网络服务地址为static final String类中的变量执行实际的 HTTP 调用 并使用 a 来切换应用程序其余部分中的代码st
  • 如何执行作为 sp 参数传递的 sql 文本?

    我有一个带有 nvarchar 参数的存储过程 我希望调用者在使用此 SP 时提供 sql 命令的文本 如何从 SP 内执行提供的 sql 命令 这可能吗 我认为可以使用 EXEC 但以下内容 EXEC script 错误表明无法按给定名称
  • 带有 Base64 图像的 v-card-media

    我正在 ColdFusion 中创建验证码图像 并将其作为 Taffy 的 REST feed 返回 然后在 Vuetify 中显示 ColdFusion 太妃糖代码
  • 如何从Excel中获取工作表名称

    如何从 Excel 获取工作表名称并将其添加到我的组合框列表中 我似乎无法将其添加到我的代码中 因为它是public static public static DataTable ExcelToDataTable string fileNa
  • Consumer.endOffsets 在 Kafka 中如何工作?

    假设我有一个无限期运行的计时器任务 它会迭代 kafka 集群中的所有消费者组 并输出每个组的所有分区的延迟 提交偏移量和结束偏移量 与 Kafka 控制台消费者组脚本的工作方式类似 只不过它适用于所有组 就像是 单个消费者 不工作 不返回