Spark DataFrame 删除重复项并保留第一个

2024-05-01

问题:在 pandas 中,当删除重复项时,您可以指定要保留哪些列。 Spark Dataframes 中有等效的吗?

Pandas:

df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')

Spark dataframe(我使用Spark 1.6.0)没有 keep 选项

df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])

Imagine scheduled_datetime and flt_flightnumber是第 6 ,17 列。通过根据这些列的值创建键,我们还可以删除重复项

def get_key(x):
    return "{0}{1}".format(x[6],x[17])

df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))

但如何指定保留第一行并摆脱其他重复项?最后一排怎么样?


对于每个说 dropDuplicates 保留第一次出现的人 -这并不完全正确。

dropDuplicates 保留排序操作的“第一次出现” - 仅当有 1 个分区时。请参阅下面的一些示例。
然而,这对于大多数 Spark 数据集来说并不实用。因此,我还提供了一个使用窗口函数 + 排序 + 排名 + 过滤器进行“首次出现”删除重复操作的示例。
例如,请参阅帖子底部。

这是使用 pyspark 在 Spark 2.4.0 中进行测试的。

删除重复项示例

import pandas as pd

# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-02-01
1     1  2018-02-01
2     2  2018-02-01
3     3  2018-02-01
4     4  2018-02-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
# first example
# does not give first (based on datestr)
(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-02-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-01-01|
+----+----------+
# second example
# testing what happens with repartition
(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .repartition('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr

+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-02-01|
|   1|2018-01-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-02-01|
+----+----------+
#third example
# testing with coalesce(1)
(spark
   .createDataFrame(dfall)
   .orderBy('datestr')
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates based on occurrence of sorted datestr
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
+----+----------+
# fourth example
# testing with reverse sort then coalesce(1)
(spark
   .createDataFrame(dfall)
   .orderBy('datestr', ascending = False)
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)
# dropDuplicates based on occurrence of sorted datestr```
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-03-01|
|   2|2018-03-01|
|   3|2018-03-01|
|   4|2018-03-01|
+----+----------+

窗口、排序、排名、过滤示例

# generating some example data with pandas
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
# into spark df
df_s = (spark.createDataFrame(dfall))
from pyspark.sql import Window
from pyspark.sql.functions import rank
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
.filter(col('rank') == 1)
.drop('rank')
.show()
)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
+----+----------+
# however this fails if ties/duplicates exist in the windowing paritions
# and so a tie breaker for the 'rank' function must be added

# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-01-01' # note duplicates in this dataset
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
# this will fail, since duplicates exist within the window partitions
# and no way to specify ranking style exists in pyspark rank() fn
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
.filter(col('rank') == 1)
.drop('rank')
.show()
)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   0|2018-01-01|
|   1|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
|   4|2018-01-01|
+----+----------+
# to deal with ties within window partitions, a tiebreaker column is added
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
window = Window.partitionBy("col1").orderBy("datestr",'tiebreak')
(df_s
 .withColumn('tiebreak', monotonically_increasing_id())
 .withColumn('rank', rank().over(window))
 .filter(col('rank') == 1).drop('rank','tiebreak')
 .show()
)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
+----+----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark DataFrame 删除重复项并保留第一个 的相关文章

随机推荐

  • 仅在 RMarkdown 中引用作者/在文本引用中添加所有格

    我想引用 RMarkdown 中的一位作者在他的名字中添加所有格 的内容 但是 我找不到在没有出版年份的情况下引用作者或直接添加所有格的方法 s 它应该看起来像这样 在芬纳 2012 的书中 最小工作示例 对于 R 笔记本 title R
  • Java如何使用私钥文件而不是PEM来解密?

    使用 Java 和 Bouncy Castle 1 52 我可以使用以下代码通过 PEM 证书加载私钥 我还有一个相同的 PKCS8 格式的 private key 文件 直接使用private key文件而不是PEM的代码是什么 Stri
  • 了解 keras 中不同序列的 lstm 输入形状

    我对 keras 和 python 都很陌生 我有一个具有不同序列长度的时间序列数据集 例如第一个序列是 484000x128 第二个序列是 563110x128 等 我已将序列放入 3D 数组中 我的问题是如何定义输入形状 因为我很困惑
  • Spring 3.1 和 JPA 2 没有正在进行的事务异常

    我已经这样做了几个星期了 我已经尝试过 eclipselink 现在只是普通的 JPA 我不断遇到同样的问题 每次我尝试刷新实体管理器时 都会收到 javax persistence TransactionRequiredException
  • 从全日历中获取选定的日期

    我将日历添加到我的 asp net mvc 2 应用程序中here http arshaw com fullcalendar 我想选择参加活动的选定日期 我怎样才能获得选定的日期 我还想将此日期和相应的事件保存到数据库中 这还怎么办 设置插
  • 如何使用 C 将带有 2 个变量的 IF 语句转换为 switch 函数?

    我有一个 IF 语句 我想将其转换为 Switch 语句 但它有 2 个变量 在C上可以实现吗 这是一个石头 剪刀 布的游戏 R代表石头 P代表布 S代表剪刀 char play1 play2 printf nPlayer 1 Enter
  • 使用异步函数等待 onclick 的用户输入

    我是异步新手 也许只是不了解基础知识 但我试图通过调用弹出模式并等待用户提交数据的异步函数来等待来自 onclick 的用户输入 在找到一两个甚至提到使用异步等待页面事件的来源后 这对我的特定任务并不是特别有帮助 我想出了这个 asnyc
  • 如何将 Netbeans 项目导入 Eclipse

    我想将我的 NetBeans 项目转移到 Eclipse 这是一个网络应用程序项目 我将 war 文件导入到 Eclipse 中 但无法获取 Java 文件 并且 war 文件给了我很多错误 导入整个项目的最佳方式是什么 另一种简单的方法如
  • Togglz 我的 SpringBoot 的 Yml/Yaml 配置不起作用

    尝试使用 Togglz 创建功能切换 在我的应用程序中进行了以下配置 代码 bootstrap yml togglz enabled true features FEATURE ONE true 功能枚举类 public enum AppF
  • 如何从对象文字数组中切片数组?

    我有这个数组 其中每个索引都包含一个对象文字 所有对象字面量都具有相同的属性 某些对象文字对于给定属性具有相同的值 我想创建一个包含only那些对象文字 我的想法是对数组进行排序 并将其切片成一个新数组 这是数组 var arr arr 0
  • virtualenv、python 和 subversion

    我正在尝试在 python subversion SWIG 库中使用virtualenv no site packages环境 我怎样才能做到这一点 你可以从 svn 将其安装在 virtualenv 中 source home you v
  • 您在一次渲染中修改了 *** 两次

    升级到 1 13 后 出现此异常 但我不知道问题出在哪里 我也找不到任何有用的资源来解决我的问题 我在另一个计算属性中设置的属性会发生这种情况 但这个属性肯定只被调用一次 我创建了一个 jsbin 示例 http emberjs jsbin
  • 如何在 Mac 上升级 Docker? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我尝试了 docker machine升级 并做了一些工作 但我似乎没有最新的 CLI 如果我执行 docker 版本 我会得到 Clie
  • 无法从“https://services.gradle.org/distributions/gradle-2.1-all.zip”安装 Gradle 发行版

    我是 gradle 新手 我只是尝试在 IntelliJ 中创建一个新的 Android Gradle 项目 填写完必需品后 它开始下载一些东西 这花了几个小时 所以我决定强制退出 IDE 并再次打开项目 And now I am gett
  • 函数不会在所有代码路径上返回值。使用结果时,运行时可能会发生空引用异常

    我收到此错误 函数 getkey 不会在所有代码路径上返回值 当结果为空引用异常时 可能会在运行时发生 用过的 到以下代码 Public Function getkey ByVal id As String Dim cmd As SqlCo
  • 在执行方法的括号内声明变量

    默认情况下 变量在方法执行之前定义 例如 DateTime myDate if DateTime TryParse date out myDate 我们可以实现内联声明 并且该变量可以在外部使用 例如 if DateTime TryPars
  • 如何使用 Node JS 对包含小数/尾随零的数据生成哈希

    在尝试验证 Node JS 中的 Authorize net webhook 通知时 我遇到了以下与小数 尾随零有关的问题 Authorize net 使用 HMAC SHA512 以及 Webhook 通知正文和商家的签名密钥形成哈希 该
  • 私人 NuGet Feed - 记住密码

    每当我在 Visual Studio 中更新 NuGet 包时 系统都会提示我输入有关私有 NuGet 源的用户名和密码 尽管我勾选了方框 记住我的密码 后续 NuGet 更新时系统会提示我输入密码 如何让它正确记住我的密码 这是来自 Vi
  • 删除行时 QModelIndex 变得无效

    我正在子类化QAbstractItemModel显示项目QTreeView 并且在这个子类中 projectModel 我有一个功能可以删除树视图中当前选定的索引 Component是用于表示模型所有成员的类 void projectMod
  • Spark DataFrame 删除重复项并保留第一个

    问题 在 pandas 中 当删除重复项时 您可以指定要保留哪些列 Spark Dataframes 中有等效的吗 Pandas df sort values actual datetime ascending False drop dup