卡夫卡消费者陷入(重新)加入组

2024-04-07

如果 kafka(版本 0.10)消费者尝试重新加入消费者组,其默认行为是什么。 我正在将单个消费者用于消费者组,但似乎它在重新加入时受到了打击。 每 10 分钟后,它会在消费者日志中打印以下行。

2016-08-11 13:54:53,803 INFO o.a.k.c.c.i.ConsumerCoordinator [pool-5-thread-1] ****撤销先前分配的分区**** [] for group image-consumer-group

2016-08-11 13:54:53,803信息 o.a.k.c.c.i.AbstractCoordinator [pool-5-thread-1](重新)加入组 image-consumer-group

2016-08-11 14:04:53,992 INFO o.a.k.c.c.i.AbstractCoordinator [pool-5-thread-1] 将协调器标记为组 image-consumer-group 的死亡

2016-08-11 14:04:54,095 INFO o.a.k.c.c.i.AbstractCoordinator [pool-5-thread-1] 发现组 image-consumer-group 的协调器。

2016-08-11 14:04:54,096信息 o.a.k.c.c.i.AbstractCoordinator [pool-5-thread-1](重新)加入组 image-consumer-group

重新启动消费者应用程序没有帮助。


如果您打算在一组中只有一个消费者实例,则将消费者与手动assignment战略。 (简单消费者)。

手动主题分配不使用消费者的组管理功能,因此不需要心跳。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

卡夫卡消费者陷入(重新)加入组 的相关文章

  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • WARN 获取相关 ID 为 1 的元数据时出错:{MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

    当我使用 kafka 运行以下命令时0 9 0 1 我收到这些警告 1 你能告诉我我的主题有什么问题吗 我正在与在 ec2 中运行的 kafka 经纪人交谈 kafka console consumer sh new consumer bo
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • 通过 Kafka 消费者重试维持订单保证

    我正在为基于 Kafka 的数据处理管道中的消费者重试设计一个架构 我们正在使用 Kafka 生产者和消费者 并且正在考虑重试主题 如果消费出错 将在这些主题上发送消息 将会有消费者以一定的节奏运行这些重试主题 我读了很多参考架构 但没有一
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Strimzi 运算符 Kafka 集群 ACL 未启用类型:简单

    我们知道要启用Kafka ACL属性authorizer class name kafka security auth SimpleAclAuthorizer要添加到server properties但是如果 Kafka 集群由 Strim
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这

随机推荐