Spark 2数据集空值异常

2023-12-24

在 Spark Dataset.filter 中出现此 null 错误

输入 CSV:

name,age,stat
abc,22,m
xyz,,s

工作代码:

case class Person(name: String, age: Long, stat: String)

val peopleDS = spark.read.option("inferSchema","true")
  .option("header", "true").option("delimiter", ",")
  .csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()

失败代码(添加以下行返回错误):

val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()

返回空错误

java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

你得到的异常应该可以解释一切,但让我们一步一步地进行:

  • 当使用加载数据时csv数据源所有字段都标记为nullable:

    val path: String = ???
    
    val peopleDF = spark.read
      .option("inferSchema","true")
      .option("header", "true")
      .option("delimiter", ",")
      .csv(path)
    
    peopleDF.printSchema
    
    root
    |-- name: string (nullable = true)
    |-- age: integer (nullable = true)
    |-- stat: string (nullable = true)
    
  • 缺失字段表示为 SQLNULL

    peopleDF.where($"age".isNull).show
    
    +----+----+----+
    |name| age|stat|
    +----+----+----+
    | xyz|null|   s|
    +----+----+----+
    
  • 接下来你转换Dataset[Row] to Dataset[Person]它使用Long编码age field. Long在 Scala 中不能null。因为输入模式是nullable,输出模式保持不变nullable尽管如此:

    val peopleDS = peopleDF.as[Person]
    
    peopleDS.printSchema
    
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
     |-- stat: string (nullable = true)
    

    请注意,它as[T]根本不影响架构。

  • 当你查询时Dataset使用 SQL(在注册表上)或DataFrameSpark API 不会反序列化该对象。由于架构仍然是nullable我们可以执行:

    peopleDS.where($"age" > 30).show
    
    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    

    没有任何问题。这只是一个简单的 SQL 逻辑NULL是一个有效值。

  • 当我们使用静态类型时Dataset API:

    peopleDS.filter(_.age > 30)
    

    Spark 必须反序列化该对象。因为Long不可能是null (SQL NULL)它失败了,但你已经看到了异常。

    如果不是因为这个,你就会得到 NPE。

  • 应使用正确的数据静态类型表示Optional types:

    case class Person(name: String, age: Option[Long], stat: String)
    

    具有调节过滤功能:

    peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
    
    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    

    如果您愿意,可以使用模式匹配:

    peopleDS.filter {
      case Some(age) => age > 30
      case _         => false     // or case None => false
    }
    

    请注意,您不必(但无论如何建议)使用可选类型name and stat。因为斯卡拉String只是一个JavaString有可能null。当然,如果您采用这种方法,您必须显式检查访问的值是否是null or not.

Related Spark 2.0 数据集与 DataFrame https://stackoverflow.com/q/40596638/6910411

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 2数据集空值异常 的相关文章

随机推荐