Spark 流中找不到 KafkaUtils 类

2023-11-20

我刚刚开始使用 Spark Streaming,正在尝试构建一个示例应用程序来计算 Kafka 流中的单词数。虽然它编译的是sbt package,当我运行它时,我得到NoClassDefFoundError. This post似乎有同样的问题,但解决方案是针对 Maven 的,我无法用 sbt 重现它。

KafkaApp.scala:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
        "zookeeper.connect" -> "localhost:2181",
        "zookeeper.connection.timeout.ms" -> "10000",
        "group.id" -> "sparkGroup"
    )

    val topics = Map(
        "test" -> 1
    )

    // stream of (topic, ImpressionLog)
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)
    println(s"Number of words: %{messages.count()}")
  }
}

build.sbt:

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1",
    "org.apache.spark" %% "spark-streaming" % "1.1.1",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

我提交它:

bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.1.jar

Error:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
    at KafkaApp$.main(KafkaApp.scala:28)
    at KafkaApp.main(KafkaApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

Spark-submit不会自动放入包含KafkaUtils的包。您的项目中需要有 JAR。为此,您需要创建一个包含所有内容的 uber-jar,使用sbt组装。这是一个示例 build.sbt 。

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

显然您还需要将程序集插件添加到 SBT。

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 流中找不到 KafkaUtils 类 的相关文章

随机推荐

  • 如何在同一手机上打开 WhatsApp Business 应用程序中的 wa.me 链接而不是个人 WhatsApp?

    我们在 Angular 8 中有一个 Web 应用程序 我们有以下代码来显示一个按钮 用户可以单击该按钮通过 WhatsApp 向联系人发送预定义的消息 a target blank href https wa me internation
  • 在 CakePHP 3 中加载来自供应商的 javascript 文件

    我的问题是如何从 CakePHP 3 0 中的供应商文件夹加载 js 文件 我已经通过 Composer 包含了 twitter bootstrap js 文件位于 vendor twbs bootstrap sass assets jav
  • 无法写入二进制文件

    我有以下 C 代码 int arr 1 2 3 4 5 6 7 8 9 10 ofstream output Sample txt ios out ios binary for int i 0 i lt 10 i output lt
  • 找不到强大的模块 - Node.js

    我开始使用 node j 进行开发 我遇到了有关使用 formidable 模块的问题 我有这个错误 错误 找不到模块 强大 以下是使用 npm lsinstalled 安装的模块列表 email protected email prote
  • 从命令行进行 MySQL 备份和恢复

    我正在使用命令行来备份和恢复 MYSQL 数据库 让我们使用我有一个数据库 Data1 其中包含视图和过程 当在Cmd line I use mysqldump i e gt bin gt mysqldump u root proot Da
  • CSS“content”属性中的 HTML 特殊字符

    我正在尝试使用 CSS before content 字段在项目之前添加内容 我想插入一个复选标记 但如果我在内容选项中使用它 它会打印为文字 我如何告诉 CSS 使其成为复选标记 而不是文字字符串 尝试这个 target before c
  • 如果用户 = 系统,.NET 用户设置存储在哪里?

    我一直在使用更新来更新我的一个应用程序并使用Properties Settings Default Upgrade 并发现我的更新程序重新启动我的应用程序后 它在系统用户而不是默认 登录用户下运行 这让我想知道 SYSTEM 的 user
  • 如何在 Perl 中使用变量作为正则表达式修饰符?

    我正在编写一个抽象函数 它将询问用户一个给定的问题并根据给定的正则表达式验证答案 重复该问题 直到答案与验证正则表达式匹配 但是 我还希望客户端能够指定答案是否必须区分大小写 所以像这样 sub ask my prompt validati
  • 如何检查 CachedRowSet 中是否存在列名?

    我正在从可能发生变化的视图中查询数据 在执行操作之前我需要知道该列是否存在crs get 我发现我可以查询metadata像这样在我请求数据之前查看一列是否存在 ResultSetMetaData meta crs getMetaData
  • 使用预定义的自定义键顺序数组按键对数组进行排序

    a array 0 gt a 1 gt b 2 gt c 3 gt d 我想将顺序更改为3 2 0 1 a array 3 gt d 2 gt c 0 gt a 1 gt b 如果您想以编程方式更改顺序 请查看各种PHP 中的数组排序函数
  • C# XNA 鼠标位置

    我在 XNA 中的鼠标坐标遇到一些问题 0x0 任意靠近 但不在 屏幕的左上角 我现在正在窗口模式下运行游戏 但坐标是基于屏幕的 而不是游戏窗口的 尽管 XNA 文档告诉我应该是其他情况 提前致谢 这是代码 namespace TheGam
  • MATLAB 编辑器中可以进行多光标编辑吗?

    Is there a way to multiline edit code in matlab Instead of copy paste a single line repeatedly or copy paste from matlab
  • 用连字符分隔的大小写的名称是什么?

    这是帕斯卡命名法 SomeSymbol 这是驼峰式命名法 someSymbol 这是snake case some symbol 所以我的问题是是否有一个被广泛接受的名称 some symbol 它通常用于 url 中 这个案例约定并没有真
  • C++ 时间戳到人类可读的日期时间函数

    我有一个简单的函数 我需要从时间戳返回人类可读的日期时间 但不知何故 它以秒为单位返回相同的时间戳 输入1356953890 std string UT timeStampToHReadble long timestamp const ti
  • 如何验证sqlplus可以连接?

    我想知道是否有可能获得sqlplus以某种方式输出以发现我的数据库是否已启动 我想在数据库上运行脚本列表 但在执行此操作之前 我想知道数据库是否已启动并正在使用我的脚本运行 这是我尝试过的 sqlplus DB1 lt lt EOF gt
  • 如何仅当querytext的长度等于2时才开始过滤

    如何才能仅当查询文本的长度等于 2 时才开始过滤 我有这段代码 但我不知道如何仅在 querytext length gt 2 时开始过滤
  • 关于 C 系列语言中有符号整数的使用

    当在我自己的代码中使用整数值时 我总是尝试考虑符号性 问自己整数是否应该有符号或无符号 当我确定该值永远不需要为负数时 我会使用无符号整数 我不得不说这种情况大多数时候都会发生 在阅读其他人的代码时 我很少看到无符号整数 即使表示的值不能为
  • 事件源与原始源

    我正在阅读 C WPF 书 在路由事件章节中 事件有 2 个相同的属性Source and OriginalSource 我没有看到它们之间的区别 Xaml
  • JasperReports:如何在两列中显示数据

    这是我当前的 jrxml 文件
  • Spark 流中找不到 KafkaUtils 类

    我刚刚开始使用 Spark Streaming 正在尝试构建一个示例应用程序来计算 Kafka 流中的单词数 虽然它编译的是sbt package 当我运行它时 我得到NoClassDefFoundError This post似乎有同样的