仅保留 DataFrame 中有关某些字段的重复项

2024-04-20

我有这个火花数据框:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|AUTRE|     2|null|    08:58:00|    23:29:00|
|TDR|  QWA|     3|null|    08:57:00|    23:28:00|
|ALT| TEST|     4|null|    08:56:00|    23:27:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

我想获得一个新的数据框,其中仅包含 3 个字段中不唯一的行"ID", "ID2" and "Number".

这意味着我想要这个 DataFrame:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

或者可能是一个包含所有重复项的数据框:

+---+-----+------+----+------------+------------+
| ID|  ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT|  QWA|     6|null|    08:59:00|    23:30:00|
|ALT|  QWA|     6|null|    08:55:00|    23:26:00|
|ALT|  QWA|     2|null|    08:54:00|    23:25:00|
|ALT|  QWA|     2|null|    08:53:00|    23:24:00|
+---+-----+------+----+------------+------------+

一种方法是使用pyspark.sql.Window http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.Window添加一列来计算每行的重复项数("ID", "ID2", "Number")组合。然后仅选择重复项数大于 1 的行。

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy('ID', 'ID2', 'Number')
df.select('*', f.count('ID').over(w).alias('dupeCount'))\
    .where('dupeCount > 1')\
    .drop('dupeCount')\
    .show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA|     2|null|    08:54:00|    23:25:00|
#|ALT|QWA|     2|null|    08:53:00|    23:24:00|
#|ALT|QWA|     6|null|    08:59:00|    23:30:00|
#|ALT|QWA|     6|null|    08:55:00|    23:26:00|
#+---+---+------+----+------------+------------+

I used pyspark.sql.functions.count() http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.count计算每组中的项目数。这将返回一个包含所有重复项的 DataFrame(您显示的第二个输出)。

如果您只想获得一行("ID", "ID2", "Number")组合,您可以使用另一个窗口来对行进行排序。

例如,下面我添加另一列row_number http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.row_number并仅选择重复计数大于 1 且行号等于 1 的行。这保证每个分组一行。

w2 = Window.partitionBy('ID', 'ID2', 'Number').orderBy('ID', 'ID2', 'Number')
df.select(
        '*',
        f.count('ID').over(w).alias('dupeCount'),
        f.row_number().over(w2).alias('rowNum')
    )\
    .where('(dupeCount > 1) AND (rowNum = 1)')\
    .drop('dupeCount', 'rowNum')\
    .show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA|     2|null|    08:54:00|    23:25:00|
#|ALT|QWA|     6|null|    08:59:00|    23:30:00|
#+---+---+------+----+------------+------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

