Spark - 带有递归的窗口? - 有条件地跨行传播值

2023-12-08

我有以下数据框显示购买收入。

+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
|      1|       1|      0|
|      1|       2|      0|
|      1|       3|      0|
|      1|       4|    100|
|      1|       5|      0|
|      1|       6|      0|
|      1|       7|    200|
|      1|       8|      0|
|      1|       9|     10|
+-------+--------+-------+

最终我想要新专栏purch_revenue显示每行购买产生的收入。 作为解决方法,我还尝试引入购买标识符purch_id每次购买时都会增加。所以列出来只是作为参考。

+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
|      1|       1|      0|          100|       1|
|      1|       2|      0|          100|       1|
|      1|       3|      0|          100|       1|
|      1|       4|    100|          100|       1|
|      1|       5|      0|          100|       2|
|      1|       6|      0|          100|       2|
|      1|       7|    200|          100|       2|
|      1|       8|      0|          100|       3|
|      1|       9|     10|          100|       3|
+-------+--------+-------+-------------+--------+

我尝试过使用lag/lead像这样的函数:

user_timeline = Window.partitionBy("user_id").orderBy("visit_id")
find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))\ 
  .otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline))
df.withColumn("purch_revenue", find_rev)

这会重复收入列,如果revenue > 0并且还将其拉起一排。显然,我可以将其链接到有限的 N,但这不是解决方案。

  • 有没有办法递归地应用这个直到revenue > 0?
  • 或者,有没有办法根据条件增加值?我试图找出一种方法来做到这一点,但很难找到。

窗口函数不支持递归,但这里不需要。这种类型的分段可以通过累积和轻松处理:

from pyspark.sql.functions import col, sum, when, lag
from pyspark.sql.window import Window

w = Window.partitionBy("user_id").orderBy("visit_id")
purch_id = sum(lag(when(
    col("revenue") > 0, 1).otherwise(0), 
    1, 0
).over(w)).over(w) + 1

df.withColumn("purch_id", purch_id).show()
+-------+--------+-------+--------+
|user_id|visit_id|revenue|purch_id|
+-------+--------+-------+--------+
|      1|       1|      0|       1|
|      1|       2|      0|       1|
|      1|       3|      0|       1|
|      1|       4|    100|       1|
|      1|       5|      0|       2|
|      1|       6|      0|       2|
|      1|       7|    200|       2|
|      1|       8|      0|       3|
|      1|       9|     10|       3|
+-------+--------+-------+--------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark - 带有递归的窗口? - 有条件地跨行传播值 的相关文章

