插入缺失的日期行并在新行中插入旧值 PySpark

2024-02-05

我有一个 DataFrame,其中包含一个人、一个体重和时间戳,如下所示:

+-----------+-------------------+------+
|     person|          timestamp|weight|
+-----------+-------------------+------+
|          1|2019-12-02 14:54:17| 49.94|
|          1|2019-12-03 08:58:39| 50.49|
|          1|2019-12-06 10:44:01| 50.24|
|          2|2019-12-02 08:58:39| 62.32|
|          2|2019-12-04 10:44:01| 65.64|
+-----------+-------------------+------+

我想填写这样的内容,让每个人每个日期都有一些东西,这意味着上面的内容应该是:

+-----------+-------------------+------+
|     person|          timestamp|weight|
+-----------+-------------------+------+
|          1|2019-12-02 14:54:17| 49.94|
|          1|2019-12-03 08:58:39| 50.49|
|          1|2019-12-04 00:00:01| 50.49|
|          1|2019-12-05 00:00:01| 50.49|
|          1|2019-12-06 10:44:01| 50.24|
|          1|2019-12-07 00:00:01| 50.24|
|          1|2019-12-08 00:00:01| 50.24|
|          2|2019-12-02 08:58:39| 62.32|
|          2|2019-12-03 00:00:01| 62.32|
|          2|2019-12-04 10:44:01| 65.64|
|          2|2019-12-05 00:00:01| 65.64|
|          2|2019-12-06 00:00:01| 65.64|
|          2|2019-12-07 00:00:01| 65.64|
|          2|2019-12-08 00:00:01| 65.64|
+-----------+-------------------+------+

我定义了一个新表,使用datediff包含最小日期和最大日期之间的所有日期:

min_max_date = df_person_weights.select(min("timestamp"), max("timestamp")) \
        .withColumnRenamed("min(timestamp)", "min_date") \
        .withColumnRenamed("max(timestamp)", "max_date")

min_max_date = min_max_date.withColumn("datediff", datediff("max_date", "min_date")) \
        .withColumn("repeat", expr("split(repeat(',', datediff), ',')")) \
        .select("*", posexplode("repeat").alias("date", "val")) \
        .withColumn("date", expr("date_add(min_date, date)"))

这给了我一个新的 DataFrame,其中包含如下日期:

+----------+
|      date|
+----------+
|2019-12-03|    
|2019-12-03|
|2019-12-04|
|2019-12-05|
|2019-12-06|
|2019-12-07|
|2019-12-08|
+----------+

我尝试过不同的连接,例如:

min_max_date.join(df_price_history, min_max_date.date != df_price_history.date, "leftouter")

但我没有得到我需要的结果,有人可以帮忙吗?如何合并我现在拥有的信息?


您正在寻找前向填充数据集。这变得有点复杂,因为您需要按类别(人)进行操作。

一种方法是这样的:创建一个新的 DataFrame,其中包含您想要为每个人赋值的所有日期(见下文,这只是dates_by_person).

然后,将原始 DataFrame 左连接到此 DataFrame,以便开始创建缺失的行。

接下来,使用加窗函数在每组中查找person,按日期排序,最后一个非空权重。如果每个日期可以有多个条目(因此一个人在一个特定日期有多个填写的记录),您还必须按时间戳列进行排序。

最后合并列,以便任何空字段都被预期值替换。

from datetime import datetime, timedelta
from itertools import product

import pyspark.sql.functions as psf
from pyspark.sql import Window

data = (  # recreate the DataFrame
    (1, datetime(2019, 12, 2, 14, 54, 17), 49.94),
    (1, datetime(2019, 12, 3, 8, 58, 39), 50.49),
    (1, datetime(2019, 12, 6, 10, 44, 1), 50.24),
    (2, datetime(2019, 12, 2, 8, 58, 39), 62.32),
    (2, datetime(2019, 12, 4, 10, 44, 1), 65.64))
df = spark.createDataFrame(data, schema=("person", "timestamp", "weight"))

