结构化流Kafka 2.1->Zeppelin 0.8->Spark 2.4:spark不使用jar

2024-01-10

我有一个 Kafka 2.1 消息代理,想要对 Spark 2.4 中的消息数据进行一些处理。我想使用 Zeppelin 0.8.1 笔记本进行快速原型设计。

我下载了结构化流所需的spark-streaming-kafka-0-10_2.11.jar(http://spark.apache.org/docs/latest/structed-streaming-kafka-integration.html http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)并将其作为“依赖项工件”添加到 Zeppelin 的“spark”解释器中(也处理 %pyspark 段落)。我重新启动了这个解释器(还有齐柏林飞艇)。

我还在笔记本的第一个段落中加载了 jar(我首先认为这应该是没有必要的......):

%dep z.load("/usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar")
res0: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2b65d5

所以,我没有收到错误,所以加载似乎有效。现在,我想做测试,kafka服务器使用这个端口在同一台机器上运行,并且还有一个主题“测试”:

%pyspark
# Subscribe to a topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

但我得到了错误

无法执行第 6 行:.option("subscribe", "test") \ Traceback (最近一次调用最后一次):文件 “/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”, 63号线,装饰风格 返回 f(*a, **kw) 文件“/usr/local/analysis/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py”, 第 328 行,在 get_return_value 中 format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: 调用 o120.load 时发生错误。 : org.apache.spark.sql.AnalysisException:找不到数据源: 卡夫卡。请按照部署部分部署应用程序 《结构化流+Kafka集成指南》。;在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652) 在 org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) 处 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 处 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 处 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748)

在处理上述异常的过程中,又出现了一个异常:

回溯(最近一次调用最后一次):文件 “/tmp/zeppelin_pyspark-312826888257172599.py”,第 380 行,在 exec(code, _zcUserQueryNameSpace) 文件“”,第 6 行,在文件中 “/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py”, 400号线,负载中 返回 self._df(self._jreader.load()) 文件“/usr/local/analysis/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py”, 第 1257 行,在call答案,self.gateway_client,self.target_id,self.name)文件“/usr/local/analysis/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”, 69号线,装饰风格 raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: '未能找到数据源: 卡夫卡。请按照部署部分部署应用程序 “结构化流+ Kafka 集成指南”。;'

我想知道至少其中一项调整(解释器配置或直接加载)应该有效。

我还在控制台上尝试了spark-submit --jar /usr/local/analysis/jar/spark-streaming-kafka-0-10_2.11.jar,但这似乎只有在我提交程序时才有效。

因此,我还将spark-streaming-kafka-0-10_2.11.jar复制到/usr/local/analysis/spark/jars/,其中所有其他spark jar都在其中。但在重新启动(火花和齐柏林飞艇)后,我总是遇到同样的错误。

与此同时,我发现我可以在网络浏览器中查看spark的环境变量,并且在“Classpath Entries”部分中找到spark-streaming-kafka-0-10_2.11.jar,源为“System Classpath”也作为“由用户添加”(似乎是 Zeppelin 解释器部分中的工件)。所以看来我的前两次尝试应该有效。


第一个问题是您已经下载了 Spark Streaming 包,但尝试创建一个结构化流对象(使用readstream())。请记住,Spark 流和 Spark 结构化流是两个不同的东西,需要区别对待。

对于结构化流媒体,您需要下载软件包Spark-SQL-Kafka-0-10_2.11 https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.4.0及其依赖项卡夫卡客户端 https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.0.0, slf4j-api https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.16, snappy-java https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java/1.1.7.1, lz4-java https://mvnrepository.com/artifact/org.lz4/lz4-java/1.4.0 and unused https://mvnrepository.com/artifact/org.spark-project.spark/unused/1.0.0。您的依赖项部分应如下所示以加载所有必需的包:

z.load("/tmp/spark-sql-kafka-0-10_2.11-2.4.0.jar")
z.load("/tmp/kafka-clients-2.0.0.jar")
z.load("/tmp/lz4-java-1.4.0.jar")
z.load("/tmp/snappy-java-1.1.7.1.jar")
z.load("/tmp/unused-1.0.0.jar")
z.load("/tmp/slf4j-api-1.7.16.jar")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

结构化流Kafka 2.1->Zeppelin 0.8->Spark 2.4:spark不使用jar 的相关文章

