所有分区的eekToEnd并在Kafka消费者的自动重新平衡中幸存

2023-12-12

当消费者组 A 的 Kafka 消费者连接到 Kafka 代理时,我想查找所有分区的末尾,即使偏移量存储在代理端。如果有更多额外的消费者连接同一消费者组,他们应该获取最新存储的偏移量。 我正在做以下事情:

consumer.poll(timeout) 
consumer.seekToEnd(emptyList())

while(true) {
  val records = consumer.poll(timeout)
  if(records.isNotEmpty()) {
    //print records
    consumer.commitSync()
  }
}

问题是,当我连接消费者组 A 的第一个消费者 c1 时,一切都按预期工作,如果我连接消费者组 A 的附加消费者 c2,该组正在重新平衡,并且 c1 将消耗跳过的偏移量。

有任何想法吗?


您可以创建一个实现的类ConsumerRebalanceListener, 如下所示:

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {

    private Consumer<K, V> consumer;

    public AlwaysSeekToEndListener(Consumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
    }
}

然后在订阅主题时使用此监听器:

consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

所有分区的eekToEnd并在Kafka消费者的自动重新平衡中幸存 的相关文章

  • Google 地图查询返回的 JSON 包含像 \x26 这样的编码字符(如何解码?)

    在 Java 应用程序中 我获取 JSON 来自 Google 地图 其中包含以下字符 x26我想将其转换为其原始字符 据我所知 这是一个 UTF 8 表示法 但我不完全确定 在源 JSON 中 可能会出现各种编码字符 例如 x3c div
  • 如何在 Eclipse 中用阿拉伯语读写

    我在 eclipse 中编写了这段代码来获取一些阿拉伯语单词 然后打印它们 public class getString public static void main String args throws Exception PrintS
  • JTree 节点不会被直观地选择

    不知何故 我无法为我的 JTree 节点启用 选择突出显示 我正在我的项目中使用自定义单元格渲染器 这很可能导致此问题 这是完整的渲染器类代码 protected class ProfessionTreeCellRenderer exten
  • 无法加载 jar 文件的主类

    我使用 Eclipse IDE 开发了一个应用程序 创建应用程序后 我以 jar 格式导出项目 当我尝试运行此 jar 文件时 出现错误 无法加载主类 请帮忙 当您将项目导出为 jar 时 请参阅此所以问题 https stackoverf
  • 如何对 IntStream 进行逆序排序

    我正在使用 txt 文件读取数字BufferedReader 我想颠倒该流中元素的顺序 以便在收集它们时 它们将从最高到最低排列 我不想在构建数组后进行排序 因为我不知道其中可能有多少元素 我只需要最高的 N 个元素 in new Buff
  • Google Inbox 类似 RecyclerView 项目打开动画

    目前 我正在尝试实现 Google Inbox 例如RecyclerView行为 我对电子邮件打开动画很好奇 我的问题是 该怎么做 我的意思是 他们使用了哪种方法 他们用过吗ItemAnimator dispatchChangeStarti
  • Java:从元素创建 DOM 元素,而不是文档

    如您所知 在 Java 中创建 Dom 元素的正确方法是执行以下操作 import org w3c dom Document import org w3c dom Element Document d Element e e d creat
  • 如何将 Spotlight for Help 插入本地化的 macOS 应用程序?

    我正在 macOS 上使用 Swing GUI 框架实现 Java 应用程序 当使用system外观和感觉以及screen菜单栏 Swing 自动插入一个搜索栏 called 聚光灯寻求帮助 https developer apple co
  • Java 变量的作用域

    我不明白为什么这段代码的输出是10 package uno public class A int x 10 A int x 12 new B public static void main String args int x 11 new
  • 场景生成器删除 fxml 文件中的导入

    我使用场景构建器 Gluon Scene Builder JavaFX Scene Builder 8 1 1 来创建应用程序的 UI 并使用 Eclipse 开发 JavaFX 现在 每次我在场景生成器中保存某些内容时 它都会从 fxml
  • 使用 java 按电子邮件发送日历邀请

    我正在尝试使用 java 发送每封电子邮件的日历邀请 收件人收到电子邮件 但不会显示接受或拒绝的邀请 而是将该事件自动添加到他的日历中 我正在使用 ical4j jar 构建活动 邀请 private Calendar getInvite
  • 覆盖 MATLAB 默认静态 javaclasspath 的最佳方法

    MATLAB 配置为在搜索用户可修改的动态路径之前搜索其静态 java 类路径 不幸的是 静态路径包含相当多非常旧的公共库 因此如果您尝试使用新版本 您可能最终会加载错误的实现并出现错误 例如 静态路径包含 google collectio
  • 在 AKKA 中,对主管调用 shutdown 是否会停止其监督的所有参与者?

    假设我有一位主管连接了 2 位演员 当我的应用程序关闭时 我想优雅地关闭这些参与者 调用supervisor shutdown 是否会停止所有参与者 还是我仍然需要手动停止我的参与者 gracias 阻止主管 https github co
  • OpenJDK 版本控制

    上下文 我想确保我们系统上安装的 Java 不受 CVE 2022 21449 的影响 java version 给出 openjdk version 11 0 7 2020 04 14 LTS OpenJDK Runtime Enviro
  • 阻止 OSX 变音符号为所有用户禁用 Java 中的 KeyBindings?

    注 我知道这个问题 https stackoverflow com questions 40335285 java keybinds stop working after holding down a key用户必须输入终端命令才能解决此问
  • 来自客户端的超时 Web 服务调用

    我正在使用 RestEasy 客户端调用网络服务 一项要求是 如果调用运行时间超过 5 秒 则中止 超时调用 我如何使用 RestEasy 客户端实现这一目标 我只看到服务器端超时 即如果在一定时间内未完成请求 Rest Easy 网络服务
  • 如何移动图像(动画)?

    我正在尝试在 x 轴上移动船 还没有键盘 我如何将运动 动画与boat png而不是任何其他图像 public class Mama extends Applet implements Runnable int width height i
  • struts 教程或示例

    我正在尝试在 Struts 中制作一个登录页面 这个想法是验证用户是否存在等 然后如果有错误 则返回到登录页面 错误显示为红色 典型的登录或任何表单页面验证 我想知道是否有人知道 Struts 中的错误管理教程 我正在专门寻找有关的教程 或
  • Java 编码风格、局部变量与重复方法调用

    我更喜欢使用局部变量而不是多次调用同一方法 I prefer this Vehicle vehicle person getVehicle if vehicle instanceof Car Car car Car vehicle car
  • Java中有类似分支/跳转表的东西吗?

    Java有类似分支表或跳转表的东西吗 分支表或跳转表是 根据维基百科 http en wikipedia org wiki Branch table 用于描述使用分支指令表将程序控制 分支 转移到程序的另一部分 或可能已动态加载的不同程序