随机推荐

  • 我只需要重写 Toast 类的 show()

    我只需要覆盖show 方法为Toast班级 我创建了一个扩展的类Toast类 但随后我创建了一条 toast 消息 但出现异常setView View view 还没有被调用 但我不想创建自定义View方法 但使用默认方法 那么 如何才能覆
  • SecurityAttribute.Unrestricted 问题

    我对这个属性感到困惑 正如这里提到的 http msdn microsoft com en us library system security permissions securityattribute unrestricted aspx
  • 使用自定义 CSS 将 HTML 文件加载到 WebView

    我的 Android 应用程序上有一个 WebView 可以加载 WebView loadUrl 手机内部存储中的不同本地 HTML 文件 我想为它们添加一些自定义 css 样式 现在 我可以让我的应用程序编辑每个 HTML 文件并添加 C
  • scipy curve_fit 不喜欢数学模块

    在尝试创建一个示例时scipy optimize curve fit我发现scipy似乎与Python的不兼容math模块 而函数f1工作正常 f2抛出错误消息 from scipy optimize import curve fit fr
  • 添加到词典的不同方式

    有什么区别Dictionary add key value and Dictionary key value 我注意到最后一个版本没有抛出ArgumentException插入重复密钥时 但有什么理由更喜欢第一个版本 Edit 请问谁有这方
  • 警告:无法访问代码,使用 Reactjs

    我正在使用 ReactJs 我有两个组件 PrescriptionIndex 和 PrescriptionNew 将一个组件与另一个组件集成 这是我的第一个组件 PrescriptionNew import React Component
  • 为什么我的 Java 中的 PriorityBlockingQueue 无法正确排序?

    由于某种原因 当我添加到优先级队列时 它不会完全按字母顺序对我的字符串进行排序 我不明白为什么 这是添加到 PriorityBlockingQueue 的代码 String toAdd String format s s directory
  • 在 JTextField 上设置光标

    我正在用 Java 制作一个使用 JTextField 的小型应用程序 现在 我希望 一旦运行应用程序 光标就会自动放置在该位置 以便用户不必单击它然后编写文本 我已经尝试了在网上找到的几乎所有内容 setCaretPosition 0 g
  • 如何使用 cmake 将 git SHA1 作为定义传递给编译器?

    在 Makefile 中 这可以通过以下方式完成 g DGIT SHA1 git log 1 head n 1 这非常有用 因为二进制文件知道确切的提交 SHA1 因此可以在发生段错误时转储它 我怎样才能用 CMake 达到同样的效果 我制
  • 在 Python 中保存图像的最佳格式是什么,为什么 pydot 会保存无效图像?

    我在用pydot在我的机器上保存图像文件 然后使用图像模块打开该文件 但是 我使用时遇到错误Windows Photo viewer 该图像无法打开 因为它已被删除或位于不可用的位置 这是不正确的 因为图像被保存到我的桌面上 有时 图片确实
  • redis 好像服务器已经关闭了连接

    我想使用redis sub pub 但是当我订阅一个频道时 2分钟后 控制台输出异常 服务器似乎已关闭连接 redis版本 redis 3 0 3 jedis版本 2 3 0 操作系统 OS X Yosemite 10 10 5 订阅 cl
  • 连接到 Firebird 3 的 C# 程序中的“连接被远程接口拒绝”

    从 Firebird 2 5 迁移到 3 0 后 当我尝试使用 C 程序测试数据库连接时 会出现此错误 连接被远程接口拒绝 这是用于测试连接的代码 当我尝试连接到 firebird 2 5 数据库时我使用此代码 txtPassword Pr
  • 使用 PHP 访问 Windows 共享

    我需要使用 PHP 访问 Windows 共享上的 Excel 文件 但似乎遇到了身份验证问题 我在用着PHP ExcelReader打开并读取文件 在我的本地计算机上工作正常 但我放置它的服务器无权访问此共享 因此它告诉我该路径不可读 我
  • 如何使用 OleDB 避免 Excel 文件中的空行?

    我开始使用OleDB 我正在尝试从 Excel 文件 C 中 中提取大量随机数据 这是我的查询 SELECT FROM NAMED RANGE 但我有很多空行 我循环删除它们 但我不太喜欢它 是否可以仅选择至少填充了一个单元格的行 此致 A
  • 连接 MongoDB 中的两个集合

    我是 mongoDB 的初学者 我有两本收藏书和作者 姓名和作品 分别是公共栏 使用内部联接 我必须发出 Book 和 Author 中的一些列 就像这个 sql 查询 select book name book editions book
  • 用于单击网页按钮的 Python 脚本

    我有一个 python 脚本 它使用 requests 库将数据发送到 django 应用程序 然后用户切换到网页并单击一个按钮来获取编辑表单以添加一些附加信息 我希望请求收到状态代码 200 后立即切换到网页并自动单击按钮 而不是用户每次
  • 如何从 json 结果中快速删除可选文本

    我正在使用 newsApi 从中获取新闻列表 我根据新闻的属性创建了模型 所有属性在模型中都是可选的 当我解析它时 它会打印到控制台获取结果 但所有字段都有带有可选文本的数据 我创建了三个基于新闻 api 字段的结构 它们就像 struct
  • 检索数据库中所有表的列表

    我在通常的数据库文件夹下有这个数据库文件 里面有一堆表 请注意 我不想在命令行下检索 我知道我可以使用 tables 命令 我想使用代码检索其中所有表的列表 以便我可以对每个表执行一些特定的算法 另外 我在Android上没有发现任何与此问
  • 服务器发送的事件不适用于球衣 SSE

    我正在尝试使用来自 Jersey 的 JavaScript SSE 我的资源中有以下代码 我在 Java7 和 Tomcat 7 上托管 我没有收到任何错误 但我在页面上也看不到数据 I call broadcast发布数据 它确实显示消息
  • Spark - 带有递归的窗口? - 有条件地跨行传播值

    我有以下数据框显示购买收入 user id visit id revenue 1 1 0 1 2 0 1 3 0 1 4 100 1 5 0 1 6 0 1 7 200 1 8 0 1 9 10