SparkSQL 超前/滞后函数中的动态/变量偏移

2023-12-24

我们可以以某种方式使用取决于 Spark SQL 中的领先/滞后函数中的列值的偏移值吗?

示例:以下是运行良好的方法。

val sampleData = Seq( ("bob","Developer",125000),
  ("mark","Developer",108000),
  ("carl","Tester",70000),
  ("peter","Developer",185000),
  ("jon","Tester",65000),
  ("roman","Tester",82000),
  ("simon","Developer",98000),
  ("eric","Developer",144000),
  ("carlos","Tester",75000),
  ("henry","Developer",110000)).toDF("Name","Role","Salary")

val window = Window.orderBy("Role")

//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(window)

//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")

//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
  .when(col("SalaryDiff").>(0), "UP")
  .otherwise("DOWN")

sampleData.withColumn("LastSalary", laggingCol)
  .withColumn("SalaryDiff",salaryDifference)
  .withColumn("Trend", trend).show()

现在,我的用例是我们必须传递的偏移量取决于整数类型的特定列。这有点是我想要的工作:

val sampleData = Seq( ("bob","Developer",125000,2),
  ("mark","Developer",108000,3),
  ("carl","Tester",70000,3),
  ("peter","Developer",185000,2),
  ("jon","Tester",65000,1),
  ("roman","Tester",82000,1),
  ("simon","Developer",98000,2),
  ("eric","Developer",144000,3),
  ("carlos","Tester",75000,2),
  ("henry","Developer",110000,2)).toDF("Name","Role","Salary","ColumnForOffset")

val window = Window.orderBy("Role")

//Derive lag column for salary
val laggingCol = lag(col("Salary"), col("ColumnForOffset")).over(window)

//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")

//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
  .when(col("SalaryDiff").>(0), "UP")
  .otherwise("DOWN")

sampleData.withColumn("LastSalary", laggingCol)
  .withColumn("SalaryDiff",salaryDifference)
  .withColumn("Trend", trend).show()
   

这将按预期抛出异常,因为 offset 仅采用整数值。 让我们讨论一下是否可以以某种方式为此实现一个逻辑。


您可以添加行号列,并根据行号和偏移量进行自连接,例如:

val df = sampleData.withColumn("rn", row_number().over(window))

val df2 = df.alias("t1").join(
    df.alias("t2"),
    expr("t1.rn = t2.rn + t1.ColumnForOffset"),
    "left"
).selectExpr("t1.*", "t2.Salary as LastSalary")

