在 Spark 结构化流中创建 Dataframe forEachWriter 以插入 kudu 表问题

2024-01-29

我有一个问题,我尝试寻找解决方案,但无法找到任何解决方案,并且希望获得任何*指针。

所以我试图将 Spark 结构化流与 Apache Kudu 集成,我正在从 Kafka 读取流并进行一些处理,现在应该写入 Kudu 表,问题是 Spark 结构化流不提供对 Kudu 接收器的支持(即我知道吗?),并且我正在使用 foreach writer,但一旦尝试在“ForeachWriter.process()”内创建数据框,它就会挂起并且永远不会继续

import org.apache.spark.sql.ForeachWriter
val foreachWriter = new  ForeachWriter[Row] {


  override def open(partitionId: Long,version: Long): Boolean = {
    val mySchema = StructType(Array(
      StructField("id", IntegerType),
      StructField("value", DoubleType),
      StructField("EventTimestamp", TimestampType)
    ))
       true
  }

  override def process(value: Row): Unit = {
    println("values\n------------------")

    val spark = SparkSession.builder.appName("Spark-Kafka-Integrations").master("local").getOrCreate()
    val valRDD=spark.sparkContext.parallelize(value.toSeq)
    val valRDF=valRDD.map(x=>x.toString.split(",").to[List])
    println(value)

    val valDF=spark.createDataFrame(valRDF)
    valDF.show()
    println("End values\n///////////////////")
    //shoud insert into kudu here
   }

  override def close(errorOrNull: Throwable): Unit = {
   }
}
   //count is a Dstream/streaming dataframe

count.writeStream.foreach(foreachWriter).outputMode("complete") .option("truncate", "false").start().awaitTermination()

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Spark 结构化流中创建 Dataframe forEachWriter 以插入 kudu 表问题 的相关文章

随机推荐