Kafka重启时如何让Kafka Source重新连接

2024-02-06

我创建一个Source使用 Reactive Kafka 的消费者记录如下:

val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupName)
// what offset to begin with if there's no offset for this group
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// do we want to automatically commit offsets?
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
// auto-commit offsets every 1 minute, in the background
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
// reconnect every 1 second, when disconnected
.withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000")
// every session lasts 30 seconds
.withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
// send heartbeat every 10 seconds i.e. 1/3 * session.timeout.ms
.withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000")
// how many records to fetch in each poll( )
.withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")

Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)  

我的本地计算机上运行着 1 个 Kafka 实例。我通过控制台生成器将值推送到主题中并看到它们打印出来。然后我杀死 Kafka,并重新启动它以查看源是否重新连接。

以下是我的日志的处理方式:

* Connection with /192.168.0.1 disconnected
    java.net.ConnectException: Connection refused
* Give up sending metadata request since no node is available
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Resuming partition test-events-0
* Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR}
* Sending metadata request (type=MetadataRequest, topics=test-events) to node 0
* Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null)
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
* Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR}
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available.
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata
* Initiating API versions fetch from node 2147483647
* Offset commit for group mytestgroup failed: This is not the correct coordinator for this group.
* Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup
* The Kafka consumer has closed.  

如何确保此源重新连接并继续处理日志?


我认为你至少需要有2个经纪人。如果其中一台失败,另一台可以完成这项工作,您可以重新启动另一台。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka重启时如何让Kafka Source重新连接 的相关文章

  • 在 String 值之后打印 int 值

    我有以下示例代码 int pay 80 int bonus 65 System out println pay bonus bonus pay 有人可以向我解释一下为什么我得到以下输出 145 6580 您的代码正在从左到右解释表达式 pa
  • 是否可以使用 Java 读写 Parquet,而不依赖 Hadoop 和 HDFS?

    我一直在寻找这个问题的解决方案 在我看来 如果不引入对 HDFS 和 Hadoop 的依赖 就无法在 Java 程序中嵌入读写 Parquet 格式 它是否正确 我想在 Hadoop 集群之外的客户端计算机上进行读写 我开始对 Apache
  • 获取Android库中的上下文

    我正在编写一个 Android 应用程序 它的一些功能封装在内部库中 但是 要使此功能发挥作用 库需要一个应用程序上下文的实例 为图书馆提供这种上下文的最佳方式是什么 我看到了一些选择 但没有一个有吸引力 Have my library c
  • Selenium 和 TestNG 同时使用“dependsOn”和“priority =”问题

    我正在努力在 GUI 自动化测试中实现更好的工作流程控制 我首先从dependsOn开始 但很快发现缺点是如果一个测试失败 则套件的整个其余部分都不会运行 所以我改用 priority 但看到了意外的行为 一个例子 Test priorit
  • Scala REPL / SBT Console 是否有配置文件?

    我一直在尝试找到某种点文件来放入 Scala REPL 设置和自定义函数 我特别有兴趣传递它的标志 例如 Dscala color 启用语法突出显示 以及覆盖设置 如结果字符串截断 scala gt power scala gt vals
  • 使用 kryo 注册课程的策略

    我最近发现了 kryonet 库 它非常棒并且非常适合我的需求 然而 我遇到的一个问题是制定一种好的策略来注册所有可以转移的类 我知道我可以在每个对象中编写一个静态方法 该方法将返回它使用的所有类的列表 但我真的不想这样做 为了我自己的时间
  • 在拇指上方显示修改后的 JSlider 值

    有没有一种简单的方法可以在使用某些 外观和感觉 的同时更改 JSlider 上方标签中显示的值 为了清楚起见 我正在谈论这个值 具体来说 我想显示除以 1000 的值而不是值本身 我知道如果我显示它们 我可以为刻度设置标签 但用户将不得不猜
  • 如何让“循环”泛型在 Java 中工作?

    我在编译以下涉及一些泛型的代码时遇到错误 public abstract class State
  • 避免 @Secured 注释的重复值

    我正在尝试使用以下方法来保护我的服务方法 Secured如下 public interface IUserService Secured ROLE ROLE1 ROLE ROLE2 ResponseEntity saveUser Creat
  • 错误膨胀类 android.support.design.widget.NavigationView [启动时崩溃]

    该应用程序应该有一个导航抽屉 可以从左侧拉出并显示各种活动 但是一旦将导航栏添加到 XML Activity homescreen 文档中 应用程序一启动就会崩溃 主屏幕 java package com t99sdevelopment c
  • 使用 Guava Ordering 对对象列表进行多条件排序

    我有一个类无法实现可比较 但需要根据 2 个字段进行排序 我怎样才能用番石榴实现这一目标 假设班级是 class X String stringValue java util Date dateValue 我有一个清单 List
  • 接口是否像对象一样对待?

    为什么下面的代码可以工作 interface I class A implements I public String toString return in a class B extends A public String toStrin
  • 如何在 spring-data 中强制使用 CrudRepository 进行预加载?

    我有一个实体 其中包含List就是这样lazy默认加载 interface MyEntityRepository extends CrudRepository
  • 如何使用eclipse调试JSP tomcat服务?

    我想使用 Eclipse IDE 调试器来调试单独运行的 JSP Struts Tomcat Hibernate 应用程序堆栈 如何设置 java JVM 和 eclipse 以便设置断点 监视变量值并查看当前正在执行的代码 我刚刚用谷歌搜
  • 在 Spark 中将多行汇总为单行和单列

    我有一个如下的火花 DF 我需要汇总具有与单行相同 ID 的多行 但值应该不同 id values 1 hello 1 hello Sam 1 hello Tom 2 hello 2 hello Tom 预期输出 id values 1 h
  • 在java中执行匿名pl/sql块并获取结果集

    我想执行匿名 PL SQL 并需要获取结果集对象 我得到了可以通过在 PL SQL 块内使用游标来完成的代码 但 PL SQL 块本身将以文本形式来自数据库 所以我无法编辑该 PL SQL 块 并且它只会返回两个值 其列名始终相同 它将返回
  • akka-http:找不到参数解组的隐式值

    我的 Spray json 支持看起来像这样 object MarshallingSupport extends SprayJsonSupport implicit def json4sFormats Formats DefaultForm
  • Java时区混乱

    我正在运行 Tomcat 应用程序 并且需要显示一些时间值 不幸的是 时间快到了 还有一个小时的休息时间 我调查了一下 发现我的默认时区被设置为 sun util calendar ZoneInfo id GMT 08 00 offset
  • 我怎样才能限定我不“拥有”的自动装配设置器

    要点是 Spring Batch v2 测试框架具有JobLauncherTestUtils setJob与 Autowired注解 我们的测试套件有多个Job类提供者 由于这个类不是我可以修改的东西 我不确定如何限定它自动连接的作业 每个
  • Android ClassNotFoundException:在路径上找不到类

    10 22 15 29 40 897 E AndroidRuntime 2561 FATAL EXCEPTION main 10 22 15 29 40 897 E AndroidRuntime 2561 java lang Runtime

