我有一个 Kafka 2.1 消息代理,想要对 Spark 2.4 中的消息数据进行一些处理。我想使用 Zeppelin 0.8.1 笔记本进行快速原型设计。
我下载了结构化流所需的spark-streaming-kafka-0-10_2.11.jar(http://spark.apache.org/docs/latest/structed-streaming-kafka-integration.html http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)并将其作为“依赖项工件”添加到 Zeppelin 的“spark”解释器中(也处理 %pyspark 段落)。我重新启动了这个解释器(还有齐柏林飞艇)。
我还在笔记本的第一个段落中加载了 jar(我首先认为这应该是没有必要的......):
%dep z.load("/usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar")
res0: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2b65d5
所以,我没有收到错误,所以加载似乎有效。现在,我想做测试,kafka服务器使用这个端口在同一台机器上运行,并且还有一个主题“测试”:
%pyspark
# Subscribe to a topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()
但我得到了错误
无法执行第 6 行:.option("subscribe", "test") \ Traceback
(最近一次调用最后一次):文件
“/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,
63号线,装饰风格
返回 f(*a, **kw) 文件“/usr/local/analysis/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py”,
第 328 行,在 get_return_value 中
format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: 调用 o120.load 时发生错误。 :
org.apache.spark.sql.AnalysisException:找不到数据源:
卡夫卡。请按照部署部分部署应用程序
《结构化流+Kafka集成指南》。;在
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
在
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
在 sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) 处
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
在
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在 java.lang.reflect.Method.invoke(Method.java:498) 处
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在
py4j.Gateway.invoke(Gateway.java:282) 在
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
在 py4j.commands.CallCommand.execute(CallCommand.java:79) 处
py4j.GatewayConnection.run(GatewayConnection.java:238) 在
java.lang.Thread.run(Thread.java:748)
在处理上述异常的过程中,又出现了一个异常:
回溯(最近一次调用最后一次):文件
“/tmp/zeppelin_pyspark-312826888257172599.py”,第 380 行,在
exec(code, _zcUserQueryNameSpace) 文件“”,第 6 行,在文件中
“/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py”,
400号线,负载中
返回 self._df(self._jreader.load()) 文件“/usr/local/analysis/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py”,
第 1257 行,在call答案,self.gateway_client,self.target_id,self.name)文件“/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,
69号线,装饰风格
raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: '未能找到数据源:
卡夫卡。请按照部署部分部署应用程序
“结构化流+ Kafka 集成指南”。;'
我想知道至少其中一项调整(解释器配置或直接加载)应该有效。
我还在控制台上尝试了spark-submit --jar /usr/local/analysis/jar/spark-streaming-kafka-0-10_2.11.jar,但这似乎只有在我提交程序时才有效。
因此,我还将spark-streaming-kafka-0-10_2.11.jar复制到/usr/local/analysis/spark/jars/,其中所有其他spark jar都在其中。但在重新启动(火花和齐柏林飞艇)后,我总是遇到同样的错误。
与此同时,我发现我可以在网络浏览器中查看spark的环境变量,并且在“Classpath Entries”部分中找到spark-streaming-kafka-0-10_2.11.jar,源为“System Classpath”也作为“由用户添加”(似乎是 Zeppelin 解释器部分中的工件)。所以看来我的前两次尝试应该有效。