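Spark 3.0+ has the SQL aggregate functions max_by https://spark.apache.org/docs/latest/api/sql/index.html#max_by and min_by https://spark.apache.org/docs/latest/api/sql/index.html#min_by, which work well in your case: they return the value associated with the largest/smallest timestamp in each group.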
from pyspark.sql import functions as F
# Sample input.
# NOTE(review): 'TimeStamp' is a *string* column here, so max/min/max_by/min_by
# order it lexicographically. That matches chronological order only because every
# value uses the same zero-padded 'YYYY-MM-DD HH:MM:SS' layout; consider casting
# with F.to_timestamp for real data.
Bronze = spark.createDataFrame(
    [('sensor_1', '2022-02-02 10:10:01', 65534),
     ('sensor_1', '2022-02-02 10:10:02', 65535),
     ('sensor_1', '2022-02-02 10:10:03', 0),
     ('sensor_1', '2022-02-02 10:10:04', 1),
     ('sensor_1', '2022-02-02 10:11:02', 2),
     ('sensor_1', '2022-02-02 10:11:04', 4),
     ('sensor_1', '2022-02-02 10:10:59', 567)],
    ['sensor_id', 'TimeStamp', 'Value'])

# For each sensor and 1-minute tumbling window, keep the Value at the latest and
# earliest TimeStamp (max_by/min_by, Spark SQL 3.0+) plus those boundary timestamps.
Silver = (Bronze
    # Watermarks only take effect on streaming DataFrames; on this batch
    # DataFrame the call is a no-op (kept so the code carries over to streaming).
    .withWatermark("TimeStamp", "1 minute")
    .groupBy(['sensor_id', F.window('TimeStamp', '1 minute')])
    .agg(
        # Use the column's exact case ('Value') so the expression still resolves
        # when spark.sql.caseSensitive=true.
        F.expr("max_by(Value, TimeStamp)").alias('lastvalue'),
        F.expr("min_by(Value, TimeStamp)").alias('firstvalue'),
        F.max('TimeStamp').alias('lastTimeStamp'),
        F.min('TimeStamp').alias('firstTimeStamp'),
    )
)
Silver.show()
# +---------+--------------------+---------+----------+-------------------+-------------------+
# |sensor_id|              window|lastvalue|firstvalue|      lastTimeStamp|     firstTimeStamp|
# +---------+--------------------+---------+----------+-------------------+-------------------+
# | sensor_1|{2022-02-02 10:10...|      567|     65534|2022-02-02 10:10:59|2022-02-02 10:10:01|
# | sensor_1|{2022-02-02 10:11...|        4|         2|2022-02-02 10:11:04|2022-02-02 10:11:02|
# +---------+--------------------+---------+----------+-------------------+-------------------+
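Older Spark versions (before 3.0) can achieve the same result with window functions. Note that this only works on batch DataFrames, because Structured Streaming does not support non-time-based window functions.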
from pyspark.sql import functions as F, Window as W
# Pre-3.0 alternative: tag every row with the first/last Value and TimeStamp of its
# (sensor, 1-minute window) partition via window functions, then collapse each
# partition to a single row.
# NOTE: non-time-based window functions (.over(...)) are not supported on streaming
# DataFrames, so this version only works on batch data, where withWatermark is a no-op.
partition = ['sensor_id', F.window('TimeStamp', '1 minute')]
# Reference 'TimeStamp' with its exact case (not 'Timestamp') so the specs still
# resolve when spark.sql.caseSensitive=true.
w_desc = W.partitionBy(partition).orderBy(F.desc('TimeStamp'))  # latest row first
w_asc = W.partitionBy(partition).orderBy('TimeStamp')  # earliest row first
Silver = (Bronze
    .withWatermark("TimeStamp", "1 minute")
    # With an ORDER BY and the default frame (unbounded preceding .. current row),
    # first() returns the value from the partition's first row in sort order.
    .withColumn('lastvalue', F.first('Value').over(w_desc))
    .withColumn('lastTimeStamp', F.first('TimeStamp').over(w_desc))
    .withColumn('firstvalue', F.first('Value').over(w_asc))
    .withColumn('firstTimeStamp', F.first('TimeStamp').over(w_asc))
    .groupBy(*partition)
    .agg(
        # Every row of a partition carries identical values in these four columns,
        # so first() simply picks them up.
        F.first('lastvalue').alias('lastvalue'),
        F.first('firstvalue').alias('firstvalue'),
        F.first('lastTimeStamp').alias('lastTimeStamp'),
        F.first('firstTimeStamp').alias('firstTimeStamp'),
    )
)