如何一起使用SparkSession和StreamingContext?

2024-03-06

我正在尝试从本地计算机 (OSX) 上的文件夹流式传输 CSV 文件。我将 SparkSession 和 StreamingContext 一起使用,如下所示:

val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))

val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")

如果我跑ssc.start()之后,我收到此错误:

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

相反,如果我尝试启动SparkSession像这样:

inputDF.writeStream.format("console").start()

I get:

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

显然我不明白如何SparkSession and StreamingContext应该一起工作。如果我摆脱SparkSession, StreamingContext只有textFileStream我需要在其上强加 CSV 模式。如果您能就如何实现此功能提供任何说明,我们将不胜感激。


Spark 会话和 Spark 上下文不能同时存在。随着 Spark 2.0.0 的发布,开发人员可以使用一个新的抽象——Spark Session——它可以像以前可用的 Spark Context 一样被实例化和调用。

您仍然可以从 Spark 会话生成器访问 Spark 上下文:

 val sparkSess = SparkSession.builder().appName("My App").getOrCreate()
 val sc = sparkSess.sparkContext
 val ssc = new StreamingContext(sc, Seconds(time))

导致您的工作失败的另一件事是您正在执行转换但没有调用任何操作。最后应该调用一些操作,例如 inputDF.show()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何一起使用SparkSession和StreamingContext? 的相关文章

  • Scala SBT 版本依赖性二进制兼容性错误 scala-xml

    我有一个在 GitHub 上托管的项目 我使用 scala steward 来保持我的插件和依赖项最新 这在一段时间内有效 但现在使用此类自动更新却变成了一场噩梦 事情是这样的 在我的plugins sbt中 我依赖于scoverage 它
  • Spark 中的 RDD 和 Dataframe 有什么区别? [复制]

    这个问题在这里已经有答案了 嗨 我对 apache Spark 比较陌生 我想了解 RDD 数据帧和数据集之间的区别 例如 我正在从 s3 存储桶中提取数据 df spark read parquet s3 output unattribu
  • 对列表中的相邻元素进行分组

    假设我想编写一个函数来执行此操作 输入 1 1 3 3 4 2 2 5 6 6 输出 1 1 3 3 4 2 2 5 6 6 它将相同的相邻元素分组 这个方法的名称应该是什么 此操作有标准名称吗 In 1 1 3 3 4 2 2 5 6 6
  • Spark Workers 上缺少 SLF4J 记录器

    我正在尝试通过以下方式运行工作spark submit 此作业导致的错误是 Exception in thread main java lang NoClassDefFoundError org slf4j Logger at java l
  • 为什么我不能将 Scala 的 Function1 隐式转换为 java.util.function.Function?

    我正在尝试创建 Scala Function1 到 java util function Function 的隐式转换 这是我的代码 object Java8ToScala extends App implicit def javaFunc
  • 将额外的参数传递给多态函数?

    我有一个多态函数 可以将列表转换为集合 import shapeless PolyDefns gt import shapeless val lists List 1 2 List A B List 1 1 2 2 HNil object
  • Scala 'null' 是否算作另一种类型的实例?

    我有这个代码 class MyLinkedList T h T tail MyLinkedList T def prepend v T MyLinkedList T new MyLinkedList v this 我想知道我如何可以将第二个
  • 如何列出所有 sbt 依赖项?

    我需要列出所有 sbt 依赖项 以便检查是否已存在 debian 软件包 我还注意到有一个 DEB 包 http www scala sbt org 0 13 tutorial Installing sbt on Linux html但似乎
  • 用 HashMap[Int, Vector[Int]] (Scala) 表示图(邻接列表)?

    我想知道如何 如果可能的话 我可以通过以下方式制作 可变 图的邻接列表表示HashMap Int Vector Int HashMap当然是可变的 目前我将其设置为HashMap Int ArrayBuffer Int 但我可以更改 Arr
  • 如何从本地模式下运行的 pyspark 中的 S3 读取数据?

    我正在使用 PyCharm 2018 1 使用 Python 3 4 并通过 virtualenv 中的 pip 安装 Spark 2 3 本地主机上没有安装hadoop 因此没有安装Spark 因此没有SPARK HOME HADOOP
  • Scala 条件列表构造

    我正在使用 Scala 2 9 2 并且想根据某些条件构建一个列表 考虑以下情况 其中 cond 是采用谓词 p 和类型 T 的值 在本例中为 t3 的某个函数 t1 t2 cond p t3 t4 我想要的行为如下 如果 p 为真 则应给
  • Spark 与 Webhdfs/httpfs

    我想通过 httpfs 或 Webhdfs 将文件从 HDFS 读入 Spark 类似的东西 sc textFile webhdfs myhost 14000 webhdfs v1 path to file txt 或者 理想情况下 sc
  • 将 Spark 添加到 Oozie 共享库

    默认情况下 Oozie 共享 lib 目录提供 Hive Pig 和 Map Reduce 的库 如果我想在 Oozie 上运行 Spark 作业 最好将 Spark lib jar 添加到 Oozie 的共享库 而不是将它们复制到应用程序
  • Scala 中缺少多重集吗?

    我正在尝试 Scala 中的 Facebook Hacker Cup 2013 资格赛问题 对于第三个问题 我觉得需要一个有序的 Multiset 但在 scala 的 2 10 集合中找不到一个 scala 的集合中是否缺少此数据结构 会
  • Spark、pyspark中从TF-IDF到LDA聚类

    我正在尝试对存储在格式键 listofwords 中的推文进行聚类 我的第一步是使用 dataframe 提取单词列表的 TF IDF 值 dbURL hdfs pathtodir file sc textFile dbURL Define
  • Scala:将整个列表的 Either 与每个元素的 Either 组合

    我有一个 Either 列表 它代表错误 type ErrorType List String type FailFast A Either ErrorType A import cats syntax either val l List
  • Scala 警告、IntelliJ 和编译器标志

    我目前正在试用 IntelliJ Scala 插件 有件事让我有点烦恼 编译时我收到 3 个警告 Warning scala Recompiling 4 files Warning scala Warning scala there wer
  • Scala REPL / SBT Console 是否有配置文件?

    我一直在尝试找到某种点文件来放入 Scala REPL 设置和自定义函数 我特别有兴趣传递它的标志 例如 Dscala color 启用语法突出显示 以及覆盖设置 如结果字符串截断 scala gt power scala gt vals
  • Apache Spark 中的高效字符串匹配

    我使用 OCR 工具从屏幕截图中提取文本 每个大约 1 5 句话 然而 当手动验证提取的文本时 我注意到时不时会出现一些错误 鉴于文本 你好 我真的很喜欢 Spark 我注意到 1 像 I 和 l 这样的字母被 替换 2 表情符号未被正确提
  • 使用 PySpark 从 azure blob 存储读取 csv 文件

    我正在尝试使用 Microsoft Azure 上的 PySpark HDInsight 集群来做一个机器学习项目 要在我的集群上进行操作 请使用 Jupyter 笔记本 另外 我的数据 一个 csv 文件 存储在 Azure Blob 存

