如何将Kafka中的记录显示到控制台?

2024-02-04

我正在学习结构化流,但无法将输出显示到控制台。

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

object kafka_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

//    val schema = StructType().add("a", IntegerType()).add("b", StringType())

    val schema = StructType(Seq(
      StructField("a", IntegerType, true),
      StructField("b", StringType, true)
    ))

    val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "172.21.0.187:9093")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()


    val values = df.selectExpr("CAST(value AS STRING)").as[String]

    values.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()


  }


}

我对卡夫卡的输入

my name is abc how are you ?

我只想将 Kafka 中的字符串显示到 Spark 控制台


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将Kafka中的记录显示到控制台? 的相关文章

随机推荐