You're looking to forward-fill a dataset. This gets slightly more complicated because you need to do it per category (person).
One way to do it: create a new DataFrame that contains all the dates you want a value for, for each person (see below, that's just dates_by_person).
Then, left-join the original DataFrame onto this one, so that you start creating the missing rows.
Next, use a windowing function to find, in each group of person, sorted by the date, the last non-null weight. If you can have multiple entries per date (so one person has multiple filled-in records on one specific date), you'll also have to sort by the timestamp column.
Finally, coalesce the columns, so that any null field gets replaced by the intended value.
from datetime import datetime, timedelta
from itertools import product
import pyspark.sql.functions as psf
from pyspark.sql import Window
data = ( # recreate the DataFrame
(1, datetime(2019, 12, 2, 14, 54, 17), 49.94),
(1, datetime(2019, 12, 3, 8, 58, 39), 50.49),
(1, datetime(2019, 12, 6, 10, 44, 1), 50.24),
(2, datetime(2019, 12, 2, 8, 58, 39), 62.32),
(2, datetime(2019, 12, 4, 10, 44, 1), 65.64))
df = spark.createDataFrame(data, schema=("person", "timestamp", "weight"))
min_max_timestamps = df.agg(psf.min(df.timestamp), psf.max(df.timestamp)).head()
first_date, last_date = [ts.date() for ts in min_max_timestamps]
all_days_in_range = [first_date + timedelta(days=d)
for d in range((last_date - first_date).days + 1)]
people = [row.person for row in df.select("person").distinct().collect()]
dates_by_person = spark.createDataFrame(product(people, all_days_in_range),
schema=("person", "date"))
df2 = (dates_by_person.join(df,
(psf.to_date(df.timestamp) == dates_by_person.date)
& (dates_by_person.person == df.person),
how="left")
.drop(df.person)
)
wind = (Window
        .partitionBy("person")
        .orderBy(psf.unix_timestamp("date"))
        .rangeBetween(Window.unboundedPreceding, -1)
        )
df3 = df2.withColumn("last_weight",
psf.last("weight", ignorenulls=True).over(wind))
df4 = df3.select(
df3.person,
psf.coalesce(df3.timestamp, psf.to_timestamp(df3.date)).alias("timestamp"),
psf.coalesce(df3.weight, df3.last_weight).alias("weight"))
df4.show()
# +------+-------------------+------+
# |person| timestamp|weight|
# +------+-------------------+------+
# | 1|2019-12-02 14:54:17| 49.94|
# | 1|2019-12-03 08:58:39| 50.49|
# | 1|2019-12-04 00:00:00| 50.49|
# | 1|2019-12-05 00:00:00| 50.49|
# | 1|2019-12-06 10:44:01| 50.24|
# | 2|2019-12-02 08:58:39| 62.32|
# | 2|2019-12-03 00:00:00| 62.32|
# | 2|2019-12-04 10:44:01| 65.64|
# | 2|2019-12-05 00:00:00| 65.64|
# | 2|2019-12-06 00:00:00| 65.64|
# +------+-------------------+------+
Edit: as David suggested in the comments, if you have a very large number of people, the construction of dates_by_person can be done without collecting everything to the driver. In this example, we're talking about a small number of integers, nothing big. But if it gets big, try:
dates = spark.createDataFrame(((d,) for d in all_days_in_range),
schema=("date",))
people = df.select("person").distinct()
dates_by_person = dates.crossJoin(people)