我正在使用 pyspark 处理一些传入的流数据,我想向我的数据帧添加一个具有 50 秒移动平均值的新列。
我尝试使用带有 rangeBetween 的 Window 规范:
import pyspark.sql.window as W
w = (W.Window()
.partitionBy(col("sender"))
.orderBy(F.col("event_time").cast('long'))
.rangeBetween(-50, 0))
df2 = df.withColumn('rolling_average', F.avg("fr").over(w))
但这给了我一个错误,因为结构化流需要基于时间的窗口(可能是为了管理状态):
AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets
使用 sql.window 函数,我还可以计算移动平均值,但这将通过对使用翻滚(或跳跃)窗口的窗口(以及称为发送者的唯一 id 键)进行分组来给出结果:
df.select('sender', 'event_time', 'fr').groupBy("sender", window("event_time", "50 second")).avg().alias('avg_fr')
sender |
window |
avg(fr) |
59834cfd-6cb2-4ece-8353-0a9b20389656 |
{"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} |
0.17443667352199554 |
8b5d90b9-65d9-4dd2-b742-31c4f0ce37d6 |
{"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} |
0.010564474388957024 |
a74204f3-e25d-4737-a302-9206cd69e90a |
{"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} |
0.16375258564949036 |
db16426d-a9ba-449b-9777-3bdfadf0e0d9 |
{"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} |
0.17516431212425232 |
翻滚窗口显然不是我想要的,我需要以某种方式将其再次连接到原始表。
我不确定如何根据传入的不规则事件时间戳定义滑动窗口。
现在,我考虑编写一个有状态函数,将一组先前接收到的记录存储到一个状态中,并为每个进入的新数据点更新该状态。但这对于我期望可以在一个常见活动中完成的这种常见活动来说似乎相当复杂。更简单的方法。
编辑:当前版本的 Spark (3.1.1) 只允许在 Java 或 Scala(而不是 python)中构建任意有状态函数,以保护到 JVM 的转换。
有什么想法这是否真的是正确的方法?