df2.show
+------+---------+------+---------------+---+----------+
|  Name|     Role|Salary|ColumnForOffset| rn|LastSalary|
+------+---------+------+---------------+---+----------+
|   bob|Developer|125000|              2|  1|      null|
|  mark|Developer|108000|              3|  2|      null|
| peter|Developer|185000|              2|  3|    125000|
| simon|Developer| 98000|              2|  4|    108000|
|  eric|Developer|144000|              3|  5|    108000|
| henry|Developer|110000|              2|  6|     98000|
|  carl|   Tester| 70000|              3|  7|     98000|
|   jon|   Tester| 65000|              1|  8|     70000|
| roman|   Tester| 82000|              1|  9|     65000|
|carlos|   Tester| 75000|              2| 10|     65000|
+------+---------+------+---------------+---+----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkSQL 超前/滞后函数中的动态/变量偏移 的相关文章

  • K均值||用于 Spark 上的情感分析

    我正在尝试编写基于Spark的情感分析程序 为此 我使用了 word2vec 和 KMeans 聚类 从 word2Vec 我在 100 维空间中得到了 20k 个单词 向量集合 现在我正在尝试对这个向量空间进行聚类 当我使用默认并行实现运
  • 如何将巨大的pandas数据帧保存到hdfs?

    我正在使用 pandas 和 Spark 数据框 数据帧总是非常大 gt 20 GB 标准 Spark 函数不足以满足这些大小 目前 我将 pandas 数据框转换为 Spark 数据框 如下所示 dataframe spark creat
  • pandas 函数根据 dict 创建组合列

    我正在尝试在中创建一个加权列pandas DataFrame 我有一条蟒蛇dictionary钥匙是pandas DataFrame列名称和相应权重的值 我想创建一个新的列 该列的权重基于dictionary和参考pandas DataFr
  • Spark Driver 内存和 Application Master 内存

    我是否正确理解客户端模式的文档 客户端模式与驱动程序在应用程序主机中运行的集群模式相反 在客户端模式下 驱动程序和应用程序主机是单独的进程 因此spark driver memory spark yarn am memory一定小于机器内存
  • 将数据帧中的 NaN 转换为零

    我有字典并使用创建了 Pandas cars pd DataFrame from dict cars dict orient index 和 对索引进行排序 按字母顺序排列汽车 cars sort index axis 1 排序后 我注意到
  • 从 aws Glue 脚本调用存储过程

    ETL 作业完成后 在 AWS Glue 脚本中调用存储过程的最佳方式是什么 我正在使用 PySpark 从 S3 获取数据并将其存储在临时表中 在这个过程之后 需要调用一个存储过程 该存储过程将数据从临时表加载到相应的 MDS 表中 如果
  • 通过Listener获取Spark thrift服务器查询中读取的行数

    我正在尝试为我们的 ST 服务器构建一个监控系统 到目前为止 诸如记录查询 检索的行 红色和花费的时间之类的事情都很好 我已经实现了一个自定义侦听器 我能够毫无问题地检索查询和时间 侦听SparkListenerSQLExecutionSt
  • 通过相邻行的差异过滤 pandas 数据框

    我有一个按日期时间索引的数据框 我想根据行的索引与前一行的索引之间的差异来过滤行 因此 如果我的标准是 删除比前一行晚一小时以上的所有行 则应删除下面示例中的第二行 2005 07 15 17 00 00 2005 07 17 18 00
  • 如何抑制 EMR 上运行的 Spark-sql 的 INFO 消息?

    我正在 EMR 上运行 Spark 如中所述在 Amazon Elastic MapReduce 上运行 Spark 和 Spark SQL https aws amazon com articles 4926593393724923 本教
  • 用于在 pyspark 中处理大数的数据类型

    我将 Spark 与 python 一起使用 上传 csv 文件后 我需要解析 csv 文件中的一列 其中包含 22 位数字长的数字 为了解析我使用的列长类型 我使用 map 函数来定义列 以下是我在 pyspark 中的命令 gt gt
  • 根据连续行值差异拆分数据框

    我有一个这样的数据框 df col1 col2 col3 1 2 3 2 5 6 7 8 9 10 11 12 11 12 13 13 14 15 14 15 16 现在我想当两个连续行的 col1 差异大于 1 时从上面创建多个数据框 所
  • dplyr 中 select() 的 contains() 和 matches() 之间的区别

    我决定花一些时间彻底学习dplyr 我刚刚遇到select 函数以及它附带的一些辅助函数 通过只是玩弄 我没能发现两者之间的任何区别contains and matches辅助功能 有人可以提供一个例子来说明如何将它们用于不同的目的吗 谢谢
  • 如何将函数应用于表以将 P 值输出为新行

    我有这个简单的数据框 sum 列表示行的总和 我想使用 prop test 来确定每列的 P 值 并将该数据显示为标记为 p 值的附加行 我可以按以下方式使用 prop test 来确定任何单个列的 p 值 但无法弄清楚如何使用单个函数将其
  • 为什么 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?

    我的团队正在构建一个 ETL 流程 以使用 Spark 将原始分隔文本文件加载到基于 Parquet 的 数据湖 中 Parquet 列存储的承诺之一是查询将仅读取必要的 列条带 但我们看到意外的列被读取以获取嵌套模式结构 为了进行演示 下
  • 如何区分spark中的操作是转换还是动作?

    最近在学习spark 对transformation和action操作很困惑 我阅读了spark文档和一些关于spark的书籍 我知道action会导致spark作业在集群中执行 而transformation则不会 但是spark的api
  • 在 Pandas UDF PySpark 中传递多列

    我想计算 PySpark DataFrame 两列之间的 Jaro Winkler 距离 Jaro Winkler 距离可通过所有节点上的 pyjarowinkler 包获得 pyjarowinkler 的工作原理如下 from pyjar
  • 数据框应用不接受轴参数

    我有两个数据框 data and rules gt gt gt data gt gt gt rules vendor rule 0 googel 0 google 1 google 1 dell 2 googly 2 macbook 我正在
  • Zeppelin:如何在 zeppelin 中重新启动 SparkContext

    我正在使用 zeppelins Spark 解释器的隔离模式 在这种模式下 它将为 Spark 集群中的每个笔记本启动一项新工作 我想在笔记本执行完成后通过 zeppelin 终止该作业 为此我做了sc stop这停止了 sparkCont
  • 当价格低于阈值时使用 pandas DataFrame 实施矢量化止损

    给出这个示例数据框 date close signal positions 2017 01 02 27 90 0 0 0 0 2017 01 03 27 76 0 0 0 0 2017 01 04 28 65 1 0 1 0 2017 01
  • Python matplotlib 在鼠标悬停时不显示完整日期

    我有一个数据框日期索引 and 温度值 Date Temperature 2015 10 21 9 118 2015 10 22 9 099 2015 10 23 8 945 2015 10 26 8 848 2015 10 27 8 84

