从字符串文字推断 Spark 数据类型

2023-12-25

我正在尝试编写一个可以推断 Spark 的 Scala 函数数据类型 https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/types/DataType.html基于提供的输入字符串:

/**
 * Example:
 * ========
 * toSparkType("string")  =>    StringType
 * toSparkType("boolean") =>    BooleanType
 * toSparkType("date")    =>    DateType
 * etc.
 */
def toSparkType(inputType : String) : DataType = {
    var dt : DataType = null

    if(matchesStringRegex(inputType)) {
        dt = StringType
    } else if(matchesBooleanRegex(inputType)) {
        dt = BooleanType
    } else if(matchesDateRegex(inputType)) {
        dt = DateType
    } else if(...) {
        ...
    }

    dt
}

我的目标是支持可用的大部分(如果不是全部)DataTypes。当我开始实现这个功能时,我开始思考:“Spark/Scala 可能已经有一个 helper/util 方法可以为我做到这一点。“毕竟,我知道我可以做这样的事情:

var structType = new StructType()

structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty)
structType.add("some_new_date_col", "date", true, Metadata.empty)

Scala 和/或 Spark 都会隐式转换我的"string"论证StringType等等。所以我问:我可以使用 Spark 或 Scala 做什么来帮助我实现转换器方法?


Spark/Scala 可能已经有一个 helper/util 方法可以为我做到这一点。

你说得对。 Spark 已经有自己的架构和数据类型推断代码,用于从底层数据源(csv、json 等)推断架构,因此您可以查看它来实现您自己的(实际实现被标记为 Spark 私有,并且是与 RDD 和内部类绑定在一起,因此不能直接从 Spark 外部的代码使用它,但应该可以让您了解如何使用它。)

鉴于 csv 是平面类型(并且 json 可以具有嵌套结构),csv 模式推断相对更直接,应该可以帮助您完成上面想要实现的任务。因此,我将解释 csv 推断的工作原理(json 推断只需要考虑可能的嵌套结构,但数据类型推断非常类似)。

有了序言,你想看的是CSV 推断架构 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L41目的。特别是,看看infer方法需要一个RDD[Array[String]]并推断数组每个元素的数据类型横跨整个 RDD。它的做法是——它将每个字段标记为NullType首先,然后迭代下一行值(Array[String]) 在里面RDD它更新了已经推断出的DataType到一个新的DataType如果新的DataType更具体。这正在发生here https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L46:

val rootTypes: Array[DataType] =
      tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)

Now inferRowType calls https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L64 inferField对于行中的每个字段。inferField 执行 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L80是您可能正在寻找的 - 它采用到目前为止为特定字段推断的类型以及当前行的字段的字符串值作为参数。然后,它返回现有的推断类型,或者推断的新类型是否比新类型更具体。

相关部分代码如下:

typeSoFar match {
        case NullType => tryParseInteger(field, options)
        case IntegerType => tryParseInteger(field, options)
        case LongType => tryParseLong(field, options)
        case _: DecimalType => tryParseDecimal(field, options)
        case DoubleType => tryParseDouble(field, options)
        case TimestampType => tryParseTimestamp(field, options)
        case BooleanType => tryParseBoolean(field, options)
        case StringType => StringType
        case other: DataType =>
          throw new UnsupportedOperationException(s"Unexpected data type $other")
      }

请注意,如果typeSoFar是 NullType 那么它首先尝试将其解析为Integer but tryParseIntegercall 是对较低类型解析的调用链。因此,如果它无法将值解析为 Integer 那么它将调用tryParseLong失败时将调用tryParseDecimal失败时将调用tryParseDoublew.o.f.w.i.tryParseTimestampw.o.f.w.itryParseBooleanw.o.f.w.i.最后stringType.

因此,您可以使用几乎类似的逻辑来实现您的任何用例。 (如果您不需要跨行合并,那么您只需实现所有tryParse*方法逐字并简单地调用tryParseInteger。无需编写自己的正则表达式。)

希望这可以帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从字符串文字推断 Spark 数据类型 的相关文章

随机推荐