PySpark:如何分组、重新采样和前向填充空值?

2024-03-12

考虑以下数据集in Spark,我想以特定频率(例如 5 分钟)对日期重新采样。

START_DATE = dt.datetime(2019,8,15,20,33,0)
test_df = pd.DataFrame({
    'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
    'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
    'user_id': [15,15,16,16,15,17,17,17,16,17],
    'status': [0,1,1,1,0,1,0,1,1,0],
    'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})

test_df.groupby(['school_id', 'class_id', 'user_id', 'start']).min()

不过,我还希望在两个特定日期范围之间进行重新采样:2019-08-15 20:30:00 and 2019-08-15 21:00:00。所以每组school_id, class_id and user_id将有 6 个条目,两个日期范围之间每 5 分钟存储一个条目。 这null重采样生成的条目应通过前向填充进行填充。

我使用 Pandas 作为示例数据集,但实际的数据帧将在 Spark 中提取,因此我正在寻找的方法也应该在 Spark 中完成。

我想这种方法可能与此类似PySpark:如何重新采样频率 https://stackoverflow.com/questions/39271374/pyspark-how-to-resample-frequencies但我无法让它在这种情况下工作。

感谢您的帮助


这可能不是获得最终结果的最佳方法,但只是想在这里展示这个想法。

  1. 首先,创建 DataFrame 并将时间戳转换为整数
from datetime import datetime
import pytz
from pytz import timezone

# Create DataFrame
START_DATE = datetime(2019,8,15,20,33,0)
test_df = pd.DataFrame({
    'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
    'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
    'user_id': [15,15,16,16,15,17,17,17,16,17],
    'status': [0,1,1,1,0,1,0,1,1,0],
    'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})

# Convert TimeStamp to Integers
df = spark.createDataFrame(test_df)
print(df.dtypes)
df = df.withColumn('start', F.col('start').cast("bigint"))
df.show()

这输出:

+---------+--------+-------+------+----------+
|school_id|class_id|user_id|status|     start|
+---------+--------+-------+------+----------+
|   remote|   green|     15|     0|1565915580|
|   remote|   green|     15|     1|1565915700|
|   remote|     red|     16|     1|1565915820|
|   remote|     red|     16|     1|1565915940|
|   onsite|   green|     15|     0|1565916060|
|   onsite|   green|     17|     1|1565916180|
|   onsite|   green|     17|     0|1565916300|
|   onsite|   green|     17|     1|1565916420|
|   remote|     red|     16|     1|1565916540|
|   remote|   green|     17|     0|1565916660|
+---------+--------+-------+------+----------+
  1. 创建您想要的时间序列
# Create time sequece needed
start = datetime.strptime('2019-08-15 20:30:00', '%Y-%m-%d %H:%M:%S')
eastern = timezone('US/Eastern')
start = eastern.localize(start)
times = pd.date_range(start = start, periods = 6, freq='5min')
times = [s.timestamp() for s in times]
print(times)
[1565915400.0, 1565915700.0, 1565916000.0, 1565916300.0, 1565916600.0, 1565916900.0]
  1. 最后,为每个组创建数据框
# Use pandas_udf to create final DataFrame
schm = StructType(df.schema.fields + [StructField('epoch', IntegerType(), True)])
@pandas_udf(schm, PandasUDFType.GROUPED_MAP)
def resample(pdf):
    pddf = pd.DataFrame({'epoch':times})
    pddf['school_id'] = pdf['school_id'][0]
    pddf['class_id'] = pdf['class_id'][0]
    pddf['user_id'] = pdf['user_id'][0]


    res = np.searchsorted(times, pdf['start'])
    arr = np.zeros(len(times))
    arr[:] = np.nan
    arr[res] = pdf['start']
    pddf['status'] = arr

    arr[:] = np.nan
    arr[res] = pdf['status']
    pddf['start'] = arr
    return pddf

df = df.groupBy('school_id', 'class_id', 'user_id').apply(resample)
df = df.withColumn('timestamp', F.to_timestamp(df['epoch']))
df.show(60)

最终结果:

+---------+--------+-------+----------+-----+----------+-------------------+
|school_id|class_id|user_id|    status|start|     epoch|          timestamp|
+---------+--------+-------+----------+-----+----------+-------------------+
|   remote|     red|     16|      null| null|1565915400|2019-08-15 20:30:00|
|   remote|     red|     16|      null| null|1565915700|2019-08-15 20:35:00|
|   remote|     red|     16|1565915940|    1|1565916000|2019-08-15 20:40:00|
|   remote|     red|     16|      null| null|1565916300|2019-08-15 20:45:00|
|   remote|     red|     16|1565916540|    1|1565916600|2019-08-15 20:50:00|
|   remote|     red|     16|      null| null|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     15|      null| null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     15|      null| null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     15|      null| null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     15|1565916060|    0|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     15|      null| null|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     15|      null| null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     17|      null| null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     17|      null| null|1565915700|2019-08-15 20:35:00|
|   remote|   green|     17|      null| null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     17|      null| null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     17|      null| null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     17|1565916660|    0|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     17|      null| null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     17|      null| null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     17|      null| null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     17|1565916180|    1|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     17|1565916420|    1|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     17|      null| null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     15|      null| null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     15|1565915580|    0|1565915700|2019-08-15 20:35:00|
|   remote|   green|     15|      null| null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     15|      null| null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     15|      null| null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     15|      null| null|1565916900|2019-08-15 20:55:00|
+---------+--------+-------+----------+-----+----------+-------------------+

现在,您将获得每组 6 个时间戳。 请注意,并非所有原始的“状态”和“开始”都映射到最终的 DataFrame,这是因为在resampleudf,它发生在5minute间隔,两个“开始”时间可以映射到同一时间网格点,您在这里会丢失一个。这可以在udf根据您的频率以及您希望如何保存数据。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark:如何分组、重新采样和前向填充空值? 的相关文章

随机推荐

  • 使用转换器和提供程序而不是属性映射时如何让 ModelMapper.validate() 成功?

    有类似的东西 Getter Setter public static class Entity private int hash private LocalDateTime createdTime and Getter Setter pub
  • 调用 oncontextmenu 时 Firefox 浏览器的事件未定义

    我在右键单击页面中的按钮时显示上下文菜单 用于显示上下文菜单的代码是 window addEventListener contextmenu function e e preventDefault false 当我右键单击按钮时 调用的上下
  • 解决 git merge octopus 上的冲突

    章鱼合并是指合并2个以上的头 用户应该如何使用普通的三向合并工具比较文件 3 个文件被传递到合并工具 BASE LOCAL REMOTE 3 个头需要多少个以及什么组合 在这种情况下 您不应该使用章鱼合并 八达通合并仅应在不存在合并冲突或冲
  • setInterval 使用 Electron 一小时后停止

    希望有人能在这里指出我正确的方向 我的电子应用程序需要每 10 分钟左右执行一次 API 调用 目前 我在渲染器进程中使用 setInterval 循环来执行此操作 该循环每 10 分钟触发一次 它通常工作得很好 几个小时后似乎就停止了 我
  • Android Studio 在创建新活动时“发生 IDE 错误”

    更新见底部 由于 Android Studio 将自身更新到 v3 0 我无法创建新的 Activity 我收到此错误报告对话框 显示 2 个错误 我尝试了不同的项目 尝试清理项目 重新同步 gradle 重新启动计算机 卸载 重新安装 A
  • strdup():对警告感到困惑(“隐式声明”、“使指针...无需强制转换”、内存泄漏)

    当我编译下面的一小段代码 其中我们定义一个字符串 然后使用 strdup 来制作副本 时 我收到 3 个警告 来自 GCC 的 2 个编译器警告和来自 valgrind 的 1 个运行时警告 错误 我怀疑内存泄漏错误 由 valgrind
  • x86-SSE 中四个压缩单精度浮点到无符号双字的转换

    有没有办法将四个打包单精度浮点值转换为具有 SSE 扩展的 x86 中的四个双字 最接近的指令是CVTPS2PI 但它不能在两个 xmm 寄存器上执行 而是应该给出为CVTPS2PI MM XMM M64 如果我想要类似的东西怎么办
  • R metaMDS 排序距离

    我一直在对不同采样点的丰富物种数据集进行一些排序 我在用metaMDS 素食主义者可以做到这一点 通过此功能 您可以 直接输入群落数据 行中的站点和列中的物种 并指定您希望使用的距离类型 即 jaccard brays curtis euc
  • Linux 上的 OpenCL,集成英特尔图形芯片

    我想用OpenCL在 Debian 8 上 我读到在本页 http streamcomputing eu blog 2011 12 29 opencl hardware support Linux 上不支持 Intel 的 GPU 这篇文章
  • XCode 4.5 警告父/子类的类别之间的方法名称冲突

    我正在开发一个最初使用 XCode 4 0 构建的项目 然后迁移到使用 XCode 4 2 现在我已经测试了迁移到 XCode 4 5 并且收到了大量如下警告 instance method values in category from
  • 如何将模型数据对象数组转换为dataProvider

    假设我有模型User与其自身有多对多的关系 命名为friends so user gt friends or model gt friends在视图中 给了我一个数组User对象 我想将朋友显示为网格视图 但CGridView数据为data
  • 在二维动画上绘制可变大小和位置的圆圈

    我正在 Python 3 3 中使用 matplotlib 我有一个动画 2d 和 3d 窗口 我在上面画点 这些点代表物体 但不确定它们是否真的存在 所以我想围绕这些点画一个圆圈来显示不确定性 这种不确定性是变化的 所以底线是 我想在 2
  • JavaScript 中可以编写连续的嵌套函数吗?

    我知道这就是封闭的领域 但是可以连续调用嵌套的匿名函数吗 假设我有这个 function testing input var testing 0 function testing testing 1 return testing 我们可以有
  • 多列的加权平均值,按组(在 data.table 中)

    这个问题紧接着另一个问题组加权平均值 https stackoverflow com questions 14145859 weighted means by group and column 我想使用创建加权组内平均值data table
  • 自动更新AngularJS中的范围变量

    我目前正在使用 AngularJS 我想从服务返回一个变量 让作用域知道它何时发生变化 为了说明这一点 请查看 www angularjs org 中的示例 连接后端 粗略地 我们可以看到以下内容 var projects firebase
  • 如何使用 ITfoxtec 创建 IdP

    我有兴趣使用 SAML 创建 IdP 身份提供商 是否可以使用 ITfoxtec Nuget 创建完整的 IdP 功能 有 ASP Net C 的示例吗 是的 可以使用 ITfoxtec Identity Saml 2 0 库实施 IdP
  • 使用 WCF 将 Linq-to-Sql 对象发送到服务器时出错

    我正在尝试开发一个具有 2 层的系统 移动客户端和使用 LINQ to SQL 在数据库中存储信息的服务器 我想创建一个在服务器中存储任务的 WCF 服务器 因此它将从客户端接收任务并使用 LINQ to SQL 来存储它 为了执行此服务
  • 从键值对动态填充列

    我正在使用 xslt 文件进行样式设置 我以表格格式显示一些内容 我需要从预定义的键值对动态填充一列 请看下面的例子
  • 动态增加android中的相对布局大小

    我有一个相对布局 在其中显示页面和一些内容 当我缩放页面时 布局尺寸没有增加 我希望我的布局能够动态增加其大小 我该如何设置 我尝试用java代码来做 contentLayout getLayoutParams height x some
  • PySpark:如何分组、重新采样和前向填充空值?

    考虑以下数据集in Spark 我想以特定频率 例如 5 分钟 对日期重新采样 START DATE dt datetime 2019 8 15 20 33 0 test df pd DataFrame school id remote r