随机推荐

  • Android fastboot 等待设备

    我正在尝试在我的 NVIDIA 测试 git 上加载自定义内核 我输入了fastboot boot myImage之后我得到
  • 为什么我不能从 ruby​​ 中选定的键创建新的哈希值?

    这已经困扰我一段时间了 这不是一件困难的事情 但我不知道为什么已经没有简单的方法可以做到这一点 我敢打赌有 但我没有看到它 我只想进行哈希处理 如下所示 cars bob gt Pontiac fred gt Chrysler lisa g
  • 从文本文件读取数据到 numpy 数组

    我有一个包含一些元数据的文件 然后是一些由两列和标题组成的实际数据 在 numpy 中使用 genfromtxt 之前是否需要分离两种类型的数据 或者我可以以某种方式分割数据吗 将文件指针放在标题上方的行尾 然后从那里尝试 genfromt
  • Android - 双击WebView检测

    我正在创建一个带有 WebView 和页面顶部菜单的应用程序 我使该菜单不可见 并且仅在单击按钮时才会出现 这工作正常 但该按钮无法很好地适应任何地方 因此我没有单击按钮 而是在双击屏幕时出现菜单 所以问题就在这里 双击仅适用于空白区域 但
  • 除了 Haxe 之外,还有其他针对 PHP 的编程语言吗?

    PHP 并没有得到太多的喜爱 但在易于部署 对于廉价托管 方面仍然是赢家 有没有任何编程语言 除了Haxe http haxe org 那个目标 PHP 用这种语言编写应用程序 然后将其翻译成 PHP 就像某些语言以 C 作为中间语言一样
  • 如何使用 NHibernate 插入或更新(或覆盖)记录?

    我需要将一行写入数据库 无论它是否已经存在 在使用 NHibernate 之前 这是通过存储过程完成的 该过程将尝试更新 如果没有行被修改 它将回退到插入 这很有效 因为应用程序并不关心记录是否存在 对于 NHibernate 我发现的解决
  • 如何访问 Galaxy S5 心率传感器?

    In the 三星的编程指南 http img developer samsung com contents cmm SHealth SDK Package ProgrammingGuide 2 Bio Physiology eng v1
  • 如何使用数组包含元素创建 NSPredicate

    我只是有Address有财产的实体favourites类型的 String Address objc Address class Address NSManagedObject Observer NSManaged var favourit
  • 在反应中一次导入多个图像文件的任何简单方法

    我输入以下内容来导入图像 需要一个简短的方法来做到这一点 import banner1 from imgs Banners banner1 jpg import banner2 from imgs Banners banner2 jpg i
  • 通过提升业力重用已解析的变量

    我有一个与下面的代码相当的代码库 我尝试生成一个文本文件 其内容是变量的两倍 我觉得答案在于语义动作以及 a 和 val 但即使有文档也无法完成 您将如何做才能拥有 str 中的 toto 和输出 托托一些东西托托 即如何在业力中重用已解析
  • 检测设备是否正在充电

    我找不到任何明确的用途我最喜欢的工具 http www google com 不过我想我会把它放在这里 有没有办法使用 iPhone SDK 让应用程序检测设备是否处于通电状态 充电 底座等 如果设备正在通电 否则它是用户指定的设置 我希望
  • Rails 迁移变更列

    We have script generate migration add fieldname to tablename fieldname datatype用于向模型添加新列的语法 在同一行上 我们是否有一个脚本 生成来更改列的数据类型
  • timessince 过滤器的格式

    有没有办法使用 date timesince 过滤器 但不是有两个相邻的单元 而是只显示一个 例如 我的模板当前显示 18 小时 16 分钟 我怎样才能让它显示 18小时 这里不考虑四舍五入 谢谢 我想不出一个简单的内置方法来做到这一点 这
  • JavaScript 如何创建稀疏数组?

    虽然我不知道这个功能的名称 a a 57 test console log a length console log a 57 console log a 我很敬畏这是可能的 由于具有 C 背景 这种行为与分配内存之类的行为非常不同 那么几
  • Elasticsearch-Kibana docker-compose - 禁止使用“elastic”值

    我想使用 docker compose 运行 elasticsearch 和 kibana 这是我的 docker compose yml 我使用 docker compose env file dev env up 运行 Docker 组
  • 如何运行 NDK 示例?

    很抱歉问这样一个菜鸟问题 但是 NDK 文档是错误的 r7b 如果您使用 ADT 在 Eclipse 中进行开发 请使用新建项目向导 使用 导入 为每个示例创建一个新的 Android 项目 从现有源 选项并从以下位置导入源
  • 如何在 CMake 中检查列表是否包含特定条目?

    我想检查列表是否包含特定条目 如以下代码片段所示 macro foo if ARGN contains bar endif endmacro CMake 不提供contains 获得所需结果的最佳 最简单方法是什么 在CMake的wiki
  • 如何在 UWP 应用中隐藏/折叠标题栏?

    有没有办法以某种方式隐藏 折叠 使 UWP 应用程序中的标题栏暂时不可见 但不能完全禁用 我知道可以使应用程序全屏显示 然后标题栏自动折叠 但我需要在可调整大小的桌面窗口中实现它 我还知道您可以自定义标题栏的外观 例如颜色等 原因 我的应用
  • CMSIS & STM32,如何开始? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想在 STM32 上使用 CMSIS 启动项目 网上一搜 没找到具体的教程 有些使用 SPL 开始项
  • 结构化流Kafka 2.1->Zeppelin 0.8->Spark 2.4:spark不使用jar

    我有一个 Kafka 2 1 消息代理 想要对 Spark 2 4 中的消息数据进行一些处理 我想使用 Zeppelin 0 8 1 笔记本进行快速原型设计 我下载了结构化流所需的spark streaming kafka 0 10 2 1