流式源的查询必须使用 writeStream.start() 执行;

2024-01-26

我正在尝试在 Spark 中读取来自 kafka(版本 10)的消息并尝试打印它。

     import spark.implicits._

         val spark = SparkSession
              .builder
              .appName("StructuredNetworkWordCount")
              .config("spark.master", "local")
              .getOrCreate()  

            val ds1 = spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")  
              .option("subscribe", "topicA")
              .load()

           ds1.collect.foreach(println)
           ds1.writeStream
           .format("console")
           .start()

           ds1.printSchema()

线程“main”中出现错误异常

org.apache.spark.sql.AnalysisException:使用流源的查询 必须使用 writeStream.start();; 执行


您正在对查询计划进行分支:从您尝试执行的同一个 ds1 开始:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

但你只是打电话.start()在第二个分支上,让另一个分支悬空而没有终止,这反过来会引发您返回的异常。

解决方案是启动两个分支并等待终止。

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  
  .option("subscribe", "topicA")  
  .load()
val query1 = ds1.collect.foreach(println)
  .writeStream
  .format("console")
  .start()
val query2 = ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

流式源的查询必须使用 writeStream.start() 执行; 的相关文章

随机推荐

  • 如何在非 GUI 应用程序中使用 QWebPage

    我想用QWebPage在非 GUI Qt 应用程序中 我的意思是我根本不想与窗口服务器通信 不过 使用 QtGui 不是问题 QWebPage内部创建一些QWidget实例 因此 使用QCoreApplication不可能 当创建一个QAp
  • 包含多个 Chart.js 图表的 pdf 页面

    我使用 Chart js 生成一个包含多个图表的报告页面 我需要将此报告导出为 PDF 通过搜索可以找到许多解决方案 但我找不到具有多个画布元素的解决方案 唯一可用的解决方案似乎是循环遍历所有图像 并使用图像重新创建报告 然后将其下载为 p
  • onKey onKeyDown 不起作用

    我正在尝试在我的 Android 应用程序中注册方向键 电视遥控器方向键点击 我目前正在使用 Android 模拟器进行测试 并尝试在额外设置菜单下使用方向键输入进行单击 但我不确定为什么这不起作用 任何帮助将不胜感激 public cla
  • 如何使用 Java 复制文件并将其粘贴到剪贴板?

    如何使用 Java 复制文件并将其粘贴到剪贴板 我的程序可以复制但不能粘贴 它给 线程 main 中的异常 java lang ClassCastException java util Arrays ArrayList 无法转换为 java
  • Plotly.js - gd.data 必须是一个数组

    我正在使用 Plotly js 库来绘制 3D 图形 我的计划是将 4 条迹线绘制到一张 3D 图中 但是当我尝试这样做时 我的网站遇到了一些奇怪的行为 有时 当我加载网站时 我没有收到任何错误 并且所有 4 条轨迹都完美加载到我的 3D
  • 将 GWT 应用程序部署为单个 JavaScript 文件

    GWT 应用程序的已编译 JavaScript 输出分为不同的文件 例如 缓存 html gwt rpc 托管 html nocache js 我知道这样做的目的是最小化必须由用户下载的 JavaScript 的大小 例如 Firefox
  • 更新表插入 VARBINARY 数据

    当我运行 sql 查询时 我得到如下信息 不允许从数据类型 varchar 到数据类型的隐式转换 varbinary 使用 CONVERT 函数运行此查询 严重程度 16 我想要插入的数据看起来像 000012000000000000100
  • 如何使用 Windows 任务计划程序自动执行 PowerShell 脚本?

    我有一个发送电子邮件的 PowerShell 脚本 我想每 1 分钟自动执行一次该脚本 我该如何使用任务计划程序来做到这一点 目前我已经创建了一个任务并提供了脚本的路径 但是该调度程序打开我的脚本 而不是执行 我使用的是 Windows 7
  • JDK8 是 JBoss 6 AS 支持的平台吗

    我们正在将应用程序 java 平台升级到最新的稳定平台 并且我们正在使用 Jboss 6 AS Is 甲骨文JDK8JBoss 6 AS 支持的平台 不 它不会起作用 JBoss AS 6和 7 不兼容Oracle JDK 1 8 您需要下
  • 阻止 GSON 序列化 JSON 字符串

    我是 gson 的新手 并且有一个尚未找到答案的新手问题 所以请耐心等待 StackOverflow 和 google 不是我的朋友 我有一个 java 类 User 其属性之一 externalProfile 是一个包含已序列化 JSON
  • 为什么 MFunctor 的“hoist”没有“Monad n”约束?

    我有一个协程变压器 data Step y m a Done a Yield y CoT y m a data CoT y m a CoT m Step y m a with Monad实例 unCoT CoT y m a gt m Ste
  • Laravel - 针对不同用户使用唯一参数重复输入

    使用 Laravel 5 2 我正在 Laravel 中开发一个电话簿项目 您将联系信息存储在名为的表中Contacts 要在此表中创建新联系人 您必须注册 并且您的信息将记录在users table 我创建了一个视图来显示Contacts
  • Firemonkey T编辑高度

    我正在使用 Delphi Seattle 我的应用程序适用于 Windows 桌面 我正在尝试更改 TEdit 的字体大小 因此高度也被修改 在设计时一切正常 但当我运行应用程序时 TEdit 会忽略高度修改并剪切文本 我试图找到Fixed
  • NEWSEQUENTIALID 的可预测性如何?

    根据微软的文档NEWSEQUENTIALID http msdn microsoft com en us library ms189786 aspx NEWSEQUENTIALID 的输出是可预测的 但可预测性如何呢 假设我有一个 GUID
  • sed 中的反向引用返回错误值

    我正在尝试使用 sed 替换表达式 正则表达式在 vim 中有效 但在 sed 中无效 我用斜杠替换数字前的最后一个破折号 所以 www file name 1 应该返回 www file name 1 我正在使用以下命令 但它一直输出 w
  • Apache Camel HTTP 显示请求和响应

    我正在使用 Apache Camel 将数据从 CSV 文件加载到 Web 服务 无论如何我可以显示请求和响应 下面是路由配置 我从数组中拆分并聚合 100 个项目 以作为 POST 正文发送 from fileLocation unmar
  • SqlAlchemy TIMESTAMP“更新时”额外

    我在 python3 4 3 上使用 SqlAlchemy 来管理 MySQL 数据库 我正在创建一个表 from datetime import datetime from sqlalchemy import Column text cr
  • 今日小部件扩展高度 - iOS10

    今日小部件视图模式的高度无法设置为紧凑模式 无论我设置什么值 它将小部件的高度设置为默认值 扩展模式工作完美 并且值已正确设置并反映在小部件中 我已经在 viewDidLoad 方法中添加了这一行 self extensionContext
  • 生产中使用 Flask-oauthlib 或 authlib?

    我刚刚在我正在构建的网站上使用了flask oauthlib 但是 我注意到该项目的首页有一个警告 要求改为使用 authlib 如果您在生产中使用flask oauthlib 您是否计划迁移到authlib 有谁知道一个完成此迁移的简单项
  • 流式源的查询必须使用 writeStream.start() 执行;

    我正在尝试在 Spark 中读取来自 kafka 版本 10 的消息并尝试打印它 import spark implicits val spark SparkSession builder appName StructuredNetwork