我正在尝试设置 Spark Streaming 以从 Kafka 队列获取消息。我收到以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
这是我正在执行的代码(pyspark):
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})
ssc.start()
ssc.awaitTermination()
有几个类似的帖子也有同样的错误。在所有情况下,原因都是空的卡夫卡主题。我的“测试主题”中有消息。我可以把他们拿出来
kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100
有谁知道可能是什么问题?
我在用着:
- Spark 1.5.2(阿帕奇)
- 卡夫卡0.8.2.0+kafka1.3.0(CDH 5.4.7)
您需要检查两件事:
检查该主题和分区是否存在,在您的情况下主题是test-topic
并且分区为0。
根据您的代码,您正在尝试从偏移量 0 消费消息,并且可能无法从偏移量 0 获取消息,请检查您最早的偏移量并尝试从那里消费。
以下是检查最早偏移量的命令:
sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "your broker list" --topic "topic name" --time -1
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)