随机推荐

  • 使用 sqlplus 连接到 Oracle 数据库

    我在 Unix 环境中使用以下命令连接 Oracle 数据库 sqlplus test test DESCRIPTION ADDRESS LIST ADDRESS PROTOCOL TCP HOST hostname com PORT 15
  • 无返回值的条件运算符

    我有这个代码 bool value false if value Console Write true else Console Write false 我想通过使用条件运算符来缩短它 但我找不到正确的语法 bool value false
  • 批处理文件保留十行之一

    我有一个包含 n 行的文件 n 超过 1 亿 我想输出一个仅包含 10 行中的 1 行的文件 我无法将文件分成十部分并只保留一部分 因为它必须更加随机 后来我必须进行统计分析 我不能在数据中产生强烈的偏差 我正在考虑读取文件 并为每条记录如
  • Kubernetes 仪表板显示未经授权

    我使用 KUBEADM 工具 IN LOCAL 配置了具有 1 个主节点和 4 个工作节点的 kubernetes 集群 所有节点都运行良好 部署了一个应用程序并能够从浏览器访问该应用程序 我尝试了很多方法使用 kubectl 创建仪表板
  • 如何在 Xcode 中将 OpenSSL 与我的应用程序静态链接?

    我正在使用使用 OpenSSL 加密库的第三方代码 自 OSX 10 7 起 Apple 已弃用 OpenSSL 转而采用他们自己的解决方案 由于我没有加密编码经验 因此我无法移植第三方代码以使用通用加密 因此 为了防止 Apple 稍后可
  • 如何删除字符串中的尾随空格和嵌入空格?

    我正在编写一个程序 将国内和国际帐号转换为 IBAN 号码 首先 我需要形成一个字符串 银行 ID 分行 ID 帐号 ISO 国家 地区代码 这些字段中可能存在尾随空格 但并非每个帐号都具有相同的长度 有些帐号具有分支标识符 而其他帐号则没
  • shared_ptr 析构函数、复制和不完整类型

    我有一个头文件foo h像这样 无关的东西省略 pragma once include
  • 互换使用 str 和 String

    假设我正在尝试使用 Rust 做一个奇特的零拷贝解析器 str 但有时我需要修改文本 例如实现变量替换 我真的想做这样的事情 fn main let mut v Vec lt str gt Hello there world split w
  • 单行 FFMPEG cmd 合并视频/音频并保留两个音频

    我有一个项目需要将视频文件与另一个音频文件合并 预期输出是一个视频文件 其中包含实际视频中的音频和合并的音频文件 输出视频文件的长度将与实际视频文件的大小相同 是否有单行 FFMPEG 命令可以使用 copy 和 map 参数来实现此目的
  • Android 上的 OkHttp PublicKey 固定

    有谁知道我们如何使用 OkHttp3 实现公钥固定 一直在阅读有关 SSL 固定的内容 我发现我们可以使用证书固定或公钥固定 后者似乎更灵活 来实现 但我只能找到例子证书固定 https github com square okhttp w
  • WSL (Ubuntu):如何从 bash 终端在浏览器中打开 localhost

    我正在尝试打开http localhost http localhost在 WSL bash 终端的 任何 浏览器中 到目前为止我已经尝试过 如何从 URL localhost 3000 的终端打开 Google Chrome https
  • iOS 的日志框架? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 iOS 是否有一个日志框架可以帮助开发人员诊断应用程序崩溃 您可能喜欢 伐木工人 https gith
  • Bootstrap 网格列相互重叠

    我对 Bootstrap 的网格布局和其中的列重叠有疑问 我不确定问题到底是什么 任何建议将不胜感激 谢谢 div class container div class row div class col md 6 img src conte
  • 错误提示“.class 文件中的版本错误”

    我使用了JSP代码并在tomcat5 5服务器下执行了相同的操作 它工作被罚款 现在我已经将相同的代码复制到其他系统的tomcat服务器下 但是在提交该 jsp 文件时 收到以下错误 导致错误的原因可能是什么 请指教 root cause
  • 使 d3.js 可视化布局响应式的最佳方法是什么?

    假设我有一个直方图脚本 可以构建 960 500 svg 图形 我如何使其响应 以便调整图形宽度和高度是动态的
  • 如何快速学习Java RMI [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个 Java 应用程序 我已经开发了一两年了 我想创建一组非常简单的接口 以后可能会增加复杂性 我可以使用它从另一个 JVM 例
  • Jmeter - 如何向计数器添加前缀。类似于随机变量

    目前 在随机变量中 我可以在变量的输出格式中添加前缀或后缀 然而 这个很好的功能不适用于简单的计数器控制器 每次使用变量时连接 string counter 对我来说并不是一个好的选择 因为我经常这样做 有没有办法以随机变量的方式实现前缀
  • 对内置数据类型使用前向声明

    我明白 只要有可能 我们就应该使用前向声明而不是包含来加快编译速度 我有课Person像这样 pragma once include
  • 如何将 TWSocket 的 OnDataAvailable() 事件推送到 Delphi 6 应用程序中的后台线程?

    我有一个 Delphi 6 应用程序 它使用 ICS 组件套件进行套接字通信 我有自己的服务器套接字 VCL 组件 当新会话可用时 它会创建客户端 TWSocket 套接字 我创建的客户端套接字确实将 Multithreaded 属性设置为
  • 如何一起使用SparkSession和StreamingContext?

    我正在尝试从本地计算机 OSX 上的文件夹流式传输 CSV 文件 我将 SparkSession 和 StreamingContext 一起使用 如下所示 val sc SparkContext createSparkContext spa