PySpark:如何重新采样频率

2024-02-22

想象一个由变量的值观察组成的 Spark Dataframe。每个观察都有一个特定的时间戳,并且不同变量之间的这些时间戳不相同。这是因为时间戳是在变量值更改并记录时生成的。

#Variable     Time                Value
#852-YF-007   2016-05-10 00:00:00 0
#852-YF-007   2016-05-09 23:59:00 0
#852-YF-007   2016-05-09 23:58:00 0

Problem我想使用前向填充将所有变量置于相同的频率(例如 10 分钟)。为了形象化这一点,我复制了《Python for Data Analysis》一书中的一页。问题:如何在 Spark Dataframe 中执行此操作高效的 way?


Question:如何在 Spark Dataframe 上高效地做到这一点?

Spark DataFrame对于这样的操作来说根本不是一个好的选择。一般来说,SQL 原语的表达能力不够,PySparkDataFrame不提供实现它所需的低级访问权限。

虽然重新采样可以使用纪元/时间戳算法轻松表示。有了这样的数据:

from pyspark.sql.functions import col, max as max_, min as min_

df = (spark  
    .createDataFrame([
        ("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],   
        ["ts", "val"])        
   .withColumn("ts", col("ts").cast("date").cast("timestamp")))

我们可以重新采样输入:

day = 60 * 60 * 24
epoch = (col("ts").cast("bigint") / day).cast("bigint") * day

with_epoch = df.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()

并加入参考:

# Reference range 
ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")

(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("ts_resampled", col("epoch").cast("timestamp"))
    .show(15, False))

## +----------+---------------------+------+---------------------+   
## |epoch     |ts                   |val   |ts_resampled         |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null                 |null  |2012-06-13 02:00:00.0|
## |1339632000|null                 |null  |2012-06-14 02:00:00.0|
## |1339718400|null                 |null  |2012-06-15 02:00:00.0|
## |1339804800|null                 |null  |2012-06-16 02:00:00.0|
## |1339891200|null                 |null  |2012-06-17 02:00:00.0|
## |1339977600|null                 |null  |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null                 |null  |2012-06-20 02:00:00.0|
## |1340236800|null                 |null  |2012-06-21 02:00:00.0|
## |1340323200|null                 |null  |2012-06-22 02:00:00.0|
## |1340409600|null                 |null  |2012-06-23 02:00:00.0|
## |1340496000|null                 |null  |2012-06-24 02:00:00.0|
## |1340582400|null                 |null  |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+

在 Spark >= 3.1 中替换

col("epoch").cast("timestamp")

with

from pyspark.sql.functions import timestamp_seconds

timestamp_seconds("epoch")

使用低级 API 可以填充这样的数据,正如我在回答中所示的那样Spark / Scala:使用最后一次观察进行前向填充 https://stackoverflow.com/q/33621319/1560062。使用 RDD,我们还可以避免对数据进行两次洗牌(一次用于连接,一次用于重新排序)。

但这里还有一个更重要的问题。当问题可以简化为按元素计算或按分区计算时,Spark 的性能最佳。虽然前向填充是可能的情况,但据我所知,常用的时间序列模型通常不是这种情况,如果某些操作需要顺序访问,那么 Spark 根本不会提供任何好处。

因此,如果您使用的系列足够大,需要分布式数据结构,您可能希望将其聚合到某个可以由单台机器轻松处理的对象,然后使用您最喜欢的非分布式工具来处理其余部分。

如果您使用多个时间序列,每个时间序列都可以在内存中处理,那么当然有sparkts,但我知道你已经意识到了这一点。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark:如何重新采样频率 的相关文章

随机推荐