pyspark 滞后函数(基于列)

2023-12-14

我想实现以下目标

lag(column1,datediff(column2,column3)).over(window)

偏移量是动态的。我也尝试过使用UDF,但没有成功。

有什么想法如何实现上述目标吗?


论点count of the lag函数采用整数而不是列对象:

psf.lag(col, count=1, default=None)

因此它不可能是一个“动态”值。 相反,您可以在列中构建滞后,然后将表与其自身连接起来。

首先让我们创建我们的数据框:

df = spark.createDataFrame(
    sc.parallelize(
        [[1, "2011-01-01"], [1, "2012-01-01"], [2, "2013-01-01"], [1, "2014-01-01"]]
    ), 
    ["int", "date"]
)

我们想要枚举行:

from pyspark.sql import Window
import pyspark.sql.functions as psf
df = df.withColumn(
    "id", 
    psf.monotonically_increasing_id()
)
w = Window.orderBy("id")
df = df.withColumn("rn", psf.row_number().over(w))
    +---+----------+-----------+---+
    |int|      date|         id| rn|
    +---+----------+-----------+---+
    |  1|2011-01-01|17179869184|  1|
    |  1|2012-01-01|42949672960|  2|
    |  2|2013-01-01|68719476736|  3|
    |  1|2014-01-01|94489280512|  4|
    +---+----------+-----------+---+

现在建立滞后:

df1 = df.select(
    "int", 
    df.date.alias("date1"), 
    (df.rn - df.int).alias("rn")
)
df2 = df.select(
    df.date.alias("date2"), 
    'rn'
)

最后我们可以加入它们并计算日期差:

df1.join(df2, "rn", "inner").withColumn(
    "date_diff", 
    psf.datediff("date1", "date2")
).drop("rn")

    +---+----------+----------+---------+
    |int|     date1|     date2|date_diff|
    +---+----------+----------+---------+
    |  1|2012-01-01|2011-01-01|      365|
    |  2|2013-01-01|2011-01-01|      731|
    |  1|2014-01-01|2013-01-01|      365|
    +---+----------+----------+---------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