min_max_timestamps = df.agg(psf.min(df.timestamp), psf.max(df.timestamp)).head()
first_date, last_date = [ts.date() for ts in min_max_timestamps]
all_days_in_range = [first_date + timedelta(days=d)
                     for d in range((last_date - first_date).days + 1)]
people = [row.person for row in df.select("person").distinct().collect()]
dates_by_person = spark.createDataFrame(product(people, all_days_in_range),
                                        schema=("person", "date"))

df2 = (dates_by_person.join(df,
                            (psf.to_date(df.timestamp) == dates_by_person.date)
                            & (dates_by_person.person == df.person),
                            how="left")
       .drop(df.person)
       )
wind = (Window
        .partitionBy("person")
        .rangeBetween(Window.unboundedPreceding, -1)
        .orderBy(psf.unix_timestamp("date"))
        )
df3 = df2.withColumn("last_weight",
                     psf.last("weight", ignorenulls=True).over(wind))
df4 = df3.select(
    df3.person,
    psf.coalesce(df3.timestamp, psf.to_timestamp(df3.date)).alias("timestamp"),
    psf.coalesce(df3.weight, df3.last_weight).alias("weight"))
df4.show()
# +------+-------------------+------+
# |person|          timestamp|weight|
# +------+-------------------+------+
# |     1|2019-12-02 14:54:17| 49.94|
# |     1|2019-12-03 08:58:39| 50.49|
# |     1|2019-12-04 00:00:00| 50.49|
# |     1|2019-12-05 00:00:00| 50.49|
# |     1|2019-12-06 10:44:01| 50.24|
# |     2|2019-12-02 08:58:39| 62.32|
# |     2|2019-12-03 00:00:00| 62.32|
# |     2|2019-12-04 10:44:01| 65.64|
# |     2|2019-12-05 00:00:00| 65.64|
# |     2|2019-12-06 00:00:00| 65.64|
# +------+-------------------+------+

编辑:正如大卫在评论中建议的那样,如果你有很多人,那么构建dates_by_people不需要将所有东西都交给驾驶员即可完成。在这个例子中,我们讨论的是少量的整数,没什么大的。但如果它变大,请尝试:

dates = spark.createDataFrame(((d,) for d in all_days_in_range),
                              schema=("date",))
people = df.select("person").distinct()
dates_by_person = dates.crossJoin(people)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

插入缺失的日期行并在新行中插入旧值 PySpark 的相关文章

