我们正在处理无架构的 JSON 数据,有时 Spark 作业会失败,因为我们在 Spark SQL 中引用的某些列在一天中的某些时间不可用。在这些时间内,Spark 作业会失败,因为所引用的列在数据框中不可用。如何处理这种情况?我已经尝试过 UDF,但缺少太多列,因此无法真正检查每一列的可用性。我还尝试在更大的数据集上推断模式,并将其应用到数据帧上,期望缺失的列将填充空值,但模式应用程序失败并出现奇怪的错误。
请建议
这对我有用。创建了一个函数来检查所有预期的列并将列添加到数据帧(如果缺少)
def checkAvailableColumns(df: DataFrame, expectedColumnsInput: List[String]) : DataFrame = {
expectedColumnsInput.foldLeft(df) {
(df,column) => {
if(df.columns.contains(column) == false) {
df.withColumn(column,lit(null).cast(StringType))
}
else (df)
}
}
}
val expectedColumns = List("newcol1","newcol2","newcol3")
val finalDf = checkAvailableColumns(castedDateSessions,expectedColumns)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)