具有流源的查询必须使用 writeStream.start();; 执行

2023-12-21

我正在尝试使用 Spark 结构化流从 Kafka 读取数据并预测传入数据。我正在使用使用 Spark ML 训练过的模型。

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .master("local")
  .getOrCreate()
import spark.implicits._

val toString = udf((payload: Array[Byte]) => new String(payload))
val sentenceDataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe", "topicname1")
  .load().selectExpr("CAST(value AS STRING)").as[(String)]
sentenceDataFrame.printSchema()
val regexTokenizer = new RegexTokenizer()
  .setInputCol("value")
  .setOutputCol("words")
  .setPattern("\\W")
val tokencsv = regexTokenizer.transform(sentenceDataFrame)
val remover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered")

val removestopdf = remover.transform(tokencsv)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("filtered")
  .setOutputCol("result")
  .setVectorSize(300)
  .setMinCount(0)

val model = word2Vec.fit(removestopdf)

val result = model.transform(removestopdf)


val featureIndexer = new VectorIndexer()
  .setInputCol("result")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(2)
  .fit(result)

val some = featureIndexer.transform(result)

val model1 = RandomForestClassificationModel.load("/home/akhil/Documents/traindata/stages/2_rfc_80e12c5d1259")

  val predict = model1.transform(result)

val query = predict.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()

当我对流数据进行预测时,它给出以下错误:

 Exception in thread "main" org.apache.spark.sql.AnalysisException: 
 Queries with streaming sources must be executed with 
writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:175)
at predict1model$.main(predict1model.scala:53)
at predict1model.main(predict1model.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

错误指的是 word2vec.fit(removestopdf) 行。任何帮助将非常感激 。


一般来说,结构化流不能(从 Spark 2.2 开始)用于训练 Spark ML 模型。 结构化流不支持某些操作。其中之一是转变Dataset to its rdd表示。 特别是以下情况word2Vec, 它需要去rdd实施水平fit https://github.com/apache/spark/blob/fa225da7463e384529da14706e44f4a09772e5c1/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala#L176.

尽管如此,可以在静态数据集上训练模型并将预测应用于流数据。这transform操作可用于流式传输Dataset,如上所示:val result = model.transform(removestopdf)

简而言之,我们需要fit the model在静态数据集上。所结果的transformer can be applied到流媒体Dataset.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有流源的查询必须使用 writeStream.start();; 执行 的相关文章

  • 带预览和进度栏的 Twitter Bootstrap 图像上传

    我如何使用 Twitter Bootstrap 上传带有预览和进度条的单个图像 目前 在保存图像之前 我看不到上传图像的任何预览或进度条 Jasny 的 Bootstrap 分支让您能够接近这一点 看文档 http jasny github
  • Javascript:在函数内调用函数时 window.location.href 不会重定向

    单击按钮时 window location href 会将浏览器重定向到 stackoverflow com 但在输入文本字段中按 Enter 键时不会将浏览器重定向到 stackoverflow com 尽管两个事件侦听器使用相同的函数
  • 使用空的weak_ptr作为参数调用map::count安全吗?

    打电话安全吗map count http www cplusplus com reference map map count on an 未初始化因此为空weak ptr http en cppreference com w cpp mem
  • 如何在 Scala Play 框架中进行 Twitter 反向身份验证?

    我正在编写一个 play 应用程序 在 scala 中 并且正在尝试执行 twitter 此处概述的反向身份验证步骤 https dev twitter com docs ios using reverse auth https dev t
  • 使用 DataMapper 而不是 ActiveRecord [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 将同步 zip 操作转换为异步

    我们有一个现有的库 其中一些方法需要转换为异步方法 但是我不确定如何使用以下方法执行此操作 错误处理已被删除 该方法的目的是压缩文件并将其保存到磁盘 请注意 zip 类不公开任何异步方法 public static bool ZipAndS
  • Adobe Illustrator 中的折线简化如何工作?

    我正在开发一个记录笔划的应用程序 您可以使用定点设备来绘制笔划 在上图中 我绘制了一个笔划 其中包含 453 个数据点 我的目标是大幅减少数据点的数量 同时仍然保持原始笔画的形状 对于那些感兴趣的人 上图笔画的坐标可以作为GitHub 上的
  • Turbolinks 访问的页面中缺少 hubspot 聊天界面,但可用于全页面刷新

    我想将 hubspot 聊天界面集成到我的 Rails 4 Turbolinks 应用程序中 我已将 Google 跟踪代码管理器配置为在每个页面加载事件中显示支持聊天界面 该界面工作正常 GTM 标签 自定义 html PROBLEM 当
  • 具有重复值的 Sqlite 列

    就说专栏吧aSQLite 数据库的非常重复 始终有相同的 4 个值 其他值可能稍后出现 但不同值的数量将少于 1000 个 VALUES hello world it s a shame to store this str many tim
  • Haskell:如何创建将函数应用于元组项的最通用函数

    这是一个个人练习 旨在更好地理解 Haskell 类型系统的局限性 我想创建最通用的函数 将某些函数应用于 2 条目元组中的每个条目 例如 applyToTuple fn a b fn a fn b 我试图让这个函数在以下每种情况下都起作用
  • Centos/Linux 将 logrotate 设置为所有日志的最大文件大小

    我们使用 logrotate 并且它每天运行 现在我们遇到了一些情况 日志显着增长 阅读 gigabaytes 并杀死我们的服务器 所以现在我们想为日志设置最大文件大小 我可以将其添加到 logrotate conf 中吗 size 50M
  • NHibernate:无状态会话错误消息无法获取代理

    我正在使用 nHibernate 无状态会话来获取对象 更新一个属性并将对象保存回数据库 我不断收到错误消息 无状态会话无法获取代理 我在其他地方有类似的代码 所以我不明白为什么这不起作用 有谁知道问题可能是什么 我正在尝试更新Screen
  • 如何从我的 appDelegate 访问我的 viewController? iOS系统

    我有一个在 xCode 中创建为 基于视图的应用程序 的 iOS 应用程序 我只有一个 viewController 但它会自动显示 而且我没有看到任何将它与我的 appDelegate 关联的代码 我需要将数据从 appDelegate
  • .gitignore:如何忽略嵌套目录?

    我有以下目录结构 test a test b c test a b Ouput test c d e Output test f Output 我想忽略 test 下的所有 Output 目录 我试过test Output 但没有成功 我究
  • Pandas 2 个字段中唯一值的数量

    我正在尝试查找覆盖 2 个字段的唯一值的数量 例如 一个典型的例子是姓氏和名字 我有一个数据框 当我执行以下操作时 我只获取每列的唯一字段数 在本例中为 最后一个 和 第一个 不是复合体 df Last Name First Name nu
  • 如何从Python枚举类中获取所有值?

    我正在使用 Enum4 库创建一个枚举类 如下所示 class Color Enum RED 1 BLUE 2 我要打印 1 2 作为某处的列表 我怎样才能实现这个目标 您可以执行以下操作 e value for e in Color
  • 在着色器中旋转法线

    我有一个场景 其中有多个具有各自位置和旋转的模型 给定法线 着色器对每个像素应用简单的双向照明 那是我的顶点着色器 version 150 in vec3 position in vec3 normal in vec2 texcoord o
  • 在reactjs中停止超时?

    有没有办法可以杀死 摆脱 reactjs 中的超时 setTimeout function do something bind this 3000 通过某种点击或操作 我希望能够完全停止并结束超时 有没有办法做到这一点 谢谢 假设这种情况发
  • 如何使用javascript取消设置变量? [复制]

    这个问题在这里已经有答案了 这是我到目前为止所尝试的 var nxt I am next window onscroll function var scr this pageYOffset if scr gt 400 console log
  • 应用服务器如何注入私有字段?

    我看到这个问题 注入私有 包或公共字段或提供 setter https stackoverflow com questions 2021716 inject into private package or public field or p

随机推荐