我想将 Twitter 中的数据写入 Kafka。出于教育目的,我尝试使用结构化流来做到这一点。我创建了一个基于套接字源的 Twitter 源,它运行良好。
我按如下方式设置来源:
val tweets = spark
.readStream
.format("twitter")
.option("query", terms)
.load()
.as[SparkTweet]
这为我提供了一个很好的用于分析查询的数据集。伟大的!
接下来,我想将略微 Sparkified 模式中的每条推文持久保存到 Kafka 中:
val kafkaOutStream = tweets
.toJSON.as("value")
.writeStream
.queryName("stream_to_kafka")
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 second"))
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic","tweets")
.start
这很容易!除此之外,它不起作用。在QueryExecution.scala
呼叫传递到assertSupported
最终被抛弃,因为
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
我没想到toJSON
成为一个纯粹的批处理操作,但没有它,并使用 sayselect($"text" as "value")
相反,该代码将起作用。
现在,我有点惊讶,希望有人能解释为什么 toJSON 不应该与流兼容(这是一个错误吗?缺少功能吗?),并告诉我是否有一种结构化流方式来获取序列化表示我的目标是卡夫卡。