使用 pyspark 结构化流计算移动平均列

2024-02-20

我正在使用 pyspark 处理一些传入的流数据,我想向我的数据帧添加一个具有 50 秒移动平均值的新列。

我尝试使用带有 rangeBetween 的 Window 规范:

import pyspark.sql.window as W

w = (W.Window()
     .partitionBy(col("sender"))
     .orderBy(F.col("event_time").cast('long'))
     .rangeBetween(-50, 0))
df2 = df.withColumn('rolling_average', F.avg("fr").over(w))

但这给了我一个错误,因为结构化流需要基于时间的窗口(可能是为了管理状态):

AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets

使用 sql.window 函数,我还可以计算移动平均值,但这将通过对使用翻滚(或跳跃)窗口的窗口(以及称为发送者的唯一 id 键)进行分组来给出结果:

df.select('sender', 'event_time', 'fr').groupBy("sender", window("event_time", "50 second")).avg().alias('avg_fr')
sender window avg(fr)
59834cfd-6cb2-4ece-8353-0a9b20389656 {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.17443667352199554
8b5d90b9-65d9-4dd2-b742-31c4f0ce37d6 {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.010564474388957024
a74204f3-e25d-4737-a302-9206cd69e90a {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.16375258564949036
db16426d-a9ba-449b-9777-3bdfadf0e0d9 {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} 0.17516431212425232

翻滚窗口显然不是我想要的,我需要以某种方式将其再次连接到原始表。 我不确定如何根据传入的不规则事件时间戳定义滑动窗口。

现在,我考虑编写一个有状态函数,将一组先前接收到的记录存储到一个状态中,并为每个进入的新数据点更新该状态。但这对于我期望可以在一个常见活动中完成的这种常见活动来说似乎相当复杂。更简单的方法。

编辑:当前版本的 Spark (3.1.1) 只允许在 Java 或 Scala(而不是 python)中构建任意有状态函数,以保护到 JVM 的转换。

有什么想法这是否真的是正确的方法?


您收到异常是因为您似乎正在构建用于批处理的窗口,而不是流数据帧。

在《结构化流编程指南》部分中事件时间的窗口操作 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time给出了一个可以应用于您的用例的示例:

streamDf = ...  # streaming DataFrame of schema { event_time: Timestamp, sender: String, fr: Integer }

# Group the data by window and sender and compute the average of each group
movingAverageDf = streamDf.groupBy(
    window(streamDf.event_time, "50 seconds", "5 seconds"),
    streamDf.sender
).avg(streamDf.fr)

请记住,如果不使用水印,应用程序的内部状态将无限期增长。因此建议还添加水印。确保在水印中使用与窗口相同的事件时间。

关于流查询的输出模式的另一个注释:查看概述输出模式 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes了解您的流式查询支持哪些模式。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 pyspark 结构化流计算移动平均列 的相关文章

随机推荐