我正在尝试在 Spark 中读取来自 kafka(版本 10)的消息并尝试打印它。
import spark.implicits._
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.config("spark.master", "local")
.getOrCreate()
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
ds1.collect.foreach(println)
ds1.writeStream
.format("console")
.start()
ds1.printSchema()
线程“main”中出现错误异常
org.apache.spark.sql.AnalysisException:使用流源的查询
必须使用 writeStream.start();; 执行
您正在对查询计划进行分支:从您尝试执行的同一个 ds1 开始:
ds1.collect.foreach(...)
ds1.writeStream.format(...){...}
但你只是打电话.start()
在第二个分支上,让另一个分支悬空而没有终止,这反过来会引发您返回的异常。
解决方案是启动两个分支并等待终止。
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
val query1 = ds1.collect.foreach(println)
.writeStream
.format("console")
.start()
val query2 = ds1.writeStream
.format("console")
.start()
ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)