AbstractMethodError 创建 Kafka 流

2023-12-21

我正在尝试使用以下命令打开 Kafka(尝试过版本 0.11.0.2 和 1.0.1)流createDirectStream方法并收到此 AbstractMethodError 错误:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)

我就是这样称呼它的:

val preferredHosts = LocationStrategies.PreferConsistent
    val kafkaParams = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[IntegerDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest"
    )

    val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

我在 9092 上运行 Kafka,我能够创建生产者和消费者并在它们之间传递消息,所以不确定为什么它不能在 Scala 代码中工作。任何想法表示赞赏。


结果我使用的是 Spark 2.3,而我应该使用 Spark 2.2。显然该方法在后来的版本中被抽象化,所以我收到了这个错误。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

AbstractMethodError 创建 Kafka 流 的相关文章

  • 带可变参数的 Spark UDF

    如文档中所示 列出最多 22 个参数是唯一的选择吗 https spark apache org docs 1 5 0 api scala index html org apache spark sql UDFRegistration ht
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • 使用 PySpark 从 azure blob 存储读取 csv 文件

    我正在尝试使用 Microsoft Azure 上的 PySpark HDInsight 集群来做一个机器学习项目 要在我的集群上进行操作 请使用 Jupyter 笔记本 另外 我的数据 一个 csv 文件 存储在 Azure Blob 存
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • akka-http:找不到参数解组的隐式值

    我的 Spray json 支持看起来像这样 object MarshallingSupport extends SprayJsonSupport implicit def json4sFormats Formats DefaultForm
  • Scala 中的行聚合

    我正在寻找一种方法在 Scala 的数据框中获取一个新列来计算min max中的值col1 col2 col10对于每一行 我知道我可以使用 UDF 来做到这一点 但也许有一种更简单的方法 Thanks Porting 这个Python答案
  • Scala 功能设计模式目录

    一周以来我一直在阅读 Scala 编程 作者一步一步地介绍了该语言的元素 但我仍然很困惑何时使用演员 闭包 柯里化等功能性的东西 我正在寻找功能结构的典型用例或最佳实践的目录 我并不是说在 Scala 中重新实现像 GoF 这样的众所周知的
  • 当我们在 Apache Spark 中使用时,无法找到 Set([TOPIC NAME,0])) 的领导者

    我们使用 Apache Spark 1 5 1 和 kafka 2 10 0 8 2 1 以及 Kafka DirectStream API 通过 Spark 从 Kafka 获取数据 我们使用以下设置在 Kafka 中创建了主题 复制因子
  • 创建自定义 scala 集合,其中映射默认返回自定义集合?

    特质TraversableLike A Repr 允许人们在其中进行收藏some函数将返回一个Repr 而其他人则继续返回类型参数That在功能上 有没有办法定义一个CustomCollection A 其中函数如map 其他的默认That
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • Apache Spark 和 scikit_learn 之间的 KMeans 结果不一致

    我正在使用 PySpark 对数据集执行聚类 为了找到簇的数量 我对一系列值 2 20 进行了聚类 并找到了wsse 簇内平方和 每个值的值k 在这里我发现了一些不寻常的东西 根据我的理解 当你增加集群数量时 wsse单调递减 但我得到的结
  • 如何使用 FS2 中的分类器函数对对象进行分组?

    我有一个无序的流measurements 我想将其分组为固定大小的批次 以便以后可以有效地保留它们 val measurements for id lt Seq foo bar baz value lt 1 to 5 yield id va
  • 如何防止 SQL Server 在导入数据时去除前导零

    A data file被导入到SQL Server桌子 数据文件中的一列是文本数据类型 该列中的值只能是整数 SQL Server 数据库中目标表中的相应列的类型为varchar 100 但在数据导入后 SQL Server 会存储以下值
  • 相当于 scala 中的 python repr()

    有没有相当于Python的东西reprscala 中的函数 即 您可以给任何 scala 对象提供一个函数 它将生成该对象的字符串表示形式 该对象是有效的 scala 代码 eg val l List Map 1 gt a print re
  • WSClient - 打开的文件太多

    我正在 CentOS 6 上使用 Play Framework 2 4 我的应用程序抛出此异常 java net SocketException Too many open files 我在 Stack Overflow 上搜索了很多主题并
  • Spark 数据帧分组、排序和选择一组列的顶部行

    我正在使用 Spark 1 5 0 我有一个包含以下列的 Spark 数据框 user id description fName weight 我想做的是为每个用户选择前 10 行和后 10 行 基于列权重的值 数据类型为 Double 如
  • Spark SQL/Hive 查询通过 Join 永远持续下去

    所以我正在做一些应该很简单的事情 但显然它不在 Spark SQL 中 如果我在 MySQL 中运行以下查询 查询将在不到一秒的时间内完成 SELECT ua address id FROM user u inner join user a
  • 为什么我在 Mac 上看到“java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int)accessibl

    我已经在工作中愉快地构建代码好几天了 但突然我的一个项目 不是全部 失败并出现此错误消息 看看下面的答案吧 我是如何修复它的 起初我用谷歌搜索 看到很多有这个问题的人正在使用 Java 16 但我认为 错误 我正在使用 Java 11 因为
  • 如何在scala中生成n-gram?

    我正在尝试在 scala 中编写基于 n gram 的分离新闻算法 如何为大文件生成 n gram 例如 对于包含 蜜蜂是蜜蜂中的蜜蜂 的文件 首先它必须选择一个随机的 n 元语法 例如 蜜蜂 然后它必须寻找以 n 1 个单词开头的 n 元

