我正在使用 python 3.9.16 和 kafka-python 版本 2.0.2。我正在我的 Macbook Pro IOS 11.6.5 上运行。
Kafka 新手,现在只是在玩它。我不确定问题是什么,也不确定为什么我的解决方法有效。
我想做的是寻求该主题的特定偏移量,但我经常遇到 ValueError。
这是我的代码。
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
#import pdb
#pdb.set_trace()
myTP = TopicPartition('my-topic', 0)
consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 22)
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
for blah in consumer:
print ("{}, {}".format(blah.offset, blah.value))
所以大多数时候当我运行它时,我会得到这个 ValueError。有时,如果没有我的解决方法,它会神秘地工作,但我不知道为什么。
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
(value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
...
...
...
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
我发现的解决方法是,如果我在搜索命令之前和之后打印位置,它似乎一直有效,但我不知道为什么。谁可以给我解释一下这个?我是否需要建立一些短暂的延迟才能完成这项工作?打印我在 Consumer 中的位置是否会重置 Consumer 中的某些内容以使其正常工作?
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
not sure why this will work but printing position: 34
not sure why this will work but printing position: 22
22, b'{"number": 8}'
23, b'{"number": 9}'
24, b'{"number": 0}'
25, b'{"number": 1}'
26, b'{"number": 2}'
27, b'{"number": 3}'
28, b'{"number": 4}'
29, b'{"number": 5}'
30, b'{"number": 6}'
31, b'{"number": 7}'
32, b'{"number": 8}'
33, b'{"number": 9}'
编辑:
完整的回溯在这里:
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
(value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/my_secret_username/kafka/tkCons.py", line 34, in <module>
for blah in consumer:
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 702, in _poll_once
self._client.poll(timeout_ms=timeout_ms)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
self._poll(timeout / 1000)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 687, in _poll
self._pending_completion.extend(conn.recv())
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1053, in recv
responses = self._recv()
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1127, in _recv
return self._protocol.receive_bytes(recvd_data)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
resp = self._process_response(self._rbuffer)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 138, in _process_response
recv_correlation_id = Int32.decode(read_buffer)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 64, in decode
return _unpack(cls._unpack, data.read(4))
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 23, in _unpack
raise ValueError("Error encountered when attempting to convert value: "
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes