如何在不使用 Azure Databricks 中的 Pyspark 缓存数据的情况下查询损坏记录?

2023-11-21

我遇到了数据块中记录损坏的问题。我们想要对损坏的记录进行计数,并将损坏的记录保存在特定位置作为增量表。 为此,我们正在阅读使用PERMISSIVE并据此进行查询_corrupt_record column.

我们在 Azure Databricks 中将 pyspark 与 Apache Spark 3.0.1 结合使用。

这是我们收到的错误消息:

从 Spark 2.3 开始,原始查询 当引用的列仅包含时,不允许使用 JSON/CSV 文件 内部损坏记录列(默认名称为 _corrupt_record)。 例如: Spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count() 和 Spark.read.schema(schema).json(file).select("_corrupt_record").show().

根据这个文档,如果要查询列损坏记录,则必须缓存或保存数据。

enter image description here

但我们不想在 ETL 中缓存数据。 ETL 用于在同一集群上运行的许多作业,我们可以将 150GB 的大文件作为输入。缓存数据可能会导致集群崩溃。

有没有办法查询这些损坏的记录without缓存数据?

#1 将数据保存在 blob 存储上可能是另一种选择,但这听起来开销很大。

#2 我们还尝试使用该选项BadRecordsPath:将坏记录保存到BadRecordsPath并读回来以便统计,但是没有简单的方法可以知道坏记录文件是否已被写入(以及文件已写入哪个分区)。 分区看起来像/20210425T102409/bad_records

请参阅我的其他问题here

#3 另一种方法是从许可读取中减去 dropmalformed 读取。 例如:

dataframe_with_corrupt = spark.read.format('csv').option("mode", "PERMISSIVE").load(path)
dataframe_without_corrupt = spark.read.format('csv').option("mode", "DROPMALFORMED").load(path)

corrupt_df = dataframe_with_corrupt.exceptAll(dataframe_without_corrupt)

但我不确定它会比缓存占用更少的内存!

任何建议或意见将不胜感激!提前致谢


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在不使用 Azure Databricks 中的 Pyspark 缓存数据的情况下查询损坏记录? 的相关文章

随机推荐