合并 Spark scala Dataframe 中的行

2024-02-20

合并 Spark Dataframe 中的行

我有如下数据

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Ostrich 12345       -       ABC         11-02-2018
1   -       -           -       BCD         10-02-2018
1   Shah    12345       -       -           12-02-2018
2   PJ      -           ANB     a           10-02-2018

所需输出为

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Shah    12345       -       ABC         12-02-2018
2   PJ      -           ANB     a           10-02-2018

基本上,数据相同ID应该合并,最新更新而不是null如果所有值都是,记录应该在输出中null, then null应该保留..

请建议...另外,建议不使用 SparkSQLWindow功能,因为我需要它非常快


如果你想完全留在sparkSQL中

val df= Seq((1,Some("ostrich"), Some(12345), None, Some("ABC")," 11-02-2018" ),
(1,None, None, None, Some("BCD"), "10-02-2018"),(1,Some("Shah"), Some(12345), None,None, "12-02-2018"),
(2,Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")).toDF("ID","Name","Passport","Country","License","UpdatedtimeStamp")


val df1= df.withColumn("date", to_date($"UpdatedtimeStamp","MM-dd-yyyy" )).drop($"UpdatedtimeStamp")

val win = Window.partitionBy("ID").orderBy($"date".desc)

val df2=df1.select($"*", row_number.over(win).as("r")).orderBy($"ID", $"r").drop("r")
val exprs= df2.columns.drop(1).map(x=>collect_list(x).as(x+"_grp"))

val df3=df2.groupBy("ID").agg(exprs.head,exprs.tail: _*)

val exprs2= df3.columns.drop(1).map(x=> col(x)(0).as(x))

df3.select((Array(col(df2.columns(0)))++exprs2): _*).show


+---+----+--------+-------+-------+----------+
| ID|Name|Passport|Country|License|      date|
+---+----+--------+-------+-------+----------+
|  1|Shah|   12345|   null|    ABC|2018-12-02|
|  2|  PJ|    null|    ANB|      a|2018-10-02|
+---+----+--------+-------+-------+----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

合并 Spark scala Dataframe 中的行 的相关文章

  • 如何在Python中流式传输和操作大数据文件

    我有一个相对较大 1 GB 的文本文件 我想通过跨类别求和来减小其大小 Geography AgeGroup Gender Race Count County1 1 M 1 12 County1 2 M 1 3 County1 2 M 2
  • 如何使用 zio-test 测试异常情况

    我有以下功能 我想测试 def people id Int RIO R People 如果有 People 则此函数返回 Peopleid 分别 如果没有则失败 例如 IO fail ServiceException s No People
  • scala 如何对元组进行排序?

    我试图了解 scala 如何处理元组的排序和排序 例如 如果我得到了列表 val l for i lt 1 to 5 yield i i 2 Vector 1 2 2 4 3 6 4 8 5 10 scala 知道如何对其进行排序 l so
  • 如何使用文本相似性删除 pandas 数据框中相似(不重复)的行?

    我有数千个数据 这些数据可能相似也可能不相似 使用 python 的默认函数 drop duplicates 并没有真正的帮助 因为它们只检测相似的数据 例如 如果我的数据包含类似以下内容怎么办 嗨 早上好 嗨 早上好 Python 不会将
  • 聚合函数在数据框中创建不需要的向量

    我在函数中创建数据帧时遇到了一个奇怪的问题 但是 在 data frame 之外使用相同的方法效果很好 这是基本函数 我用它来计算数据集的平均值 标准差和标准误差 aggregateX lt function formula dataset
  • SBT Scaladoc 配置

    我正在尝试在 SBT 中配置 Scaladoc 特别是标题 输出目录和类路径 我通过将以下内容添加到 build sbt 来定义标题 scalacOptions in Compile doc Opts doc title Scala Too
  • 对于 Scala,“无全局类型推断”是什么意思?

    我读过 Scala 的类型推断不是全局的 因此人们必须在方法上放置类型注释 这会是 本地 类型推断吗 我只知道一点点 原因是它面向对象的本质 但我不清楚 是否有 全局类型推断 的解释以及为什么 Scala 不能让初学者可以理解 The pr
  • 动态过滤 pandas 数据框

    我正在尝试使用三列的阈值来过滤 pandas 数据框 import pandas as pd df pd DataFrame A 6 2 10 5 3 B 2 5 3 2 6 C 5 2 1 8 2 df df loc df A gt 0
  • 我可以使用特征中的方法重写 scala 类方法吗?

    class PasswordCaseClass val password String trait PasswordTrait self PasswordCaseClass gt override def password blue val
  • 如何将 r 数据框转换为 h2o 对象

    我对 R 和 H2O 很陌生 我试图找到一种将 r 数据帧转换为 h2o 对象的方法 我花了一些时间研究如何做到这一点 但没有运气 其他方式也是可能的 并且有详细记录如下 prosPath system file extdata prost
  • 使用 Python Pandas 获取多个值来制作表格

    使用我的代码 我可以将两个 Excel 数据库连接到 1 中 问题是它只显示收入列 而不显示列展示次数 为了更清楚 我留下了代码和示例 我尝试过 df1 df1 pivot index Cliente columns Fecha value
  • 将多个 Future[Seq] 连接成一个 Future[Seq]

    如果没有 Future 这就是我将所有较小的 Seq 组合成一个大 Seq 的方式flatmap category getCategoryUrlKey id Int Seq Meta main method val appDomains S
  • 引用Scala中内部类的类型

    下面的代码尝试模仿DSL 的多态嵌入 http www daimi au dk ko papers gpce50 hofer pdf 而不是给出行为Inner 它被编码在useInner其封闭类的方法 我添加了enclosing方法 以便用
  • 从apache Spark中的文本文件查找rdd中存储的数据大小

    我是 Apache Spark 版本 1 4 1 的新手 我编写了一段小代码来读取文本文件并将其数据存储在 Rdd 中 有没有一种方法可以获取 rdd 中数据的大小 这是我的代码 import org apache spark SparkC
  • 避免函数内装箱/拆箱

    对于数字密集型代码 我编写了一个具有以下签名的函数 def update f Int Int Double gt Double Unit 然而 因为Function3不是专门的 每个应用程序f结果对 3 个参数和结果类型进行装箱 拆箱 我可
  • 强制类型差异

    在 Scala 中 我可以在编译时强制执行类型相等 例如 case class Foo A B a A b B implicit ev A B scala gt Foo 1 2 res3 Foo Int Int Foo 1 2 scala
  • 如何使用 Rrank() 函数创建新的ties.method? [复制]

    这个问题在这里已经有答案了 我试图按人口和日期排序这个数据框 所以我使用order and rank 功能 gt df lt data frame idgeoville c 5 8 4 3 4 5 8 8 date c rep 1950 4
  • 两个不同长度的数据帧的列之间的余弦相似度?

    我在 df1 中有文本列 在 df2 中有文本列 df2 的长度将与 df1 的长度不同 我想计算 df1 text 中每个条目与 df2 text 中每个条目的余弦相似度 并为每场比赛给出分数 输入样本 df1 mahesh suresh
  • Spark Shuffle 写入超慢

    为什么对于 1 6MB shuffle 写入和 2 4MB 输入 spark shuffle 阶段如此缓慢 为什么 shuffle 写入仅发生在一个执行器上 我正在运行一个 3 节点集群 每个集群有 8 个核心 火花用户界面 Code Ja
  • 如何从namedtuple实例列表创建pandas DataFrame(带有索引或多索引)?

    简单的例子 from collections import namedtuple import pandas Price namedtuple Price ticker date price a Price GE 2010 01 01 30

随机推荐

  • logrotate 日期格式似乎不支持 %H:%M:%S

    我是 logrotate 的新手 当配置到 dateformat 属性时 logrotate 似乎不支持 strftime H 这是配置 daily rotate 2 size 3M missingok notifempty dateext
  • 在 Prolog 中交换列表的连续项

    我正在尝试编写可以交换列表中两个元素的 Prolog 代码 但前提是它们是彼此连续 那是 conseq swap d e a g d e f X 应该给出 X a g e d f d 和 e 是连续的 However conseq swap
  • 在单个事务中发送多个 SQL 命令

    我有一个巨大的清单INSERT INTO 字符串 目前我运行它们 using SqlConnection connection new SqlConnection connectionString connection Open forea
  • 按钮位于画布中央

    我有 4 个重叠的画布 充当图层 绝对定位并水平和垂直居中 在此画布上 我想在画布中心的一列中覆盖四个 HTML CSS 按钮 用于游戏菜单 我是 CSS 和 HTML 的新手 我一直无法弄清楚如何让按钮在绝对定位的画布上居中 我怎样才能做
  • 通用扩展类并实现接口[重复]

    这个问题在这里已经有答案了 这听起来可能是一个奇怪的问题 但是如何定义一个必须扩展类并实现接口的泛型呢 我目前有一个具有以下原型的通用函数 public static
  • cudaArray 与设备指针

    我对设备指针和设备指针的预期用途之间的区别感到困惑cudaArray结构 有人可以解释一下为什么我会使用其中一种而不是另一种吗 我的基本问题是 在浏览文档并阅读 CUDA by Example 一书之后 我不明白 API 设计者的意图 从我
  • 使用 Sprite Kit 创建自定义滑块 - 如何传递@selector?

    我正在开发的 Sprite Kit 游戏使用自定义滑块充当颜色选择器 颜色是从滑块轨道图形中选取的 这是一个UIImage包含梯度 我研究过使用定制的UISlider 但是标准的 IOS UI 控件不能很好地与 Sprite Kit 的场景
  • 如何通过 python 脚本在 ArcGIS 中添加 shapefile?

    我正在尝试使用 Python 自动执行 ArcGIS Desktop 通常使用 ArcMap 中的各种任务 并且我一直需要一种将形状文件添加到当前地图的方法 然后对其做一些事情 但那是另一个故事 到目前为止我能做的最好的就是添加一个laye
  • AngularJS ng-options 将数据类型添加到选项的值

    尝试使用最新版本 1 5 8 的 AngularJS 和 ng options 来填充下拉列表 问题是它添加了数据类型和值 如下所示
  • 如何将YUV_420_888图像转换为位图[重复]

    这个问题在这里已经有答案了 我正在开发 AR 项目 我需要捕获当前帧并将其保存到图库 我可以使用AR core中的Frame类获取图像 但图像的格式是YUV 420 888 我已经尝试了很多解决方案来将其转换为位图 但无法解决它 这就是我转
  • 在 Safari 9.1.2 中,悬停时的 SVG 转换失败 (11601.7.7)

    我试图在悬停时将文本和 svg 淡入另一种颜色 a color ff0000 display inline block margin 0 0 0 1em text decoration none text transform lowerca
  • Linux 中 C 语言的消息框

    我想在 Linux 中用 C 执行程序后显示 成功 弹出消息 它应该在 KDE 和 GNOME 上运行 我怎样才能做到这一点 您正在寻找桌面通知 http www galago project org specs notification
  • 如何将 Apache 配置为仅分叉一个子进程/工作进程?

    我希望启动 apache 以便它分叉一个子进程 我正在分析一个模块 我知道 X 标志 根据文档 两个指令对活动子进程的数量设置了硬性限制 以及子进程中的服务器线程数 并且只能是 通过完全停止服务器然后再次启动来更改 ServerLimit
  • JFormattedTextField 仅限于 0 或 1

    我想限制用户可以在文本字段中输入的字符 如何限制 JFormattedTextField 仅接受 0 或 1 的 8 位数字 Use 文档过滤器 https stackoverflow com a 9430683 714968与模式JTex
  • 使用 FBGraph 登录 Facebook

    我是 iOS 开发新手 我想从我的 iPhone 应用程序连接到 Facebook 我跟着FBGraph API https github com reallylongaddress iPhone Facebook Graph API tr
  • Docker - 在源文件中构建 Arg

    我正在尝试构建一个 Docker 容器 我想将其源标签作为参数传递 构建脚本 docker build pull true build arg version version Docker 文件 ARG version FROM regis
  • Django:自定义中间件被调用两次

    我有一个自定义中间件 每个请求都会调用两次 我不明白为什么 这是我的中间件 class MyMiddleWare object def process request self request print FOO return None 这
  • Spark AccumulableCollection 不适用于 mutable.Map

    我正在使用 Spark 进行员工记录累积 为此我使用 Spark 的累加器 我使用 Map empId emp 作为cumulableCollection 以便我可以通过员工的 id 搜索员工 我已经尝试了一切 但它不起作用 有人可以指出我
  • 无论缓冲区设置如何,PHP 命令行输出缓冲区都会输出

    我正在为一些类编写单元测试 这些类在其中有回声 我想抑制这个输出和想法ob start and ob clean 就足够了 但它们没有效果 public function testSomething ob start class new M
  • 合并 Spark scala Dataframe 中的行

    合并 Spark Dataframe 中的行 我有如下数据 ID Name Passport Country License UpdatedtimeStamp 1 Ostrich 12345 ABC 11 02 2018 1 BCD 10