pyspark 滞后函数(基于列) 的相关文章

  • Spark 中的广播 Annoy 对象(对于最近邻居)?

    由于 Spark 的 mllib 没有最近邻居功能 我正在尝试使用Annoy https github com spotify annoy为近似最近邻 我尝试广播 Annoy 对象并将其传递给工人 然而 它并没有按预期运行 下面是可重复性的
  • Spark SQL / PySpark 中的逆透视

    我手头有一个问题陈述 其中我想在 Spark SQL PySpark 中取消透视表 我已经浏览了文档 我可以看到仅支持pivot 但到目前为止还不支持取消透视 有什么方法可以实现这个目标吗 让我的初始表如下所示 When I pivotPy
  • 为什么 PySpark 中的 agg() 一次只能汇总 DataFrame 的一列? [复制]

    这个问题在这里已经有答案了 对于下面的数据框 df spark createDataFrame data Alice 4 300 Bob 7 677 schema name High 当我尝试找到最小值和最大值时 我只得到输出中的最小值 d
  • 列对象不可调用 Spark

    我尝试安装 Spark 并运行教程中给出的命令 但出现以下错误 https spark apache org docs latest quick start html https spark apache org docs latest q
  • 使用 pyspark 计算所有可能的单词对

    我有一个文本文档 我需要找到整个文档中重复单词对的可能数量 例如 我有下面的word文档 该文档有两行 每行用 分隔 文档 My name is Sam My name is Sam My name is Sam My name is Sa
  • 在 GPU 支持下对高维数据进行更快的 Kmeans 聚类

    我们一直在使用 Kmeans 来对日志进行聚类 典型的数据集有 10 mill 具有 100k 特征的样本 为了找到最佳 k 我们并行运行多个 Kmeans 并选择轮廓得分最佳的一个 在 90 的情况下 我们最终得到的 k 介于 2 到 1
  • 将嵌套字典键值转换为 pyspark 数据帧

    我有一个 Pyspark 数据框 如下所示 我想提取 dic 列中的那些嵌套字典并将它们转换为 PySpark 数据帧 像这样 请让我知道如何实现这一目标 Thanks from pyspark sql import functions a
  • pyspark加入多个条件

    我如何指定很多条件 当我使用pyspark时 join 例子 与蜂巢 query select a NUMCNT b NUMCNT as RNUMCNT a POLE b POLE as RPOLE a ACTIVITE b ACTIVIT
  • PySpark - 系统找不到指定的路径

    Hy 我已经多次运行 Spark Spyder IDE 今天我收到这个错误 代码是相同的 from py4j java gateway import JavaGateway gateway JavaGateway os environ SP
  • pyspark通过特定键加入rdd

    我有两个 rdd 需要将它们连接在一起 它们看起来像下面这样 RDD1 u 2 u 100 2 u 1 u 300 1 u 1 u 200 1 RDD2 u 1 u 2 u 1 u 3 我想要的输出是 u 1 u 2 u 100 2 所以我
  • 在 Jupyter 笔记本中使用 PySpark 读取 XML

    我正在尝试读取 XML 文件 df spark read format com databricks spark xml load path to my xml 并收到以下错误 java lang ClassNotFoundExceptio
  • pyspark:聚合列中最常见的值

    aggregrated table df input groupBy city income bracket agg count suburb alias suburb sum population alias population sum
  • 使用notebook时将jar添加到pyspark

    我正在尝试将 mongodb hadoop 与 Spark 集成 但不知道如何使 IPython 笔记本可以访问这些 jar 这是我想做的 set up parameters for reading from MongoDB via Had
  • Spark Workers 上缺少 SLF4J 记录器

    我正在尝试通过以下方式运行工作spark submit 此作业导致的错误是 Exception in thread main java lang NoClassDefFoundError org slf4j Logger at java l
  • 按年份进行透视并获取 2020 年以来的金额总和

    我有这样的数据 我想按年份旋转并仅显示 2020 年以来的总数 我该如何实现这一目标 您可以使用以下方法实现此目的PIVOT https spark apache org docs 3 2 1 api python reference ap
  • 如何从本地模式下运行的 pyspark 中的 S3 读取数据?

    我正在使用 PyCharm 2018 1 使用 Python 3 4 并通过 virtualenv 中的 pip 安装 Spark 2 3 本地主机上没有安装hadoop 因此没有安装Spark 因此没有SPARK HOME HADOOP
  • Spark、pyspark中从TF-IDF到LDA聚类

    我正在尝试对存储在格式键 listofwords 中的推文进行聚类 我的第一步是使用 dataframe 提取单词列表的 TF IDF 值 dbURL hdfs pathtodir file sc textFile dbURL Define
  • Apache Spark 和 scikit_learn 之间的 KMeans 结果不一致

    我正在使用 PySpark 对数据集执行聚类 为了找到簇的数量 我对一系列值 2 20 进行了聚类 并找到了wsse 簇内平方和 每个值的值k 在这里我发现了一些不寻常的东西 根据我的理解 当你增加集群数量时 wsse单调递减 但我得到的结
  • PySpark DataFrame 上分组数据的 Pandas 式转换

    如果我们有一个由一列类别和一列值组成的 Pandas 数据框 我们可以通过执行以下操作来删除每个类别中的平均值 df DemeanedValues df groupby Category Values transform lambda g
  • 如何在 pySpark 数据框中添加行 ID [重复]

    这个问题在这里已经有答案了 我有一个 csv 文件 我在 pyspark 中将其转换为 DataFrame df 经过一番改造后 我想在 df 中添加一列 这应该是简单的行 ID 从 0 或 1 开始到 N 我将 df 转换为 rdd 并使