仅保留 DataFrame 中有关某些字段的重复项 的相关文章

  • Spark SQL 中的 SQL LIKE

    我正在尝试使用 LIKE 条件在 Spark SQL 中实现联接 我正在执行连接的行看起来像这样 称为 修订 Table A 8NXDPVAE Table B 4 8 NXD V 在 SQL Server 上执行联接 A revision
  • S3并行读写性能?

    考虑 Spark 或任何其他 Hadoop 框架 从 S3 读取大型 例如 1 TB 文件的场景 多个spark执行器如何从S3并行读取非常大的文件 在 HDFS 中 这个非常大的文件将分布在多个节点上 每个节点都有一个数据块 在对象存储中
  • 使用空/空字段值创建新的数据框

    我正在从现有数据帧创建一个新数据帧 但需要在这个新 DF 中添加新列 下面代码中的 field1 我该怎么做 工作示例代码示例将不胜感激 val edwDf omniDataFrame withColumn field1 callUDF v
  • 如何在 Scala 中将 DataFrame 模式写入文件

    我有一个 DataFrame 它从一个巨大的 json 文件加载并从中获取架构 该架构基本上大约有 1000 列 我希望将 printSchema 的相同输出保存在文件中而不是控制台中 有任何想法吗 如果您在本地环境中工作 您可以执行以下操
  • Spark中的count和collect函数抛出IllegalArgumentException

    当我使用时抛出此异常时 我尝试在本地 Spark 上加载一个小数据集count 在 PySpark 中 take 似乎有效 我试图搜索这个问题 但没有找到原因 看来RDD的分区有问题 有任何想法吗 先感谢您 sc stop sc Spark
  • Scala:如何获取数据框中的行范围

    我有一个DataFrame通过运行创建sqlContext readParquet 文件的一个 The DataFrame由 300 M 行组成 我需要使用这些行作为另一个函数的输入 但我想以较小的批次进行操作 以防止 OOM 错误 目前
  • 为什么我必须明确告诉 Spark 要缓存什么?

    在 Spark 中 每次我们对 RDD 执行任何操作时 都会重新计算 RDD 因此 如果我们知道 RDD 将被重用 我们应该显式地缓存 RDD 比方说 Spark 决定延迟缓存所有 RDD 并使用 LRU 自动将最相关的 RDD 保留在内存
  • 从 aws Glue 脚本调用存储过程

    ETL 作业完成后 在 AWS Glue 脚本中调用存储过程的最佳方式是什么 我正在使用 PySpark 从 S3 获取数据并将其存储在临时表中 在这个过程之后 需要调用一个存储过程 该存储过程将数据从临时表加载到相应的 MDS 表中 如果
  • 检查 pyspark df 列的值是否存在于其他 pyspark df 列中

    我有 2 个 pyspark 数据帧 我想检查一列的值是否存在于另一个数据帧的列中 我只看到了如何过滤存在的值的解决方案 像这样 https stackoverflow com questions 41775281 filtering a
  • 为什么 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?

    我的团队正在构建一个 ETL 流程 以使用 Spark 将原始分隔文本文件加载到基于 Parquet 的 数据湖 中 Parquet 列存储的承诺之一是查询将仅读取必要的 列条带 但我们看到意外的列被读取以获取嵌套模式结构 为了进行演示 下
  • Spark:连接两个相同分区的数据帧时防止洗牌/交换

    我有两个数据框df1 and df2我想在一个名为的高基数字段上多次加入这些表visitor id 我只想执行一次初始洗牌 并让所有连接发生 而无需在 Spark 执行器之间洗牌 交换数据 为此 我创建了另一个名为visitor parti
  • Scala Spark:将数据框中的双列转换为日期时间列

    我正在尝试编写代码来将日期时间列 date 和 last updated date 转换为 mm dd yyyy 格式以进行显示 它们实际上是 unix 时间转换为双精度数 我该怎么做呢 import org joda time impor
  • 选择 PySpark 数据框中的列

    我正在寻找一种在 PySpark 中选择数据帧列的方法 对于第一行 我知道我可以使用df first 但不确定列是否存在没有列名 我有 5 列 想循环浏览每一列 1 2 3 4 5 6 7 1 0 0 0 0 0 0 1 0 0 0 0 0
  • 如果 Spark 中的数据帧是不可变的,为什么我们能够使用 withColumn() 等操作来修改它?

    这可能是一个愚蠢的问题 源于我的无知 我已经在 PySpark 上工作了几个星期 并没有太多的编程经验 我的理解是 在 Spark 中 RDD 数据帧和数据集都是不可变的 我再次理解 这意味着您无法更改数据 如果是这样 为什么我们能够使用编
  • 这个错误是什么意思(SimpleHttpConnectionManager 被错误使用)?

    我正在尝试从 ElasticSearch 中读取数据到 Spark conf es resource sflow sflow es nodes ES01 es query some query rdd sc newAPIHadoopRDD
  • 在 Pandas UDF PySpark 中传递多列

    我想计算 PySpark DataFrame 两列之间的 Jaro Winkler 距离 Jaro Winkler 距离可通过所有节点上的 pyjarowinkler 包获得 pyjarowinkler 的工作原理如下 from pyjar
  • SQL 类似于 PySpark 数据帧的 NOT IN 子句

    例如 在 SQL 中 我们可以这样做select from table where col1 not in A B 我想知道是否有一个与此等效的 PySpark 我能够找到isin类似于 SQL 的函数IN条款 但没有任何内容NOT IN
  • Spark 写入 S3 V4 SignatureDoesNotMatch 错误

    我遇到S3SignatureDoesNotMatch尝试使用 Spark 将 Dataframe 写入 S3 时 症状 尝试过的事情 代码失败有时但有效有时 代码可以read从 S3 没有任何问题 并且能够不时写入 S3 这排除了错误的配置
  • Zeppelin:如何在 zeppelin 中重新启动 SparkContext

    我正在使用 zeppelins Spark 解释器的隔离模式 在这种模式下 它将为 Spark 集群中的每个笔记本启动一项新工作 我想在笔记本执行完成后通过 zeppelin 终止该作业 为此我做了sc stop这停止了 sparkCont
  • 为什么spark在sql查询末尾附加'WHERE 1=0'

    我正在尝试使用 Apache Spark 执行简单的 mysql 查询并创建一个数据框 但由于某些原因 Spark 附加 WHERE 1 0 在我想要执行的查询末尾并抛出异常说明 You have an error in your SQL

随机推荐