如何移动到 Kafka 消费者中的特定偏移量而不遇到 ValueError?

2023-12-12

我正在使用 python 3.9.16 和 kafka-python 版本 2.0.2。我正在我的 Macbook Pro IOS 11.6.5 上运行。

Kafka 新手,现在只是在玩它。我不确定问题是什么,也不确定为什么我的解决方法有效。

我想做的是寻求该主题的特定偏移量,但我经常遇到 ValueError。

这是我的代码。

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

#import pdb
#pdb.set_trace()

myTP = TopicPartition('my-topic', 0)

consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 22) 
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))

for blah in consumer:
    print ("{}, {}".format(blah.offset, blah.value))

所以大多数时候当我运行它时,我会得到这个 ValueError。有时,如果没有我的解决方法,它会神秘地工作,但我不知道为什么。

this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
...
...
...
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes

我发现的解决方法是,如果我在搜索命令之前和之后打印位置,它似乎一直有效,但我不知道为什么。谁可以给我解释一下这个?我是否需要建立一些短暂的延迟才能完成这项工作?打印我在 Consumer 中的位置是否会重置 Consumer 中的某些内容以使其正常工作?

$ python tkCons.py 
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
not sure why this will work but printing position: 34 
not sure why this will work but printing position: 22 
22, b'{"number": 8}'
23, b'{"number": 9}'
24, b'{"number": 0}'
25, b'{"number": 1}'
26, b'{"number": 2}'
27, b'{"number": 3}'
28, b'{"number": 4}'
29, b'{"number": 5}'
30, b'{"number": 6}'
31, b'{"number": 7}'
32, b'{"number": 8}'
33, b'{"number": 9}'

编辑: 完整的回溯在这里:

