使用spark窗口函数获取最后一个值

2023-12-29

假设我有一个像这样的数据框。

val df = sc.parallelize(Seq(
            (1.0, 1,"Matt"), 
            (1.0, 2,"John"),
            (1.0, 3,null.asInstanceOf[String]),
            (-1.0, 2,"Adam"), 
            (-1.0, 4,"Steve"))
          ).toDF("id", "timestamp","name")

我想获取按时间戳排序的每个 id 的最后一个非空值。这是我的窗户

val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp".desc)

我正在创建一个独特的窗口数据

val filteredDF = df.filter($"name".isNotNull).withColumn("firstName", first("name") over (partitionWindow)).drop("timestamp","name").distinct

并将其连接回实际数据

val joinedDF = df.join(filteredDF, windowDF.col("id") === filteredDF.col("id")).drop(filteredDF.col("id"))

joinedDF.show()

它工作正常,但我不喜欢这个解决方案,有人能给我建议更好的吗?

另外,谁能告诉我为什么最后一个功能不起作用?我尝试了这个,但结果不正确

 val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")

val windowDF = df.withColumn("lastName", last("name") over (partitionWindow))

如果您想传播最后一个已知值(它与使用的逻辑不同)join) 你应该:

  • ORDER BY timestamp.
  • Take last忽略nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")

df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// |  id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0|        2| Adam|    Adam|
// |-1.0|        4|Steve|   Steve|
// | 1.0|        1| Matt|    Matt|
// | 1.0|        2| John|    John|
// | 1.0|        3| null|    John|
// +----+---------+-----+--------+

如果你想全局获取最后一个值:

  • ORDER BY timestamp.
  • 设置无界框架。
  • Take last忽略nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// |  id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0|        2| Adam|   Steve|
// |-1.0|        4|Steve|   Steve|
// | 1.0|        1| Matt|    John|
// | 1.0|        2| John|    John|
// | 1.0|        3| null|    John|
// +----+---------+-----+--------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用spark窗口函数获取最后一个值 的相关文章

随机推荐

  • 使用反射获取属性的字符串名称

    有大量的反射示例可以让您获得 一个类中的所有属性 单个属性 前提是您知道字符串名称 有没有一种方法 使用反射 TypeDescriptor 或其他方式 在运行时获取类中属性的字符串名称 前提是我拥有 的只是类和属性的实例 我有一个类的实例
  • 如何将数据推送到 iPhone 应用程序?

    我是 iPhone 应用程序开发新手 我无法弄清楚如何将数据推送到应用程序 具体来说 我试图找到一种方法将新数据 用户帖子 从服务器推送到应用程序 而无需用户刷新 下拉刷新 有可能吗 有一个接近的解决方案 使用Apple推送通知服务 它允许
  • 连接字符串无法按预期工作[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我知道这是一个常见问题 但在寻找参考
  • aws_iam_policy 和 aws_iam_role_policy 之间的区别

    我有一个aws iam role我想添加一个策略 通常 我会创建一个策略aws iam role并将其附加到角色上aws iam role policy attachment 但是 我看过一些使用的文档aws iam role policy
  • 如何从另一个分支获取更改

    我目前正在研究featurex分支 我们的主分支被命名为our team 自从我开始工作以来featurex 对分支进行了更多更改our team 我在本地完成此操作是为了获取所有最新更改our team git checkout our
  • 将 PEM 证书解析为 JSON

    我有 PEM 证书并且正在使用openssl查看其内容 是否可以将输出解析为 JSON 格式 也许有一个 Java 库或 Bash 脚本可以做到这一点 命令 openssl x509 in sample cer noout text out
  • 无法将函数并行映射到 tarfile 成员

    我有一个包含 bz2 压缩文件的 tar 文件 我想应用该功能clean file到每个 bz2 文件 并整理结果 在系列中 使用循环很容易 import pandas as pd import json import os import
  • Python:将标志传递给函数

    很长一段时间以来 我一直试图找出将标志传递给 python 函数的最佳方法 最直接的方法是这样的 def func data flag1 flag2 flag3 func my data True False True 这确实很好而且简洁
  • Ajax div 刷新后 Jquery Masonry 相互加载

    我正在使用 ajax 刷新包含图像的 div 我最初使用砌体来添加布局 然后ajax调用返回一个js 使用html 方法刷新div 现在完成后我打电话masonry reloadItems 但砌体将所有图像加载到另一图像上 调整页面大小后
  • 如何在android中解析这个JSON数组

    我想要每个标签的名称 电子邮件和图像 我必须在列表元素中显示 response name Brajendra Mishra email email protected cdn cgi l email protection address I
  • 在 Python 中使用互斥锁和并发 future

    我有一些代码使用并发 future 连接到许多远程主机来运行一些命令 例如 def set host to host value connection connect to host info do something with conne
  • 从底部开始将元素附加到 div?

    我有以下代码 button click function parent append div element div parent height 200px width 100px border 1px solid ccc div div
  • MVVM 多视图

    我正在尝试学习 MVVM 到目前为止进展顺利 我偶然发现了一种我不知道如何实现的情况 我想要的 具有左侧导航和右侧详细信息窗格的视图 右侧详细信息将有一个内容容器 其中包含我的用户控件 以便通过左侧窗格选择视图 我拥有的 主视图模型 我数据
  • 是否需要注册兴趣才能写入 NIO 套接字来发送数据?

    是否需要注册兴趣才能写入 NIO 客户端套接字通道来发送数据 我必须总是打电话吗socketChannel register selector SelectionKey OP WRITE 或类似的东西 在写信给客户之前SocketChann
  • 在 Excel 中根据条件查找最大值或最小值

    在我的电子表格中 我有一列包含负值和正值 我需要获得所有正值中的最小值和所有负值中的最大值 我怎样才能这样做呢 使用数组公式 在以下示例中 您要检查的值位于A2 A10 最大负值 MAX IF A2 A10 lt 0 A2 A10 Pres
  • 使用Python将多个制表符分隔的文本文件插入MySQL?

    我正在尝试创建一个程序 它采用多个制表符分层文本文件 并一次处理一个文件 将它们保存的数据输入 MySQL 数据库 有几个文本文件 例如 movie txt 如下所示 1 Avatar 3 Iron Man 3 Star Trek 每个文本
  • Android - 通过网址启动谷歌地图

    在 iPhone 上 maps google com URL 被本机谷歌地图应用程序拦截并加载 我想在 Android 上执行相同的操作 但 Google 地图正在浏览器中加载 那么 在网页中 是否可以有这个url在 Android 中打开
  • 在 Objective C 中谁调用了 dealloc 方法以及何时调用?

    当在 Objective C 中创建自定义类时 何时以及如何创建dealloc方法调用 这是我必须在课堂上实施的事情吗 您永远不会直接发送 dealloc 消息 相反 对象的 dealloc 方法是通过 NSObject 协议的releas
  • WebStorm 终端颜色

    我的创意终端中出现了令人难以忍受的白色背景 网络风暴 有谁知道我该如何改变这个 打开 文件 gt 设置 然后转到 编辑器 部分 gt 颜色和字体 部分 在那里 您将看到 控制台颜色 选项 在这里 您将看到一个交互式编辑屏幕 以确定您希望这些
  • 使用spark窗口函数获取最后一个值

    假设我有一个像这样的数据框 val df sc parallelize Seq 1 0 1 Matt 1 0 2 John 1 0 3 null asInstanceOf String 1 0 2 Adam 1 0 4 Steve toDF