kafka消费者寻求不起作用:AssertionError:未分配的分区

2024-03-17

卡夫卡消费者con当我尝试从我的主题接收消息时,下面定义的效果非常好;但是,当我尝试使用更改偏移量时,这给我带来了麻烦seek方法或其任何变体。 IE。seek_to_beginning, seek_to_end

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(my_topic, bootstrap_servers = my_bootstrapservers, group_id = my_groupid)
p = con.partitions_for_topic(my_topic)
my_partition = p.pop()
tp = TopicPartition(topic = my_topic, partition = my_partition)
print ('*** tp: ', tp)
con.seek_to_beginning(tp)

它生成以下输出和以下错误:

*** tp:  TopicPartition(topic='mytopic', partition=0)
File "/myhome/anaconda3/lib/python3.6/site-packages/kafka/consumer/group.py", line 735, in seek_to_beginning
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition

“未分配的分区”错误对我来说看起来无效,因为我只是从消费者本身获取分区和 TopicPartition 并将其传递回搜索方法,因此它肯定已分配。 有任何想法吗?


我能够找到一种通过更改分区分配给消费者的方式来查找主题分区的方法。问题似乎是消费者无法使用默认订阅的主题分区,因此我必须更改构造函数以防止默认订阅主题,然后显式分配一个分区给我的消费者.

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka消费者寻求不起作用:AssertionError:未分配的分区 的相关文章

  • WooCommerce 在可变价格后显示变量描述

    我正在尝试将变量描述显示到 woocommerce 产品页面中 我安装了一个名为 woocommerce 单选按钮的插件 用于将我的可变产品和价格显示为单选按钮而不是选择 我正在这个插件中编辑variable php文件 然后完成后我会将其

随机推荐