随机推荐

  • OOD / OOP 练习曲 / 代码练习

    我已经在网上搜索了一段时间了 我正在寻找用于 OOD 实践 以及一些内部 TDD 研讨会 的小样本练习 如果有一个地方可以满足这一需求 请指出它 并关闭此问题 限制条件 与语言无关的现实世界问题 小 最多需要一到两个小时才能解决的问题 或者
  • 无法在Java / C++中为外部应用程序设置always-on-top

    我正在寻找解决方案 使外部应用程序 不是像记事本或 calc exe 这样的 Windows 应用程序 在按下 Java GUI 中的按钮后始终保持在最上面 我在 C 中使用此代码来获取桌面上所有打开的窗口 并将其进程 ID PID 与发送
  • 可更新视图 - SQL Server 2008

    关于可更新数据库视图的问题 我正在阅读有关该主题的一些 MSDN 文档 并且遇到以下限制 任何修改 包括 UPDATE INSERT 和 DELETE 语句 都必须仅引用一个基表中的列 我只是想确保我理解该限制 我想在我的几个媒体评论项目中
  • 多边形中的点

    我正在尝试解决一些 SPOJ 问题https www spoj pl problems FSHEEP https www spoj pl problems FSHEEP 我们必须找出点是否在多边形内部 正如我们所看到的 它不是凸多边形 问题
  • 如何更改pairs()的轴位置?

    默认情况下 pairs 将轴放在图的所有边上 在边之间交替 但是 我将数据集之间的相关性放在上三角形中 所以我想像这样调整轴位置 我需要设置哪些参数 您可以自定义配对功能 如果你看一下代码 就会发现轴是在 2 个嵌套的 for 循环内绘制的
  • 为什么 RGB 使用 6 个十六进制数字?

    据我所知 RGB 用两个十六进制数字编码颜色 对应于红色 绿色和蓝色分量 例如 ff0000 是纯红色 据我了解 每个十六进制数字代表 0 15 之间的数字 或 4 位信息 但是如何用 32 位来表示每种颜色呢 为什么使用两位数字表示红色
  • 如何在 Mercurial 上 git reset --hard HEAD?

    我是一名 Git 用户 正在尝试使用 Mercurial 事情是这样的 我做了一个hg backout在我想恢复的变更集上 这创建了一个新头 因此 hg 指示我合并 我认为回到 默认 合并后 它告诉我我仍然必须提交 然后我注意到在解决合并中
  • Bash 脚本 - 变量内容作为命令运行

    我有一个 Perl 脚本 它给我一个定义的随机数列表 这些随机数对应于文件的行 接下来我想使用从文件中提取这些行sed bin bash count cat last queries txt wc l var perl test pl te
  • 如何通过拖动顶部的 div 来调整其大小?

    我想在拖动两个 div 之间的部分时调整 div 的大小 在搜索中我发现this http jsfiddle net gaby Bek9L 1779 但我不知道如何使这个水平而不是可用的垂直拖动 我的 div 看起来像 div div di
  • Install4j:有没有办法用包含占位符的文本覆盖欢迎消息?

    我需要覆盖install4j欢迎消息 其中包含我需要在运行时解析的占位符文本 将从属性文件中读取替换值 welcomeLabel3 Text 0 another text 1 无法向系统消息添加占位符 您必须指定整个消息 但是 您可以使用安
  • 如何从 javascript 文件(而不是 vue 组件)获取 vuex 状态

    我正在使用 vuex 2 1 1 并让事情在 vue 单文件组件中工作 然而 为了避免 vue 单文件组件中出现太多问题 我将一些函数移至utils js我将其导入到 vue 文件中的模块 在这个utils js我想阅读 vuex 状态 我
  • 初学者:AVR C++ Atmel Studio 6

    我在确定我可以访问哪些库时遇到问题 我知道我可以使用 Atmel Studio 6 IDE 用 C 对微控制器 Atmega328p 进行编程 但是 我无法弄清楚我可以访问哪些库的记录在哪里 例如 我可以使用 STL 例如向量 双端队列 吗
  • Google Maps API V3 -> 利用 MarkerCluster 但簇本身是否特定于绘制的多边形/区域?

    好吧 让我以我已经创建了很多谷歌地图的事实作为这个问题的序言 但它们是严格的标记和表示路线的折线以及一些处理程序交互 现在我希望基本上显示一张世界地图 主要是北美 我想用我拥有的一些纬度 经度将这片大陆分成我预定义的区域 使用这些区域 我想
  • CSS 面包屑箭头指向左侧

    我发现这个 css 面包屑指向右侧 我想指向左侧 相信我 我一遍又一遍地尝试 但没有成功 请有人告诉我该怎么做 div span display inline block position relative background 88b7d
  • 改造 - 更改 BaseUrl

    我有一个场景 我必须使用相同的基本 URL 调用 API 例如www myAPI com但以不同的baseUrl 我有一个 Retrofit 2 的实例 它是通过Builder return new Retrofit Builder bas
  • 将 UL 在 DIV 内垂直居中

    我有以下内容 div style background Red height 100px ul li a href Home a li ul div 我想将 ul 垂直居中在 div 中 但我不知道如何 小提琴演示 http jsfiddl
  • 如何在C++03中用sprintf正确替换sprintf_s?

    sprintf s是该函数的 Microsoft 实现sprintf他们修补了一个缺陷 添加了一个参数来获取函数限制写入的边界值 等效的引入C 11 snprintf 但在这里 我们谈论的是C 03 syntax 签名 count char
  • 为什么 Unity 会忽略非静态公共字段的初始化值?

    我在用着InvokeRepeating http docs unity3d com ScriptReference MonoBehaviour InvokeRepeating html调用游戏中的方法 我打电话InvokeRepeating
  • 在Featuretools中计算多个训练窗口的特征

    我有一张包含客户和交易的表 有没有办法获取过去 3 6 9 12 个月过滤的功能 我想自动生成功能 过去 3 个月的跨性别者数量 过去 12 个月内跨性别者数量 过去 3 个月的平均跨性别者 过去 12 个月的平均跨性别者 我尝试过使用tr
  • Kafka重启时如何让Kafka Source重新连接

    我创建一个Source使用 Reactive Kafka 的消费者记录如下 val settings ConsumerSettings system keyDeserializer valueDeserializer withBootstr