随机推荐

  • CakePHP 2.0 账户激活后自动登录

    我正在研究我们新项目的用户管理组件 计划是 用户使用最少量的帐户数据 用户名 密码 电子邮件 在页面上注册 用户收到一封电子邮件 其中包含用于激活帐户的激活链接 用户点击链接并激活他的帐户 系统在激活后自动登录用户 并将其重定向到带有帐户信
  • 如何在页面加载时动态更改aspx页面的标题

    我有一组 ASPX 页面 其中每个页面都有不同的标题 但我想为没有标题的页面设置默认标题 默认标题必须是可配置的 如果这是经典的 ASP NET 不是 MVC 并且您正在使用MasterPage然后你可以设置默认标题Page Load事件在
  • PHP sqlsrv 查询数据库

    我从 MySQL 迁移到 MS SQL Server 并尝试从例程表中获取所有数据 我已连接 但不确定如何使用 sqlsrv 获取数据 这就是我已经走了多远 conn array array UID gt sa PWD gt root Da
  • 在选中列表框中创建选中项目的字符串数组

    如何使用 foreach 循环 或任何其他方式 创建一个包含 checklistbox 中选中项目的数组 我无法知道列表中的项目数量 假设您使用 3 5 或更高版本 object items lb CheckedItems OfType T
  • 凿子3.功能模块Mux4

    我正在按照文档学习 Chisel在 Github 上 https github com ucb bar chisel3 wiki Short 20Users 20Guide 20to 20Chisel 到目前为止 一切都完美无缺 但我还是卡
  • 使用 INSERT 和 AUTO-INCREMENT 列的 SQL 语句中出现错误

    INSERT INTO configuration VALUES News Box Character Count NEWS BOX CHAR COUNT 200 Set the number of characters bytes tha
  • 如何实现制表符补全

    我试图弄清楚如何在 C 应用程序中实现子命令的制表符补全 我希望它的功能与 Git 的制表符补全非常相似 我正在浏览 Git 的源代码 但它并没有引起我的注意 我已经搜索了实现选项卡完成的方法 但没有找到直接的答案 因此我猜测它可能不一定是
  • 如何将hashMap转换为Json文件

    我是偏向Java的 我必须使用 rpc 将 Hashmap 传输到服务器 HashMap Map
  • 在给定 sqlite 进度的情况下,在 Android 中存储图像的规范方法

    我完全清楚 通常不建议将图像作为 blob 存储在数据库中 但我最近遇到this https www sqlite org fasterthanfs html网站记录了 sqlite 在向数据库读取和写入图像 blob 方面的性能提升 简而
  • Android 错误:应用程序意外停止,请重试

    我制作了一个运行良好的应用程序 它显示应用程序已启动的次数 这是代码 import android app Activity import android content SharedPreferences import android o
  • “释放未使用的内核内存”从何而来?

    我经常看到Freeing unused kernel memory xxxK from dmesg 但在 grep rg 的帮助下我永远无法从内核源代码中找到此日志 它从何而来 该行文本不作为单个完整字符串存在 因此您无法对其进行 grep
  • 如何在 Android 的 Volley 中创建一个新的 newRequestQueue

    我有一个片段 我尝试实例化一个新的newRequestQueue使用 Volley API 我尝试像这样实例化它 RequestQueue queue Volley newRequestQueue this 但是 当我尝试创建请求时 出现以
  • 完全删除 Angular4 中的测试

    我使用构建了一个非常小的应用程序angular4 我有一个主应用程序组件 两个子组件和一项服务 我觉得我不需要对这么小的应用程序进行测试 并且想删除与使项目更干净相关的所有测试 所以我的问题是我可以从项目中删除哪些与测试相关的文件 我已经删
  • 选择当前聚焦的元素

    我想在整个文档中找到当前关注的元素 我尝试使用 focusjQuery 1 6 引入的伪类 document find focus But document find focus length总是返回0 您应该能够使用activeEleme
  • 这是矫枉过正,还是对 CakePHP 的 HTML 帮助器的良好利用?

    我刚刚重新格式化了 CakePHP 应用程序的默认布局 我通过将几乎所有内容都放在 html 帮助器方法中来消除尽可能多的内联 html 这很有趣 但我想知道我从这次练习中获得了什么好处 如果有的话
  • 将数组的每个对应元素转换为r中的向量

    我有大量数组 希望将所有这些数组中特定位置的所有元素转换为向量 也就是说 如果我有 2 个数组 如下所示 39 1 2 3 4 5 1 0 00000000 0 00000000 0 0000000 0 000000 0 2 0 06703
  • Ruby Timeout::timeout 不会引发异常,也不会返回记录的内容

    我有这段代码 begin complete results Timeout timeout 4 do results platform search artist album name end rescue Timeout Error pu
  • 如何在Camel路由中使用上下文路径?

    我是骆驼新手 我正在 spring boot 中做一个项目 使用camel作为路由 我注意到 当我去 SwaggerUi 查看 Post 调用的正确功能时 路由的 contextPath 不起作用 public void configure
  • 如何准确测量 C++ 函数使用的时钟周期?

    我知道我必须使用 rdtsc 测量的函数是确定性的 但结果远不能重复 每次运行我得到 5 的振荡 可能的原因有 上下文切换 缓存未命中 您还知道其他原因吗 如何消除它们 TSC 什么rdtsc使用 在多处理器系统上通常不同步 它可能有助于设
  • SparkSQL 超前/滞后函数中的动态/变量偏移

    我们可以以某种方式使用取决于 Spark SQL 中的领先 滞后函数中的列值的偏移值吗 示例 以下是运行良好的方法 val sampleData Seq bob Developer 125000 mark Developer 108000