我在 Databricks 中有一个 DataFrame 流,我想对每个元素执行一个操作。在网上我找到了特定目的的方法,比如将其写入控制台或转储到内存中,但我想添加一些业务逻辑,并将一些结果放入Redis中。
更具体地说,这就是非流情况下的样子:
val someDataFrame = Seq(
("key1", "value1"),
("key2", "value2"),
("key3", "value3"),
("key4", "value4")
).toDF()
def someFunction(keyValuePair: (String, String)) = {
println(keyValuePair)
}
someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))
但如果someDataFrame
不是一个简单的数据帧而是一个流数据帧(确实来自Kafka),错误信息是这样的:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
有人可以帮我解决这个问题吗?
一些重要的注意事项:
我已阅读相关文档,例如 Spark Streaming 或 Databricks Streaming 以及其他一些描述。
我知道一定有类似的东西start()
and awaitTermination
,但我不知道确切的语法。这些描述没有帮助。
列出我尝试过的所有可能性需要好几页,所以我宁愿不提供它们。
I do not想要解决显示结果的具体问题。 IE。请不要提供此特定案例的解决方案。这someFunction
看起来像这样:
val someData = readSomeExternalData()
if (condition containing keyValuePair and someData) {
doSomething(keyValuePair);
}
(问题Spark 结构化流中 ForeachWriter 的用途是什么?没有提供有效的示例,因此没有回答我的问题。)