随机推荐

  • 原生 javascript 中的 jQuery index()

    根据 jQuery api get 的补充操作 它接受索引并返回 DOM 节点 index 可以获取 DOM 节点并返回索引 假设我们页面上有一个简单的无序列表 ul li foo li li bar li li baz li ul ind
  • 如何用假测试模块替换 Guice 模块进行单元测试?

    这是我们的使用方式Guice在新的应用程序中 public class ObjectFactory private static final ObjectFactory instance new ObjectFactory private
  • 在 Qt 中创建旋转进度条

    Create Spin Progress bar in Qt I want to show progress bar like the one which appears while loading Please Find Image 我的
  • DNS 消息是否将名称填充为偶数字节?

    初步说明 是的 我知道 DNS 库和易于使用的 DNS 服务器的存在 我这样做纯粹是为了学术目的 并了解 DNS 查询的工作原理 问题 我在看RFC 1035了解 DNS 消息的工作原理 我想我或多或少理解了该备忘录中的所有内容 然而 有一
  • pygame 与海龟崩溃

    我收到此错误代码 2017 04 13 03 04 14 958 Python 606 839244 SDLApplication setup unrecognized selector sent to instance 0x1007a0d
  • SQL Server 2005 中的临时表不会自动删除

    我正在对一个令人讨厌的存储过程进行故障排除 并注意到在运行它之后 并且我已经关闭了会话 许多临时表仍然留在 tempdb 中 他们的名字如下 000E262B 002334C4 004E1D4D 00583EEE 00783A7F 0083
  • 使用 MTKTextureLoader 加载远程图像

    我正在尝试将远程图像加载到MTLTexture用这个代码 let textureLoader MTKTextureLoader device device textureLoader newTexture withContentsOf ur
  • 从Github API获取特定的README.md数据

    最近 我开始尝试使用 GitHub API 从公共存储库获取特定数据 长话短说 我想从存储库中获取 README md 文件的特定部分 例如 来自 Facebook 的 React 存储库的 master 分支 我想抓取下面的文本文档Git
  • 最小列总和差是多少? [关闭]

    Closed 这个问题需要多问focused 目前不接受答案 想象一下 给定一个正整数矩阵 最大 25 15 数字值不超过 3000000 当您进行列求和并选择最小和最大的一项时 它们之间的差异必须尽可能小 您可以根据需要交换每行中的数字
  • 以 HTML 形式显示的 LaTeX 表格

    我有以下 LateX 表 在以下情况下按预期呈现format pdf title Test Table format pdf begin center begin tabular l l l hline Var Class Descript
  • 用户名网址,例如 twitter 和 facebook

    twitter 和 facebook 等如何制作唯一的 url 例如 twitter com billgates 如果我想做同样的事情 为我的用户提供带有用户名的唯一网址 它是一个应用程序还是您为每个用户创建一个带有索引页面的目录 顺便说一
  • InvalidParameterValueException:无法访问流

    我正在尝试使用 Terraform 创建 dynamodb 表和 lambda 触发器 这就是我定义表 角色策略和 lambda 触发器的方式 resource aws dynamodb table filenames name local
  • 如何更新数组中嵌套的数组中的项目

    我通过最新的 C 驱动程序 此时为 v2 7 0 使用 MongoDB 4 0 我有一份文件 其中有Options and Options have Inventory 换句话说 库存数组嵌套在选项数组中 如何了解库存水平并仅更新库存 以下
  • 如何读取和修改已绘制的 Google 图表上的轴?

    我有一个页面 上面有几个谷歌图表 主要是组合图和折线图 例如 chart new google visualization LineChart chartDiv 绘制页面后 我希望能够读取轴上的最大值 然后重新绘制图表 以便它们在轴上都具有
  • 当存在多个路由时,使用查询字符串进行路由属性路由

    我有这个 HttpGet Route Cats public IHttpActionResult GetByCatId int catId HttpGet Route Cats public IHttpActionResult GetByN
  • 在 Ant 复制任务过滤器中获取文件名

    是否可以获取 Ant 复制任务中正在复制的当前文件名 我正在尝试运行 beanshell 脚本并希望访问当前文件名
  • 错误:无法在属性初始值设定项中使用实例成员 - Swift 3

    当我编译以下代码时 出现错误 无法在属性初始值设定项中使用实例成员 AddEployeeName 属性初始值设定项在 self 可用之前运行 你能帮忙解决这个错误吗 该计划允许员工输入姓名并拍摄照片 class AddEmployeeVie
  • 具有复杂形状的 ggplot 和 grid.picture 之间的区别

    我希望获得单个字母的 x y 坐标并用 ggplot 绘制它们 我在用grImport PostScriptTrace从 Postscript 文件获取 XML 文件 从那里 我从 Picture 类的 S4 对象中提取 x y 坐标 绘制
  • 断言 UITest 中的按钮上存在图像

    我在 ZStack 中有一个带有前景图像的按钮 Button action self highlighted self highlighted ZStack Text Text if self highlighted Image highl
  • pyspark 滞后函数(基于列)

    我想实现以下目标 lag column1 datediff column2 column3 over window 偏移量是动态的 我也尝试过使用UDF 但没有成功 有什么想法如何实现上述目标吗 论点count of the lag函数采用