随机推荐

  • Swift 检查文本字段是否为空

    我知道有大量的堆栈溢出页面解释了如何执行此操作 但每次我从这里获取代码并将其放入其中时 我都会遇到相同的错误 并且该错误的值是 字符串 没有成员 文本 有什么可靠的方法可以快速检查文本字段是否为空吗 let userEmail userEm
  • C 将指针传递给函数指针并使用 malloc

    我正在尝试获取标准输入以扫描两个二维并行数组 arrAtk arrDef x行 xy列 yy每行的长度都是可变的 第一行输入是x每个数组中的行数 第二行是y为第一行的列数 接下来是y要读入 arrAtk 数组的整数 然后另一个y要读入 ar
  • UIImageView Mask Layer 不是来自角落,直到第一次滚动 PageViewController

    我想达到这个结果 所以我需要在底部添加面具 但这是我得到的 我尝试过的代码 private func setupImageCutPath let path UIBezierPath path move to zero path addLin
  • Jquery / Javascript 根据输入字段更改表单操作

    我有这样的表格
  • 如何在 Joomla 中使用 AJAX 更改另一个选择列表

    我有一个国家列表和每个国家的城市列表 我将两者都设置为下拉列表 我的问题是 当所选国家 地区发生变化时 如何更改列出的城市 这是我的 XML 代码
  • 我可以同时运行 Xcode 3 和 Xcode 4 吗?

    我刚刚升级到 Xcode 4 我有一个项目仍然依赖于 Xcode 3 如何重新安装 Xcode 3 并且不会搞砸当前的 Xcode 4 安装 没问题 Xcode 3 安装程序允许您选择安装目录 我的偏好是将其安装在 Developer ol
  • == 或 .Equals()

    为什么要使用其中一种而不是另一种 是身份测试 如果被测试的两个对象实际上是同一个对象 它将返回 true Equals 执行相等测试 如果两个对象认为自己相等 则返回 true 身份测试速度更快 因此您可以在不需要更昂贵的相等测试时使用它
  • 如何通过添加附加参数来重定向传入的 URL 请求

    问题 我的服务器应用程序收到传入的 HTTP 请求 请求是这样的 http example com id abc 我需要解析这个请求 修补额外的 URL 参数并调用托管的 html 文件 所以 http example com id abc
  • jaxb java 类到多个 xml 映射

    我有几个 java 类 我想使用 jaxb 将它们转换为 xml 我不需要根据模式生成类 我需要能够将类映射到不同的 xml 格式 所以我不想使用注释 据我所知 最好的选择似乎是使用外部 xml 绑定 所以我想知道 1 我正在使用日食 我是
  • vcl.h:没有这样的文件或目录

    我正在寻找在 Visual C 中编译一些旧的源代码 然而 我收到的许多错误中的第一个是 vcl h No such file or directory 这似乎是对 Visual Component Library 的引用 它似乎是 Bor
  • htaccess 指令后 POST 变为 GET

    我在 Apache htaccess 文件中使用以下指令隐藏 php 文件扩展名 RewriteCond THE REQUEST A Z 3 s php NC RewriteRule 1 R L NC RewriteCond REQUEST
  • 最大化窗口并使用 powershell 将其置于前面

    有没有办法从 powershell 中打开一个窗口 我尝试隐藏所有窗口 工作 并将 powershell 恢复 不工作 void System Reflection Assembly LoadWithPartialName Microsof
  • std::unordered_map 和由多个元素构建的键

    我想存储在包装网络连接的地图对象中 其中键应该是IP地址 端口号 我的问题是我应该如何处理带有两个元素的这样的键 我可以定义std unordered map
  • 未解决的依赖关系:com.typesafe.play#play-slick_2.10;0.6.0.1:找不到

    当我尝试在 Mac OS X Mavericks 上的 Play Framework 2 2 2 sbt 0 13 0 和 Scala 2 10 3 中使用 slick 或 play slick 时 出现未解决的依赖项错误 info Upd
  • WPF 中的依赖属性使用

    我很难找出依赖属性的充分理由 为什么 System Controls TextBox Text 属性是依赖属性而不是普通属性 作为依赖属性有什么好处 我想要完成的事情之一是将 ValidationRules 属性添加到我的 UserCont
  • Android N 要求 IDE 运行 Java 1.8 或更高版本?

    My XML布局未呈现并显示此错误消息 我已经在使用 Java 8 还使用 Gradle 中最新的构建工具 android compileSdkVersion android N buildToolsVersion 24 0 0 rc1 X
  • 示例请求:nant-contrib 的 任务

    谁能给我提供一个示例构建文件 演示如何使用 nant contrib 任务创建 Web 应用程序项目的安装程序 这个
  • 从 URL 下载文件到服务器

    嗯 这看起来很简单 确实如此 要将文件下载到服务器 您所需要做的就是 file put contents Tmpfile zip file get contents http someurl file zip 只有一个问题 如果您有一个大文
  • Phonegap 中 Android 的应用程序图标 [重复]

    这个问题在这里已经有答案了 我有一个 HTML5 Javascript 应用程序 并且我使用 PhoneGap 的构建服务为我的 Android 生成 APK 文件 我在我的 Android 上成功安装了 APK 文件 一切都很好 只是手机
  • 所有分区的eekToEnd并在Kafka消费者的自动重新平衡中幸存

    当消费者组 A 的 Kafka 消费者连接到 Kafka 代理时 我想查找所有分区的末尾 即使偏移量存储在代理端 如果有更多额外的消费者连接同一消费者组 他们应该获取最新存储的偏移量 我正在做以下事情 consumer poll timeo