我想从生产者向 Kafka 发送一条大消息,因此我更改了以下属性。
代理(服务器.属性)
replica.fetch.max.bytes=317344026
message.max.bytes=317344026
max.message.bytes=317344026
max.request.size=317344026
生产者(生产者.属性)
max.request.size=3173440261
消费者(消费者.属性)
max.partition.fetch.bytes=327344026
fetch.message.max.bytes=317344026
当我使用 python Popen 和 kafta 的 cli 命令运行生成器时,仍然出现一些错误,如下所示。
Code:
def producer(topic_name, content):
p = subprocess.Popen(['/opt/kafka/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh', '--broker-list', 'localhost:9092', '--topic', 'Hello-Kafka'], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
p.stdin.write(content)
out, err = p.communicate()
print out
Error:
ERROR Error when sending message to topic Hello-Kafka with key: null, value: 1677562 bytes with error: The message is 1677588 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
当我使用 kafka 的 python 模块时,出现以下错误(https://github.com/dpkp/kafka-python https://github.com/dpkp/kafka-python)
Code:
def producer(topic_name, content):
p = KafkaProducer(bootstrap_servers='localhost:9092')
a = p.send(topic_name, content).get()
print a
p.flush()
p.close()
Error:
kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError: The message is 217344026 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration
我成功尝试过的一件事是将内容分成块,但如果有人有任何解决方案可以在不分割内容的情况下做到这一点。