如何查找组中第一个非空值? (使用dataset api进行二次排序)

2024-05-04

我正在研究一个代表事件流的数据集(例如从网站跟踪事件时触发)。所有事件都有一个时间戳。我们经常遇到的一个用例是尝试查找给定字段的第一个非空值。例如,类似的东西最能让我们到达那里:

val eventsDf = spark.read.json(jsonEventsPath) 

case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )

val projectedEventsDs = eventsDf.select(
    eventsDf("message.visit.id").alias("visitId"),
    eventsDf("message.property.user_id").alias("userId"),
    eventsDf("message.property.timestamp"),

    ...

).as[ProjectedFields]

projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))

上面代码的问题在于输入数据的顺序first不保证聚合功能。我希望它按以下方式排序timestamp确保它是时间戳的第一个非空 userId,而不是任何随机的非空 userId。

有没有办法定义分组内的排序?

使用 Spark 2.10


顺便说一句,Spark 2.10 中建议的方式SPARK DataFrame:选择每组的第一行 https://stackoverflow.com/questions/33878370/spark-dataframe-select-the-first-row-of-each-group是在分组之前进行排序——这是行不通的。例如下面的代码:

case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
  OrderedKeyValue("a", null, 1), 
  OrderedKeyValue("a", null, 2), 
  OrderedKeyValue("a", "x", 3), 
  OrderedKeyValue("a", "y", 4), 
  OrderedKeyValue("a", null, 5)
).toDS()

ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()

有时会回来Array([a,y])有时Array([a,x])


Use my belovedwindows(...并体验您的生活变得多么简单!)

import org.apache.spark.sql.expressions.Window
val byKeyOrderByOrdering = Window
  .partitionBy("key")
  .orderBy("ordering")
  .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

import org.apache.spark.sql.functions.first
val firsts = ds.withColumn("first",
  first("value", ignoreNulls = true) over byKeyOrderByOrdering)

scala> firsts.show
+---+-----+--------+-----+
|key|value|ordering|first|
+---+-----+--------+-----+
|  a| null|       1|    x|
|  a| null|       2|    x|
|  a|    x|       3|    x|
|  a|    y|       4|    x|
|  a| null|       5|    x|
+---+-----+--------+-----+

注意:不知何故,Spark 2.2.0-SNAPSHOT(今天构建)无法给我正确的答案,没有rangeBetween我认为这应该是默认的无界范围。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何查找组中第一个非空值? (使用dataset api进行二次排序) 的相关文章

随机推荐