如何获取 Spark RDD 的 SQL row_number 等效项?

2024-01-06

我需要为包含许多列的数据表生成行号的完整列表。

在 SQL 中,这看起来像这样:

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;

现在,假设在 Spark 中我有一个 (K, V) 形式的 RDD,其中 V=(col1, col2, col3),所以我的条目就像

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

我想使用 sortBy()、sortWith()、sortByKey()、zipWithIndex 等命令来排序这些,并拥有一个具有正确 row_number 的新 RDD

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(我不关心括号,所以形式也可以是 (K, (col1,col2,col3,rownum)) 代替)

我该怎么做呢?

这是我的第一次尝试:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

另请注意,函数 sortBy 不能直接应用于 RDD,但必须先运行collect(),然后输出也不是 RDD,而是数组

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)

这里还有一些进展,但仍然没有分区:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)

The row_number() over (partition by ... order by ...)Spark 1.4 中添加了该功能。这个答案使用 PySpark/DataFrames。

创建一个测试数据框:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

添加分区行号:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何获取 Spark RDD 的 SQL row_number 等效项? 的相关文章

  • 手动更改postgresql中查询的执行计划?

    是否可以在postgresql中手动更改执行计划的操作顺序 例如 如果我总是想在过滤之前进行排序操作 尽管这在 postgresql 的正常使用中没有意义 是否可以通过例如手动强制执行该操作改变运营的内部成本 如果我实现自己的功能呢 是否可
  • sql server 2008 对 exec 语句的限制

    我只需要仔细检查 t sql 中的 EXEC 命令是否有字符限制 如果我有一个带有 varchar max 的变量并使用 EXEC 执行命令 你认为这样可以吗 thanks 应该没问题 根据这篇 MSDN 文章 http msdn micr
  • 如何搜索表中的所有列?

    如何在 SQL Server 中搜索表的所有列 SELECT FROM yourtable WHERE val IN field1 field2 field3 field4 如果您正在寻找精确的全场比赛 如果你正在寻找子字符串匹配 你将不得
  • Sql Server 的夏令时

    我们正在使用一个以 C Unix 格式存储日期的旧应用程序 C 时间基本上是自 1970 年 1 月 1 日以来的秒数 日期以整数形式存储在 SQL Server 数据库中 我正在为使用这些日期的报告编写视图 到目前为止 我正在使用以下命令
  • 优化 LATERAL join 中的慢速聚合

    在我的 PostgreSQL 9 6 2 数据库中 我有一个查询 该查询根据一些股票数据构建计算字段表 它为表中的每一行计算 1 到 10 年的移动平均窗口 并将其用于周期性调整 具体来说 CAPE CAPB CAPC CAPS 和 CAP
  • Postgres LIMIT/OFFSET 奇怪的行为

    我正在使用 PostgreSQL 9 6 我有一个这样的查询 SELECT anon 1 id AS anon 1 id anon 1 is valid AS anon 1 is valid anon 1 first name AS ano
  • 如何检查oracle数据库中分配给模式、角色的对象的权限(DDL、DML、DCL)?

    大多数时候 我们都在与愚蠢的事情作斗争 以获取架构 角色及其对象的权限详细信息 并尝试找到一些简单的方法来获取有关它的所有详细信息以及伪查询代码 以批量生成授予语句以供进一步使用执行 所以我们在这里得到它 关于数据字典视图前缀的一些简单介绍
  • SQL Server Like 查询不区分大小写

    Query SELECT from Table 2 WHERE name like Joe Output 1 100 Joe 2 200 JOE 3 300 jOE 4 400 joe 为什么不区分大小写 Problem 查询不区分大小写
  • 从 PL/SQL 调用 shell 脚本,但 shell 以 grid 用户而非 oracle 身份执行

    我正在尝试使用 Runtime getRuntime exec 从 Oracle 数据库内部执行 shell 脚本 在 Red Hat 5 5 上运行的 Oracle 11 2 0 4 EE CREATE OR REPLACE proced
  • RANK() OVER PARTITION 并重置 RANK

    如何获得在分区更改时重新启动的 RANK 我有这张表 ID Date Value 1 2015 01 01 1 2 2015 01 02 1
  • 如何将事物的组合映射到关系数据库?

    我有一个表 其记录代表某些对象 为了简单起见 我假设该表只有一列 这是唯一的ObjectId 现在我需要一种方法来存储该表中的对象组合 组合必须是唯一的 但可以是任意长度 例如 如果我有ObjectIds 1 2 3 4 我想存储以下组合
  • 最近邻居的 Postgis SQL

    我正在尝试计算最近的邻居 为此 我需要传递一个参数来限制与邻居的最大距离 例如 半径1000米内最近的邻居是哪些 我做了以下事情 我用数据创建了表 id name latitude longitude 之后 我执行了以下查询 SELECT
  • Spark 在 WholeTextFiles 上创建的分区少于 minPartitions 参数

    我有一个文件夹 里面有 14 个文件 我在一个集群上使用 10 个执行器运行 Spark Submit 该集群的资源管理器为 YARN 我创建了我的第一个 RDD 如下所示 JavaPairRDD
  • SQL UPDATE 语句根据另一个现有行更新列

    基本上我有一个与下表具有相似格式的表格 我想做的是根据这个逻辑更新 Col4 如果 Col2 为空 则用 Col3 更新 Col4 如果 Col2 不为 null 则在 Col1 中查找与 Col2 中的值匹配的值 使用 col3 中的相应
  • 如何在 Spring Data 中选择不同的结果

    我在使用简单的 Spring Data 查询或 Query 或 QueryDSL 在 Spring Data 中构建查询时遇到问题 如何选择三列 研究 国家 登录 不同的行 并且查询结果将是用户对象类型的列表 Table User Id S
  • 在 Mysql 上使用 EntityManager JPA 运行脚本

    我正在尝试运行脚本 sql 文件 但由于我尝试了多种方法 因此出现多个错误 这是我的主要 sql 脚本 INSERT INTO Unity VALUES 11 paq 0 2013 04 15 11 41 37 Admin Paquete
  • 在 MS Access SQL 查询中从正常日期转换为 unix 纪元日期

    我正在尝试编写一个通过 ODBC 连接到 MySQL 数据库的 MS Access 2007 连接的查询 一切工作正常 查询执行我想要的操作 我挂断的部分是我一直在询问用户 unix 纪元时间 而不是常规日期 我查找了 MS Access
  • 自动删除主键序列中的间隙

    我正在创建一个网页 该网页根据用户操作将数据存储到 MySQL 数据库中 数据库有很多行 行的主键是列 rowID 它只是按顺序对行进行编号 例如 1 2 3 4 用户可以选择删除行 问题是当用户删除最后一行以外的行时 rowID 中有一个
  • hive sql查找最新记录

    该表是 create table test id string name string age string modified string 像这样的数据 id name age modifed 1 a 10 2011 11 11 11 1
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst

随机推荐