我目前正在 Dataproc 上运行 Spark 作业,在尝试重新加入组并从 kafka 主题读取数据时遇到错误。我做了一些挖掘,但不确定问题是什么。我有auto.offset.reset
set to earliest
所以它应该从最早可用的非提交偏移量中读取,最初我的火花日志如下所示:
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-7 to offset 5555542.```
但是接下来的下一行我尝试从服务器上不存在的偏移量读取时遇到错误(您可以看到分区的偏移量与上面列出的偏移量不同,所以我不知道为什么它会尝试读取表单该偏移量,这是下一行的错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
out of range with no configured reset policy for partitions:
{demo.topic-11=4544296}
有什么想法可以解释为什么我的 Spark 工作不断回到这个偏移量(4544296),而不是它最初输出的偏移量(5553330)?
这似乎是自相矛盾的 w a) 它所说的实际偏移量和它尝试读取的偏移量 b) 说没有配置重置策略
这个答案迟了一年,但希望能帮助其他面临类似问题的人。
通常,当消费者尝试读取 Kafka 主题中不再存在的偏移量时,就会出现此行为。偏移量不再存在,通常是因为它已被 Kafka Cleaner 删除(例如由于保留或压缩策略)。然而,消费者组仍然是 Kafka 已知的,并且 Kafka 保留了主题“demo.topic”及其所有分区的组“demo-group”的最新消费消息的信息。
因此,auto.offset.reset
配置不会有任何影响,因为不需要重置。相反,卡夫卡了解消费者组。
除此之外Fetcher
只告诉您主题的每个分区内最新的可用偏移量。确实如此not自动意味着它实际上轮询直到此偏移量的所有消息。 Spark 决定每个分区实际消耗和处理多少消息(基于例如配置maxRatePerPartition
).
要解决此问题,您可以更改消费者组(在这种特殊情况下这可能不是您想要的),或者通过使用手动重置消费者组“演示组”的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group demo-group --topic demo.topic --partition 11 --to-latest
根据您的要求,您可以使用该工具重置主题每个分区的偏移量。帮助功能或文档解释了所有可用选项。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)