随机推荐

  • Android:对Fragment使用Activity的默认动画

    我需要在片段替换中使用活动默认动画 但是使用android R anim我找不到它 我怎样才能找到它的名字 是否可以默认使用它 或者我必须手动创建动画才能使用它 预先非常感谢 在搜索了这个确切的问题后 我在 Google 论坛上找到了这个资
  • Liferay 7 无法设置全局会话属性

    我正在尝试设置会话属性 HTTP 或 Portlet 会话 以便我可以全局访问它 通过门户 但是 在获取 Session 属性时 它返回的是 null 而不是实际值 设置会话属性 Component immediate true prope
  • 调用 PowerMockito.mockStatic 时出现 AbstractMethodError

    我正在尝试使用 PowerMockito 来模拟静态调用 然而 当我尝试这样做时 PowerMockito mockStatic调用抛出一个 AbstractMethodError java lang AbstractMethodError
  • Android WebView:检查网页是否消耗了触摸

    如果我触摸我的 WebView 有没有办法检查我的 WebView 内容 网页 是否实际使用 消耗了触摸 在这种情况下我想要做什么 如果用户触摸屏幕一侧的边距 如果下面没有按钮 href等 当网页内容没有使用 消耗触摸时 我自己使用它 在
  • 登录前添加监听

    我一直在使用 symfony 3 创建 Web 应用程序 我已使用 EWZRecaptchaBundle 将 Recaptcha 添加到我的登录表单中 如何在登录前添加侦听器以验证 Recaptcha 的验证
  • Raku:捕获标记的效果在“更高处”消失

    以下 Raku 脚本 usr bin env raku use v6 d grammar MyGrammar rule TOP
  • 使用 Python 的二维数组(图像)中的像素邻居

    我有一个像这样的 numpy 数组 x np array 1 2 3 4 5 6 7 8 9 我需要创建一个函数 我们将其称为 邻居 并带有以下输入参数 x 一个 numpy 二维数组 i j 二维数组中元素的索引 d 邻域半径 作为输出
  • 将 Pandas DataFrame 转换为对象列表

    我想将 Pandas DataFrame 转换为对象列表 这是我的课 class Reading def init self self HourOfDay int 0 self Percentage float 0 我读过 to dict
  • 启动多个线程,只等待一个线程完成即可获取结果

    假设我有这个功能double someRandomFunction int n 它接受一个整数并返回双精度值 但它是随机的 因为它会尝试随机的东西来得出解决方案 因此即使您使用相同的参数运行该函数 有时也可能需要 10 秒才能完成 其他则需
  • 当选项更改时,单选按钮“Checked=checked”不会更改

    我创建了一个基本的 2 单选按钮表单 如下面的示例所示 观察浏览器渲染 我们看到第 1 项被选中 我们检查第 1 项和第 2 项 当我单击第 2 项时 我希望删除第 1 项的 check checked 我希望第 2 项收到属性 selec
  • 我通过访问内存位置来更改 const 变量的值。为什么不起作用?

    我试图理解const在c 中 我写了以下代码片段 const int x 5 int ptr ptr int x cout lt lt address of x lt lt x lt
  • 如何使用jsoup限制下载大小?

    我正在尝试使用 JSoup 限制下载页面 链接的大小 如下所示 Scala 代码 val document Jsoup connect theURL get 我只想获取给定页面的前几 KB 并停止尝试下载超出该范围的内容 如果有一个非常大的
  • 只有一个元素的双边框

    我试图获得一个双边框 下划线 标题 第一个是全宽 第二个是文本宽度 边界应该重叠 有一个简单的解决方案 其中两个元素嵌套如下 h1 span Title span h1 and css h1 border bottom 1px solid
  • Django WSGI 应用程序 SegFault

    所以我通过 apache 2 22 在 mod wsgi 中的 django wsgi 应用程序遇到段错误 这是 wsgi 应用程序 import os import sys sys path append home com zpencer
  • Moose 与 Perl 类似 C++ 的 OOP 用法

    我一直在玩Moose 找到了感觉 我想要一个纯虚函数的例子 就像 C 中的那样 但用 Moose 的说法 特别是以 C 的方式 我知道即使 Moose 强加了比普通 Perl 更严格的模型 仍然有不止一种方法可以完成我所要求的任务 通过方法
  • 如何在 FastAPI 中执行 Post/Redirect/Get (PRG)?

    我正在尝试从 POST 重定向到 GET 如何在 FastAPI 中实现这一点 你尝试了什么 我按照建议尝试了以下 HTTP 302 FOUND HTTP 303 SEE OTHER问题 863 FastAPI https github c
  • C++初学者问题:解引用与乘法[重复]

    这个问题在这里已经有答案了 刚刚接触C 当我看到乘法符号时 我经常偏离轨道 用于表示变量的取消引用 例如 unsigned char pixels vidgrabber getPixels 这会让其他人感到厌烦吗 让我思考这个问题的秘诀是什
  • 将字符串数组传递给 webservice 方法

    我有一个使用这种方法的网络服务 WebMethod public int stringTest string tString int numberOfStrings tString Length int returnS new int nu
  • 如何使JavaFX应用程序始终位于其他应用程序之上?

    我制作了一个作为 Mac 应用程序启动的 JavaFX 应用程序 我希望启动它 以便窗口始终位于其他应用程序之上 我怎样才能做到这一点 从 Java 8u20 ea b15 和 Java 8u6 开始 您可以执行以下操作stage setA
  • AbstractMethodError 创建 Kafka 流

    我正在尝试使用以下命令打开 Kafka 尝试过版本 0 11 0 2 和 1 0 1 流createDirectStream方法并收到此 AbstractMethodError 错误 Exception in thread main jav