我正在尝试从本地计算机 (OSX) 上的文件夹流式传输 CSV 文件。我将 SparkSession 和 StreamingContext 一起使用,如下所示:
val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))
val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")
如果我跑ssc.start()
之后,我收到此错误:
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
相反,如果我尝试启动SparkSession
像这样:
inputDF.writeStream.format("console").start()
I get:
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
显然我不明白如何SparkSession
and StreamingContext
应该一起工作。如果我摆脱SparkSession
, StreamingContext
只有textFileStream
我需要在其上强加 CSV 模式。如果您能就如何实现此功能提供任何说明,我们将不胜感激。