PySpark Structured Streaming: get the earliest and latest record in a window based on the timestamp value

2024-02-02

I have a Structured Streaming job that reads from a Delta Lake table. The data contains values that increase over time. Within each window, I want the difference between the earliest and the latest record, chosen by the records' timestamps inside that window.

The values look like this:

sensor_id |TimeStamp       |Value
sensor_1  |Jun 16 10:10:01 |65534
sensor_1  |Jun 16 10:10:02 |65535
sensor_1  |Jun 16 10:10:03 |0
sensor_1  |Jun 16 10:10:04 |1
...
sensor_1  |Jun 16 10:10:59 |567

For every window I want to retrieve the earliest value (Jun 16 10:10:01, 65534) and the latest value (Jun 16 10:10:59, 567).

Silver = (Bronze 
    .withWatermark("TimeStamp", "1 minute") 
    .groupBy(['sensor_id', F.window('TimeStamp', '1 minute')])
    .agg(
         F.last(F.col('value')).alias('lastvalue'), 
         F.first(F.col('value')).alias('firstvalue'), 
         F.last(F.col('TimeStamp')).alias('lastTimeStamp'),
         F.first(F.col('TimeStamp')).alias('firstTimeStamp')
         )
)

The problem is that the ordering is non-deterministic (see the docs for last: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.last.html).

So the "last" record is not necessarily the one with the latest timestamp. Is there a way to pick the values of the earliest and latest records based on the timestamp stored in the record?

Sorting does not seem to work with Structured Streaming. Another possibility would be to use the lag function and sum the results, but I have not found a working example of that with Structured Streaming either.
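For reference, the lag-and-sum idea would look roughly like this on a static DataFrame (a sketch only: lag needs a non-time-based window, which streaming DataFrames do not support, so it only runs in batch, and the counter rollover at 65535 would still produce a negative diff):

from pyspark.sql import functions as F, Window as W

# Bronze is assumed here to be a static DataFrame with sensor_id, TimeStamp, Value.
# Per-sensor frame ordered by event time; lag() looks at the previous record.
w = W.partitionBy('sensor_id').orderBy('TimeStamp')

increments = (Bronze
    .withColumn('diff', F.col('Value') - F.lag('Value').over(w))   # per-record increase
    .groupBy('sensor_id', F.window('TimeStamp', '1 minute'))
    .agg(F.sum('diff').alias('total_increase')))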


Spark 3.0+ has max_by (https://spark.apache.org/docs/latest/api/sql/index.html#max_by) and min_by (https://spark.apache.org/docs/latest/api/sql/index.html#min_by), which fit your case well.

from pyspark.sql import functions as F
Bronze = spark.createDataFrame(
    [('sensor_1', '2022-02-02 10:10:01', 65534),
     ('sensor_1', '2022-02-02 10:10:02', 65535),
     ('sensor_1', '2022-02-02 10:10:03', 0),
     ('sensor_1', '2022-02-02 10:10:04', 1),
     ('sensor_1', '2022-02-02 10:11:02', 2),
     ('sensor_1', '2022-02-02 10:11:04', 4),
     ('sensor_1', '2022-02-02 10:10:59', 567)],
    ['sensor_id', 'TimeStamp', 'Value'])
# Cast the string column to a proper timestamp so the watermark and window operate on event time
Bronze = Bronze.withColumn('TimeStamp', F.col('TimeStamp').cast('timestamp'))

Silver = (Bronze 
    .withWatermark("TimeStamp", "1 minute")
    .groupBy(['sensor_id', F.window('TimeStamp', '1 minute')])
    .agg(
         F.expr("max_by(value, TimeStamp)").alias('lastvalue'),
         F.expr("min_by(value, TimeStamp)").alias('firstvalue'),
         F.max('TimeStamp').alias('lastTimeStamp'),
         F.min('TimeStamp').alias('firstTimeStamp')
    )
)
Silver.show()
# +---------+--------------------+---------+----------+-------------------+-------------------+
# |sensor_id|              window|lastvalue|firstvalue|      lastTimeStamp|     firstTimeStamp|
# +---------+--------------------+---------+----------+-------------------+-------------------+
# | sensor_1|{2022-02-02 10:10...|      567|     65534|2022-02-02 10:10:59|2022-02-02 10:10:01|
# | sensor_1|{2022-02-02 10:11...|        4|         2|2022-02-02 10:11:04|2022-02-02 10:11:02|
# +---------+--------------------+---------+----------+-------------------+-------------------+
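Since the question is about a streaming pipeline, here is a rough sketch of how the same aggregation could be wired into a streaming read and write. The Delta paths and checkpoint location are placeholders, not from the original post:

# Hypothetical streaming version of the same aggregation; paths are placeholders
BronzeStream = (spark.readStream
    .format('delta')
    .load('/path/to/bronze'))

SilverStream = (BronzeStream
    .withWatermark('TimeStamp', '1 minute')
    .groupBy('sensor_id', F.window('TimeStamp', '1 minute'))
    .agg(
        F.expr('max_by(Value, TimeStamp)').alias('lastvalue'),
        F.expr('min_by(Value, TimeStamp)').alias('firstvalue'),
        F.max('TimeStamp').alias('lastTimeStamp'),
        F.min('TimeStamp').alias('firstTimeStamp')))

query = (SilverStream.writeStream
    .format('delta')
    .outputMode('append')          # append works with a watermark plus windowed aggregation
    .option('checkpointLocation', '/path/to/checkpoints')
    .start('/path/to/silver'))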

Older Spark versions can do this with window functions.

from pyspark.sql import functions as F, Window as W
# Frames per sensor and per one-minute window, ordered by the event timestamp
partition = ['sensor_id', F.window('TimeStamp', '1 minute')]
w_desc = W.partitionBy(partition).orderBy(F.desc('TimeStamp'))  # latest record first
w_asc = W.partitionBy(partition).orderBy('TimeStamp')           # earliest record first
Silver = (Bronze 
    .withWatermark("TimeStamp", "1 minute")
    .withColumn('lastvalue', F.first('Value').over(w_desc))
    .withColumn('lastTimeStamp', F.first('TimeStamp').over(w_desc))
    .withColumn('firstvalue', F.first('Value').over(w_asc))
    .withColumn('firstTimeStamp', F.first('TimeStamp').over(w_asc))
    .groupBy(*partition)
    .agg(
         F.first('lastvalue').alias('lastvalue'), 
         F.first('firstvalue').alias('firstvalue'), 
         F.first('lastTimeStamp').alias('lastTimeStamp'),
         F.first('firstTimeStamp').alias('firstTimeStamp')
    )
)
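Note that non-time-based window functions like the ones above are not supported on streaming DataFrames, so that variant is really a batch solution. An alternative that avoids window functions altogether, and stays within what a streaming groupBy supports, is to take min/max of a struct whose first field is the timestamp, since structs compare field by field (a sketch, not from the original answer):

# min/max over struct(TimeStamp, Value) picks the rows with the earliest/latest timestamp
Silver2 = (Bronze
    .withWatermark('TimeStamp', '1 minute')
    .groupBy('sensor_id', F.window('TimeStamp', '1 minute'))
    .agg(
        F.max(F.struct('TimeStamp', 'Value')).alias('last'),
        F.min(F.struct('TimeStamp', 'Value')).alias('first'))
    .select(
        'sensor_id', 'window',
        F.col('last.Value').alias('lastvalue'),
        F.col('first.Value').alias('firstvalue'),
        F.col('last.TimeStamp').alias('lastTimeStamp'),
        F.col('first.TimeStamp').alias('firstTimeStamp')))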