随机推荐

  • Ubuntu 11.10 上的 OpenCV

    我刚刚将系统从 ubuntu 11 04 更新到 11 10 现在我无法再编译任何包含 OpenCV 库引用的 C 程序 我已经尝试重新安装 OpenCV 我使用2 1版本 但我遇到了这个错误 tmp ccArHTZL o In funct
  • 如何按百分比设置 svg 宽度和 svg 高度?

    我用 svg 创建了一条线 但是当我调整浏览器大小时 用 svg 创建的线没有调整大小 我尝试以百分比设置 svg 的宽度 但这样做后该线不会出现 如何按百分比设置 svg 的宽度
  • Stream.dropWhile() 没有以两个不同的值返回正确的值

    我正在尝试学习 Java 9 中的新功能 我开始了解 Stream 的 dropWhile 方法 但它在两种不同的场景中返回不同的值 这是我的代码 package src module import java util stream Col
  • 从模态窗口打开的帮助文件没有响应

    使用Delphi XE2 Win64 因此 我有一个包含许多表单的大型应用程序 如果我从主表单打开帮助文件并打开模式窗口 然后按 F1 在模式窗口上触发上下文相关帮助 帮助文件窗口将显示正确的信息 但是在关闭模式窗口之前 无法关闭帮助文件
  • Devise + Omniauth 登录 Facebook 时调用操作通道

    当我尝试使用 Omniauth 和 Devise 登录 Facebook 时 passthru被称为而不是facebook 我如何通过link to user omniauth authorize path facebook 我多次修改代码
  • Web Worker 与 Promise

    为了使 Web 应用程序具有响应能力 您可以使用异步非阻塞请求 我可以设想两种方法来实现这一目标 一种是使用 deferreds promise 另一个是网络工作者 对于 Web Workers 我们最终引入了另一个流程 并且产生了来回整理
  • PHP 错误日志已停止工作。它确实起作用了

    它已经工作了很长时间并且停止了 我一定错过了一些明显的东西 File etc php5 apache2 php ini相关设置有 display errors On I am not sure if this makes a differe
  • NSUndoManager:重做不起作用

    我正在制作一个使用 NSSlider 的简单应用程序 可以使用两个按钮将其设置为最大值或最小值 撤消管理器应跟踪所有更改并允许撤消 重做使用这两个按钮所做的所有更改 这是界面 import
  • 生成一个范围内的随机偶数?

    这是我遵循的格式 int randomNum rand nextInt max min 1 min 这是我的代码 我正在尝试获取 1 到 100 之间的随机偶数 Random rand new Random int randomNum ra
  • 如果我想使用 gitignore 中的文件怎么办

    对于敏感数据 例如 aws 密钥或密码 我将它们放入 gitignore 中的文件中 以确保它不会提交到 git 但是 当脚本运行时要使用按键时 我该怎么办 运行前手动添加文件中的关键内容 如果程序需要由Jenkins定期触发怎么办 谁能帮
  • Django:操作错误没有这样的表

    我正在使用 Django CMS 的 Django 项目中构建一个相当简单的应用程序 Research 这是我对项目 应用程序的第一次尝试 它的主要目的是存储各种知识资产 即研究人员撰写的文章 书籍等 问题是当我将浏览器指向 researc
  • Visual Studio 监视中的向量“无运算符“[]”与这些操作数匹配”错误

    在 Visual Studio 2012 中单步执行以下示例代码时 std vector
  • 如何在 OpenGL 中对 glutSolidTorus 进行纹理处理? [关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 我正在尝试纹理glutSolidTorus 这是我的代码 glColor3f 1 0f 1 0f 1 0f glEnab
  • UIButton 垂直对齐不起作用

    我不明白为什么在下面的代码中 标题对齐没有保持在顶部 UIButton btn2 UIButton buttonWithType UIButtonTypeRoundedRect btn2 titleLabel font UIFont sys
  • 编辑主键

    如果表只包含主键字段 是否可以在 MVC3 中编辑主键 例如 我有一个控制台表 其中我将控制台名称作为主键 我希望能够编辑它并更改它并保存编辑后的值 如果您需要更多信息 请告诉我 作为一般规则 您永远不应该编辑主键 SQL Server 中
  • Jquery:如何向 mouseleave 添加延迟,以便如果有人无意间将鼠标悬停在元素上,它仍然保持打开状态

    悬停意图插件与我需要的相反 我有一个由 trigger 触发的 popup 当我将鼠标悬停在其上时 我希望 popup 在几秒钟内不淡出 但如果我将鼠标悬停 然后再次悬停 则取消将要发生的淡出并保持 popup 打开 有谁知道我会怎么做 这
  • 反应式扩展超时不会停止序列?

    我正在尝试做一个IObservable
  • 为我的数据库 mysql 中的每一行调用 php 脚本

    如果这是一个愚蠢的问题 我很抱歉 但我现在不知道如何为表中的每一行调用 php 脚本 我正在使用mysql 我以后会使用PDO 我知道它更好 这是我的 选择 页面 我在其中选择所需的行
  • Printer_open() 打印 html 输出

    With printer open php 中的函数 我可以打印我保存的字符串 content变量 并且能够从文件打印 printer Pserver php net printername handler printer open pri
  • 插入缺失的日期行并在新行中插入旧值 PySpark

    我有一个 DataFrame 其中包含一个人 一个体重和时间戳 如下所示 person timestamp weight 1 2019 12 02 14 54 17 49 94 1 2019 12 03 08 58 39 50 49 1 2