如何从 ADLS 将自定义数据帧写入 eventhub

2023-12-12

我想将自定义数据框写入 eventhub。

val customDf = spark.read.json("path/to/json")

EventHub ConnectionString
val connectionString = new com.microsoft.azure.eventhubs.ConnectionStringBuilder("Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxx=").setEventHubName("test")

val ehConf = EventHubsConf(connectionString.toString).setConsumerGroup("testing")
val eventhubSchema = spark.readStream.format("eventhubs").options(ehConf.toMap).option("eventhubs.partition.count", "4").load()

eventhubSchema.printSchema 

将显示 eventhub body 的默认架构

现在我想将上面的customDf写入eventhub

Method1:
    ds = customDf \
      .selectExpr("partitionKey", "body") \
      .writeStream \
      .format("eventhubs") \
      .options(ehConf.toMap) \
      .option("checkpointLocation", "///output.txt") \
      .start()

Method2:

ds = customDf \          
  .writeStream \
  .format("eventhubs") \
  .options(ehConf.toMap) \
  .option("checkpointLocation", "///output.txt") \
  .start()

如何将 customerEr 写入事件中心。我什至做了 select(get_json_object(cast to string type) 但我得到的是

org.apache.spark.sql.AnalysisException: cannot resolve 'body' given input columns 

如何将customDf写入eventhub


您需要将数据框中的数据转换为单个列对象(二进制或字符串),这实际上取决于您的消费者。最简单的方法是将所有数据打包为 JSON,使用以下组合to_json + struct功能:

import pyspark.sql.functions as F

stream = customDf \
      .select(F.to_json(F.struct("*")).alias("body")) \
      .writeStream \
      .format("eventhubs") \
      .options(ehConf.toMap) \
      .option("checkpointLocation", "...") \
      .start()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 ADLS 将自定义数据帧写入 eventhub 的相关文章

随机推荐