Question:如何在 Spark Dataframe 上高效地做到这一点?
Spark DataFrame
对于这样的操作来说根本不是一个好的选择。一般来说,SQL 原语的表达能力不够,PySparkDataFrame
不提供实现它所需的低级访问权限。
虽然重新采样可以使用纪元/时间戳算法轻松表示。有了这样的数据:
from pyspark.sql.functions import col, max as max_, min as min_
df = (spark
.createDataFrame([
("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],
["ts", "val"])
.withColumn("ts", col("ts").cast("date").cast("timestamp")))
我们可以重新采样输入:
day = 60 * 60 * 24
epoch = (col("ts").cast("bigint") / day).cast("bigint") * day
with_epoch = df.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()
并加入参考:
# Reference range
ref = spark.range(
min_epoch, max_epoch + 1, day
).toDF("epoch")
(ref
.join(with_epoch, "epoch", "left")
.orderBy("epoch")
.withColumn("ts_resampled", col("epoch").cast("timestamp"))
.show(15, False))
## +----------+---------------------+------+---------------------+
## |epoch |ts |val |ts_resampled |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null |null |2012-06-13 02:00:00.0|
## |1339632000|null |null |2012-06-14 02:00:00.0|
## |1339718400|null |null |2012-06-15 02:00:00.0|
## |1339804800|null |null |2012-06-16 02:00:00.0|
## |1339891200|null |null |2012-06-17 02:00:00.0|
## |1339977600|null |null |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null |null |2012-06-20 02:00:00.0|
## |1340236800|null |null |2012-06-21 02:00:00.0|
## |1340323200|null |null |2012-06-22 02:00:00.0|
## |1340409600|null |null |2012-06-23 02:00:00.0|
## |1340496000|null |null |2012-06-24 02:00:00.0|
## |1340582400|null |null |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+
在 Spark >= 3.1 中替换
col("epoch").cast("timestamp")
with
from pyspark.sql.functions import timestamp_seconds
timestamp_seconds("epoch")
使用低级 API 可以填充这样的数据,正如我在回答中所示的那样Spark / Scala:使用最后一次观察进行前向填充 https://stackoverflow.com/q/33621319/1560062。使用 RDD,我们还可以避免对数据进行两次洗牌(一次用于连接,一次用于重新排序)。
但这里还有一个更重要的问题。当问题可以简化为按元素计算或按分区计算时,Spark 的性能最佳。虽然前向填充是可能的情况,但据我所知,常用的时间序列模型通常不是这种情况,如果某些操作需要顺序访问,那么 Spark 根本不会提供任何好处。
因此,如果您使用的系列足够大,需要分布式数据结构,您可能希望将其聚合到某个可以由单台机器轻松处理的对象,然后使用您最喜欢的非分布式工具来处理其余部分。
如果您使用多个时间序列,每个时间序列都可以在内存中处理,那么当然有sparkts
,但我知道你已经意识到了这一点。