调用一个函数,每个元素都是 Databricks 中的一个流

2023-12-12

我在 Databricks 中有一个 DataFrame 流,我想对每个元素执行一个操作。在网上我找到了特定目的的方法,比如将其写入控制台或转储到内存中,但我想添加一些业务逻辑,并将一些结果放入Redis中。

更具体地说,这就是非流情况下的样子:

val someDataFrame = Seq(
  ("key1", "value1"),
  ("key2", "value2"),
  ("key3", "value3"),
  ("key4", "value4")
).toDF()

def someFunction(keyValuePair: (String, String)) = {
  println(keyValuePair)
}

someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))

但如果someDataFrame不是一个简单的数据帧而是一个流数据帧(确实来自Kafka),错误信息是这样的:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

有人可以帮我解决这个问题吗?

一些重要的注意事项:

  • 我已阅读相关文档,例如 Spark Streaming 或 Databricks Streaming 以及其他一些描述。

  • 我知道一定有类似的东西start() and awaitTermination,但我不知道确切的语法。这些描述没有帮助。

  • 列出我尝试过的所有可能性需要好几页,所以我宁愿不提供它们。

  • I do not想要解决显示结果的具体问题。 IE。请不要提供此特定案例的解决方案。这someFunction看起来像这样:

val someData = readSomeExternalData()
if (condition containing keyValuePair and someData) {
  doSomething(keyValuePair);
}

(问题Spark 结构化流中 ForeachWriter 的用途是什么?没有提供有效的示例,因此没有回答我的问题。)


下面是一个使用 foreachBatch 读取数据的示例,通过流 api 将每个项目保存到 Redis。

与之前的问题相关(DataFrame 到 RDD[(String, String)] 转换)

// import spark and spark-redis
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.streaming._
import org.apache.spark.sql.types._

import com.redislabs.provider.redis._

// schema of csv files
val userSchema = new StructType()
    .add("name", "string")
    .add("age", "string")

// create a data stream reader from a dir with csv files
val csvDF = spark
  .readStream
  .format("csv")
  .option("sep", ";")
  .schema(userSchema)
  .load("./data") // directory where the CSV files are 

// redis
val redisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default

csvDF.map(r => (r.getString(0), r.getString(0))) // converts the dataset to a Dataset[(String, String)]
  .writeStream // create a data stream writer
  .foreachBatch((df, _) => sc.toRedisKV(df.rdd)(redisConfig)) // save each batch to redis after converting it to a RDD
  .start // start processing
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

调用一个函数,每个元素都是 Databricks 中的一个流 的相关文章

随机推荐