Spark/Scala 可能已经有一个 helper/util 方法可以为我做到这一点。
你说得对。 Spark 已经有自己的架构和数据类型推断代码,用于从底层数据源(csv、json 等)推断架构,因此您可以查看它来实现您自己的(实际实现被标记为 Spark 私有,并且是与 RDD 和内部类绑定在一起,因此不能直接从 Spark 外部的代码使用它,但应该可以让您了解如何使用它。)
鉴于 csv 是平面类型(并且 json 可以具有嵌套结构),csv 模式推断相对更直接,应该可以帮助您完成上面想要实现的任务。因此,我将解释 csv 推断的工作原理(json 推断只需要考虑可能的嵌套结构,但数据类型推断非常类似)。
有了序言,你想看的是CSV 推断架构 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L41目的。特别是,看看infer
方法需要一个RDD[Array[String]]
并推断数组每个元素的数据类型横跨整个 RDD。它的做法是——它将每个字段标记为NullType
首先,然后迭代下一行值(Array[String]
) 在里面RDD
它更新了已经推断出的DataType
到一个新的DataType
如果新的DataType
更具体。这正在发生here https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L46:
val rootTypes: Array[DataType] =
tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
Now inferRowType
calls https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L64 inferField
对于行中的每个字段。inferField
执行 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L80是您可能正在寻找的 - 它采用到目前为止为特定字段推断的类型以及当前行的字段的字符串值作为参数。然后,它返回现有的推断类型,或者推断的新类型是否比新类型更具体。
相关部分代码如下:
typeSoFar match {
case NullType => tryParseInteger(field, options)
case IntegerType => tryParseInteger(field, options)
case LongType => tryParseLong(field, options)
case _: DecimalType => tryParseDecimal(field, options)
case DoubleType => tryParseDouble(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
case other: DataType =>
throw new UnsupportedOperationException(s"Unexpected data type $other")
}
请注意,如果typeSoFar
是 NullType 那么它首先尝试将其解析为Integer
but tryParseInteger
call 是对较低类型解析的调用链。因此,如果它无法将值解析为 Integer 那么它将调用tryParseLong
失败时将调用tryParseDecimal
失败时将调用tryParseDouble
w.o.f.w.i.tryParseTimestamp
w.o.f.w.itryParseBoolean
w.o.f.w.i.最后stringType
.
因此,您可以使用几乎类似的逻辑来实现您的任何用例。 (如果您不需要跨行合并,那么您只需实现所有tryParse*
方法逐字并简单地调用tryParseInteger
。无需编写自己的正则表达式。)
希望这可以帮助。