使用KafkaListener时,如何检查主题消息是否已读完?

2024-02-19

使用@KafkaListener时,如何检查主题消息是否已读完?


See 这个答案 https://stackoverflow.com/questions/55430893/how-to-check-if-kafka-is-empty-using-spring-kafka/55432339#55432339有关在没有更多记录可供读取时获取事件通知的信息。

您可以使用Consumer如果要获得当前的position(), and endOffsets();下面显示了如何获取结束偏移量:

@EventListener
void listen(ListenerContainerIdleEvent event) {
    System.out.println(event);
    try {
        System.out.println(event.getConsumer().assignment());
        System.out.println(event.getConsumer().endOffsets(event.getConsumer().assignment(), Duration.ofSeconds(5)));
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用KafkaListener时,如何检查主题消息是否已读完? 的相关文章

随机推荐

  • fileUpload 在使用 JSF 2.2 的 PrimeFaces 3.5 中不会触发事件

    我无法在 PrimeFaces 3 5 上使用 fileUpload 组件来触发该事件 我读过很多关于该主题的帖子并遵循了那里的建议 但仍然不起作用 我尝试了所有模式 简单 自动 高级 但没有成功 如果我使用 JSF 规范中的标准 inpu
  • 实体框架最佳实践:哪一层应该调用 SaveChanges()?

    对于一个干净的数据模型 我会来回讨论这个 以审批工作流程为例 假设在我的 Web 应用程序中 我有一个页面 可让用户标记MyEntityObject需要批准 MyEntityObject有一些控制其审批工作流程的属性 因此我有一个通用的实用
  • swift 中的 C 数组内存释放

    var cpuInfo processor info array t nil var numCpuInfo mach msg type number t 0 var coresTotalUsage Float 0 0 var numCPUs
  • 缺少对 Nexus 的 PUT 的请求实体响应

    我正在尝试模拟发布到我的 Nexus 存储库的 Maven 工件 试图了解为什么我的 gradle 构建失败 I try curl u me secret X PUT T my artifact H Content Type maven a
  • 多集群并行方法中启动时的变量范围

    我正在尝试弄清楚如何将函数和包传递给boot 运行并行计算时的函数 在循环内加载包或定义函数似乎非常昂贵 这foreach 我经常用于其他并行任务的函数有一个 packages 和 export 参数来处理这个问题 请参阅此所以问题 htt
  • gpg:未找到有效的 OpenPGP 数据

    我正在尝试在 Ubuntu 13 10 上安装 Jenkins 当我尝试运行以下命令时 出现上述错误 wget q O http pkg jenkins ci org debian jenkins ci org key sudo apt k
  • 日期之间的月份差异[重复]

    这个问题在这里已经有答案了 可能的重复 月份差异 https stackoverflow com questions 1525990 difference in months Hi all 我们如何使用 LINQ 计算两个日期之间的月份差异
  • 如何以编程方式将网站添加到 Internet Explorer 11 兼容性列表?

    我尝试在以下位置添加注册表项 Hive HKEY CURRENT USER Key Path Software Policies Microsoft Internet Explorer BrowserEmulation PolicyList
  • 使用 jquery 获取 html5 的值

    前几天 我在 stackoverflow 中阅读答案 我读到 jquery 可以获取 html5 的值
  • memmove 实际上是否“移动”了一块内存并在源头留下了零? [复制]

    这个问题在这里已经有答案了 可能的重复 memcpy 与 memmove https stackoverflow com questions 4415910 memcpy vs memmove Does memmove http www c
  • github 存储库中的自定义语言

    Git 显示构成存储库的语言的百分比 然而 对于我的一个项目 我想使用我自己的自定义语言 我知道我可以创建一个 gitattributes 文件并放置 py linguist language Python使所有 py 文件被识别为 Pyt
  • 导入错误:没有名为“flask.ext”的模块[重复]

    这个问题在这里已经有答案了 当我像这样导入 Flask 扩展时 它工作正常 from flask module import Module 这样扩展就安装正确了 但每当我尝试像这样导入 Flask 扩展时 from flask ext mo
  • Webpack:未知属性“查询”?

    我正在练习使用 React 构建一个按钮 单击该按钮时计数器会加 1 我现在需要使用 Webpack 打包所有内容 以便可以在浏览器中运行它 我运行以下命令 webpack watch mode development 并得到以下错误 In
  • 全新安装时的 HDFS 空间使用情况

    我刚刚安装了 HDFS 并启动了该服务 并且已使用空间已经超过800MB 它代表什么 hdfs dfs df h Filesystem Size Used Available Use hdfs quickstart cloudera 802
  • Android 数据绑定:“使用‘.’的方法引用已弃用”

    在我的应用程序中使用数据绑定时 我在编译时收到以下警告 Warning Method references using is deprecated Instead of handler onItemClick use handler onI
  • 测试单元Spring boot:无法注册模拟bean

    我有两类测试 1 用于控制器类的单元测试 2 用于服务类的单元测试 如下所示 1 测试类控制器 RunWith SpringRunner class SpringBootTest AutoConfigureMockMvc public cl
  • 如何列出 JAR 的依赖项

    是否有一个工具可以列出包含 JAR 中引用的 第三方 类的第三方 包 假设它会从 JAR 文件定义中识别什么是 主 包 并且会打印出 JAR 中引用的第三方类的完全限定名称列表 最高可达第三级 org apache commons org
  • 使用 Tensorflow 可以实现增量学习吗?

    我正在尝试使用非常大的数据集 比我的记忆大得多 训练 Tensorflow 模型 为了充分利用所有可用的训练数据 我正在考虑将它们分成几个小 碎片 并一次在一个碎片上进行训练 经过一番研究 我发现这种方法通常被称为 增量学习 并基于这个维基
  • 我应该封装我的 IoC 容器吗?

    想要改进这篇文章吗 提供此问题的详细答案 包括引用和解释为什么你的答案是正确的 不够详细的答案可能会被编辑或删除 我正在尝试确定花费额外的精力来封装 IoC 容器是否有意义 经验告诉我 我应该在我的应用程序和任何第三方组件之间放置一层封装
  • 使用KafkaListener时,如何检查主题消息是否已读完?

    使用 KafkaListener时 如何检查主题消息是否已读完 See 这个答案 https stackoverflow com questions 55430893 how to check if kafka is empty using