$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/my_secret_username/kafka/tkCons.py", line 34, in <module>
    for blah in consumer:
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 702, in _poll_once
    self._client.poll(timeout_ms=timeout_ms)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
    self._poll(timeout / 1000)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 687, in _poll
    self._pending_completion.extend(conn.recv())
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1053, in recv
    responses = self._recv()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1127, in _recv
    return self._protocol.receive_bytes(recvd_data)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
    resp = self._process_response(self._rbuffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 138, in _process_response
    recv_correlation_id = Int32.decode(read_buffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 64, in decode
    return _unpack(cls._unpack, data.read(4))
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 23, in _unpack
    raise ValueError("Error encountered when attempting to convert value: "
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes

我建议升级到最新的 Python 版本,然后重试。

该错误来自内部字节解包函数。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何移动到 Kafka 消费者中的特定偏移量而不遇到 ValueError? 的相关文章

随机推荐

  • 在 JSF 中通过 ID 查找组件

    我想找一些UIComponent通过我提供的 id 从托管 bean 中获取 我编写了以下代码 private UIComponent getUIComponent String id return FacesContext getCurr
  • 模拟用户输入()

    我正在尝试模拟我要使用的 python 脚本的用户输入py test 这是一些基本代码 代表了我想要完成的任务 def ask while True age input Enter your age if int age lt 13 pri
  • IE9 不应用链接样式表

    我有一个奇怪的问题 我开发了一个在大多数浏览器中都能正常运行的网站 我刚刚升级到 IE9 我的任何样式都没有被应用 我正在本地电脑上查看此内容 因为该网站尚未上线 所有内联或页面级样式都会正确应用 但从 HEAD 部分中的外部 css 文件
  • 嵌套转发器中的按钮事件

    我正在使用嵌套转发器 子转发器中有一个用户控件
  • 在焦点上显示下拉选项[重复]

    这个问题在这里已经有答案了 我有一个简单的 html 下拉列表 如果可能的话 我想使用 jQuery 来显示所有选项 如果该控件具有焦点 就像用户单击下拉列表一样 我尝试过 jQuery trigger click 都有效
  • 在 SqlAlchemy SQLite 中按联合查询中的列排序

    正如中所解释的这个问题 您可以使用字符串文字来执行order by in unions 例如 这适用于 Oracle querypart1 select t1 c col1 label a order by t1 c col1 limit
  • Windows Phone 8.1 上的资源限定符

    Windows Phone 8 1 应用程序是否支持资源限定符 例如秤 这些限定符与 Windows 应用商店应用程序中的限定符有何不同 是 Windows Phone 8 1 RT 支持图像缩放 具体方法如下
  • Presto:前一个月的最后一天

    我有一个日期列表 我想返回前一个月的最后日期 如下例所示 date lastdayofmonthprior 2018 04 03 2018 03 31 我努力了date trunc month date 2018 04 03 1但是 我收到
  • 启用身份验证后无法连接到 MongoDB

    我正在运行适用于 Ubuntu 14 04 的最新 MongoDB 我创建了一个名为 admin 且具有 userAdminAnyDatabase 角色的用户 我可以在本地和外部访问数据库 我可以使用 SHA SCRAM 1 使用 admi
  • C:从文本文件读入结构数组

    我正在制作的程序应该从文本文件中读取数字并保存数字总数 即结构中数字的平均值 我有一个如下所示的结构 struct seriepost int totnr int outnr float average 函数 未完成 如下所示 int re
  • MapView注释不拖动

    我正在尝试在地图视图中实现可拖动的 图钉 实际上是自定义图标 这是我拥有的委托代码 MKAnnotationView mapView MKMapView mapView viewForAnnotation id
  • 如何修复代码警报“通用对象注入接收器”

    下面是我的代码 我认为没有任何问题 我怎样才能愚弄codacy 如果我不能使用obj key 那这东西到底是什么 我没有办法避免 handleClick e titleProps gt const index titleProps cons
  • 使用 Object.assign 避免 Typescript 构造函数中的冗余

    我有一个 Typescript 应用程序 其中有很多这样的代码 class Model prop1 string prop2 string prop3 string constructor input ModelInput this pro
  • EL上下文路径评估outputLink和graphicImage之间的差异

    我正在使用以下内容在我们的应用程序中获取帮助文档 我的问题是 虽然
  • 读取多模块项目中的属性文件

    您好 我有一个项目 它有两个具有以下结构的模块 project Module1 abc jsp Module2 src main java com xyz comp Action java resources
  • Kivy - 向选项卡按钮添加图标

    我在 Kivy 中使用 TabbedPanel 它工作得很好 但我想稍微自定义一下选项卡按钮并在文本旁边添加一个图标 Right now I have something like this And I would like somethi
  • 如何在窗外画画?

    看着一个窗户tooltips类提示窗口 我看到它绘制了阴影outside提示窗口的实际矩形 Using SpyXX 我可以获得工具提示的窗口矩形和类样式 Rectangle 440 229 544 249 104x20 Restored R
  • url 域的正则表达式

    我如何从这些网址进行匹配 http foo bar com http bar com foo bar从第一个链接和bar从第二个链接使用一个正则表达式 尝试这个 http com 以下是如何从 Python 使用它 import re fo
  • rspec 错误“report_activate_error”:找不到 RubyGem rspec-core (>=0) (Gem:LoadError)

    我有一个看似简单的错误 但我似乎不知道如何解决它 我尝试过卸载并重新安装 但这并没有帮助我 我是终端和 ruby ruby on Rails 的新手 所以请原谅我可能愚蠢的问题 有谁知道如何解决这个问题 这是我尝试运行 rspec 时收到的
  • 如何移动到 Kafka 消费者中的特定偏移量而不遇到 ValueError?

    我正在使用 python 3 9 16 和 kafka python 版本 2 0 2 我正在我的 Macbook Pro IOS 11 6 5 上运行 Kafka 新手 现在只是在玩它 我不确定问题是什么 也不确定为什么我的解决方法有效