Integrating Spark Structured Streaming with the Confluent Schema Registry

2024-01-10

I am using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but integrating it with Spark Structured Streaming seems to be impossible.

I have seen this question, Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming) (https://stackoverflow.com/questions/40705926/reading-avro-messages-from-kafka-with-spark-2-0-2-structured-streaming), but could not get it working with the Confluent Schema Registry.


It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization; you have to deserialize the data manually. In Spark, create the Confluent REST service object to fetch the schema, then convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal and map over the binary-typed "value" column with Confluent's KafkaAvroDeserializer. I strongly suggest getting into the source code of these classes, because there is a lot going on here; for brevity I will leave out many details.

//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import scala.collection.JavaConverters._ //needed for .asJava on the config map

//Assumes `spark` is an existing SparkSession; the implicits provide the Encoder for the typed map below.
import spark.implicits._

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
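//For reference, this wraps the registry's documented REST lookup; the response JSON
//shown here is abridged and illustrative only:
//  GET http://127.0.0.1:8081/subjects/Schema-Registry-Example-topic1-value/versions/latest
//  => {"subject":"Schema-Registry-Example-topic1-value","version":1,"id":1,"schema":"..."}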

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
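//For illustration only, a hypothetical schema string the registry might return:
//  {"type":"record","name":"Example","fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}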

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()
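
//The Kafka source always yields this fixed set of columns: key (binary), value (binary),
//topic, partition, offset, timestamp, timestampType.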

//instantiate the SerDe classes if not already, then deserialize!
//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Pass the Avro schema. The "key" and "value" columns arrive as binary (Array[Byte]).
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()
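
In a standalone job you would typically block on the returned StreamingQuery handle so the console sink keeps running; a minimal sketch using the standard API:

//Block the driver until the streaming query stops (on error or a manual stop()).
deserializedDSOutputStream.awaitTermination()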