我遇到了数据块中记录损坏的问题。我们想要对损坏的记录进行计数,并将损坏的记录保存在特定位置作为增量表。
为此,我们正在阅读使用PERMISSIVE
并据此进行查询_corrupt_record
column.
我们在 Azure Databricks 中将 pyspark 与 Apache Spark 3.0.1 结合使用。
这是我们收到的错误消息:
从 Spark 2.3 开始,原始查询
当引用的列仅包含时,不允许使用 JSON/CSV 文件
内部损坏记录列(默认名称为 _corrupt_record)。
例如:
Spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
和
Spark.read.schema(schema).json(file).select("_corrupt_record").show().
根据这个文档,如果要查询列损坏记录,则必须缓存或保存数据。
但我们不想在 ETL 中缓存数据。 ETL 用于在同一集群上运行的许多作业,我们可以将 150GB 的大文件作为输入。缓存数据可能会导致集群崩溃。
有没有办法查询这些损坏的记录without缓存数据?
#1 将数据保存在 blob 存储上可能是另一种选择,但这听起来开销很大。
#2 我们还尝试使用该选项BadRecordsPath
:将坏记录保存到BadRecordsPath并读回来以便统计,但是没有简单的方法可以知道坏记录文件是否已被写入(以及文件已写入哪个分区)。
分区看起来像/20210425T102409/bad_records
请参阅我的其他问题here
#3 另一种方法是从许可读取中减去 dropmalformed 读取。
例如:
dataframe_with_corrupt = spark.read.format('csv').option("mode", "PERMISSIVE").load(path)
dataframe_without_corrupt = spark.read.format('csv').option("mode", "DROPMALFORMED").load(path)
corrupt_df = dataframe_with_corrupt.exceptAll(dataframe_without_corrupt)
但我不确定它会比缓存占用更少的内存!
任何建议或意见将不胜感激!提前致谢