我正在研究一个代表事件流的数据集(例如从网站跟踪事件时触发)。所有事件都有一个时间戳。我们经常遇到的一个用例是尝试查找给定字段的第一个非空值。例如,类似的东西最能让我们到达那里:
val eventsDf = spark.read.json(jsonEventsPath)
case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )
val projectedEventsDs = eventsDf.select(
eventsDf("message.visit.id").alias("visitId"),
eventsDf("message.property.user_id").alias("userId"),
eventsDf("message.property.timestamp"),
...
).as[ProjectedFields]
projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
上面代码的问题在于输入数据的顺序first
不保证聚合功能。我希望它按以下方式排序timestamp
确保它是时间戳的第一个非空 userId,而不是任何随机的非空 userId。
有没有办法定义分组内的排序?
使用 Spark 2.10
顺便说一句,Spark 2.10 中建议的方式SPARK DataFrame:选择每组的第一行 https://stackoverflow.com/questions/33878370/spark-dataframe-select-the-first-row-of-each-group是在分组之前进行排序——这是行不通的。例如下面的代码:
case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
OrderedKeyValue("a", null, 1),
OrderedKeyValue("a", null, 2),
OrderedKeyValue("a", "x", 3),
OrderedKeyValue("a", "y", 4),
OrderedKeyValue("a", null, 5)
).toDS()
ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
有时会回来Array([a,y])
有时Array([a,x])