我正在 Scala 阅读中编写 ETL Spark (2.4) 作业;
-S3 上使用 glob 模式分隔的 CSV 文件。数据加载到 DataFrame 中并包含一列(假设它名为custom
) 与 JSON 格式的字符串 (多层嵌套)。目标是自动从该列推断架构,以便可以为 S3 中的 Parquet 文件上的写入接收器构建它。
这个帖子 (如何使用 Spark DataFrames 查询 JSON 数据列?)建议schema_of_json
Spark 2.4 中的可以从 JSON 格式的列或字符串推断模式。
这是我尝试过的:
val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first
df.withColumn(
"nestedCustom",
from_json(col("custom"), jsonSchema, Map[String, String]())
)
但上面的方法不起作用并引发此异常:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]
请记住,我正在过滤掉空值custom
对于这个数据框。
EDIT:下面的整个代码。
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
/**
* RandomName entry point.
*
* @author Random author
*/
object RandomName {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder
.appName("RandomName")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
.getOrCreate
import spark.implicits._
val randomName: RandomName = new RandomName(spark)
val df: sql.DataFrame = randomName.read().filter($"custom".isNotNull)
val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first
df.withColumn(
"nestedCustom",
from_json(col("custom"), jsonSchema, Map[String, String]())
)
df.show
spark.stop
}
}
class RandomName(private val spark: SparkSession) {
/**
* Reads CSV files from S3 and creates a sql.DataFrame.
*
* @return a sql.DataFrame
*/
def read(): sql.DataFrame = {
val tableSchema = StructType(
Array(
StructField("a", StringType, true),
StructField("b", StringType, true),
StructField("c", DateType, true),
StructField("custom", StringType, true)
))
spark.read
.format("csv")
.option("sep", ";")
.option("header", "true")
.option("inferSchema", "true")
.schema(tableSchema)
.load("s3://random-bucket/*")
}
}
JSON 的示例:
{
"lvl1": {
"lvl2a": {
"lvl3a": {
"lvl4a": "random_data",
"lvl4b": "random_data"
}
},
"lvl2b": {
"lvl3a": {
"lvl4a": "ramdom_data"
},
"lvl3b": {
"lvl4a": "random_data",
"lvl4b": "random_data"
}
}
}
}