如何计算 pyspark dataframe 中的每日基础(时间序列)

2024-05-04

所以我有一个数据框,我想计算一些数量,比如说每天......假设我们有 10 列 col1,col2,col3,col4 ... coln,其中每列都依赖于值col1、 col2、 col3 、 col4 .. 等等,日期根据id..

    +--------+----+----              +----+
        date |col1|id  |col2|.    .  |coln
    +--------+----+----              +----+
    2020-08-01| 0|  M1 |   .    .   .    3|
    2020-08-02| 4|  M1 |                10|
    2020-08-03| 3|  M1 |   .     .   .  9 |
    2020-08-04| 2|  M1 |    .   .    .  8 |
    2020-08-05| 1|  M1 |   .   .     .  7 |
    2020-08-06| 0|  M1 |   .    .   .   0 |
    2020-08-01| 0|  M2 |   .   .     .  0 |
    2020-08-02| 0|  M2 |    .   .   . . 1 |
    2020-08-03| 0|  M2 |    .   .  . .  2 |
   +---------+----+----+-----------------+   

假设我们执行这个数据帧,这个 df 中可能有更多的列...... 为了清楚起见,我们假设今天的日期是 2020 年 8 月 1 日。我们做了一些计算,我们在 coln 得到了一些输出coln =3在 2020-08-01,我想在 2020-08-02 coln == col1 即 col1 ==3 并在 2020-08-02 进行计算等等......所以 df 的例子看起来像下面这个

    +--------+----+----              +----+
        date |col1|id  |col2|.    .  |coln
    +--------+----+----              +----+
    2020-08-01| 0|  M1 |   .    .   .    3|
    2020-08-02| 3|  M1 |                10|
    2020-08-03|10|  M1 |   .     .   .  9 |
    2020-08-04| 9|  M1 |    .   .    .  8 |
    2020-08-05| 8|  M1 |   .   .     .  7 |
    2020-08-06| 7|  M1 |   .    .   .   0 |
    2020-08-01| 0|  M2 |   .   .     .  1 |
    2020-08-02| 1|  M2 |    .   .   . . 2 |
    2020-08-03| 2|  M2 |    .   .  . .  0 |
   +---------+----+----+-----------------+   
  

如果你们能给我一个如何在 pyspark 中完成此操作的例子,那就太好了。

example:比方说col3 = col1+ col2最初,假设 col1 全部为 0。

df1_schema = StructType([StructField("Date", StringType(), True),\
                              StructField("col1", IntegerType(), True),\
                             StructField("id", StringType(), True),\
                       StructField("col2", IntegerType(), True),\
                       StructField("col3", IntegerType(), True),\
                        StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2),('2020-08-02',0,'M1',2,3,1),\
           ('2020-08-03',0,'M1',3,3,3),('2020-08-04',0,'M1',3,3,1),\
            ('2020-08-01',0,'M2',1,3,1),('2020-08-02',0,'M2',-1,3,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()

+----------+----+---+----+----+----+
|      Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|
|2020-08-02|   0| M1|   2|   3|   1|
|2020-08-03|   0| M1|   3|   3|   3|
|2020-08-04|   0| M1|   3|   3|   1|
|2020-08-01|   0| M2|   1|   3|   1|
|2020-08-02|   0| M2|  -1|   3|   2|
+----------+----+---+----+----+----+

所以让我们重点关注2020-08-01这是开始,我们想要的是 col1+col2,即 3 = col3。在依赖于 col3.. col4... col5.. 的第 n 次计算之后,假设我们得到了某个数字 coln= 3。计算完成后,我们想要2020-08-02, coln=3 应该位于 col1 所以它是在2020-08-01计算完成后动态变化的

所以我想要的 df 看起来像这样

+----------+----+---+----+----+----+
|      Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|
|2020-08-02|   2| M1|   2|   5|   1|
|2020-08-03|   1| M1|   3|   4|   3|
|2020-08-04|   3| M1|   3|   6|   1|
|2020-08-01|   1| M2|   1|   4|   1|
|2020-08-02|   1| M2|  -1|   0|   2|
+----------+----+---+----+----+----+

EDIT 2:

df1_schema = StructType([StructField("Date", StringType(), True),\
                              StructField("col1", IntegerType(), True),\
                             StructField("id", StringType(), True),\
                       StructField("col2", IntegerType(), True),\
                       StructField("col3", IntegerType(), True),\
                       StructField("col4", IntegerType(), True),\
                        StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2,2),('2020-08-02',0,'M1',2,3,0,1),\
           ('2020-08-03',0,'M1',3,3,2,3),('2020-08-04',0,'M1',3,3,2,1),\
            ('2020-08-01',0,'M2',1,3,3,1),('2020-08-02',0,'M2',-1,3,1,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|   2|
|2020-08-02|   0| M1|   2|   3|   0|   1|
|2020-08-03|   0| M1|   3|   3|   2|   3|
|2020-08-04|   0| M1|   3|   3|   2|   1|
|2020-08-01|   0| M2|   1|   3|   3|   1|
|2020-08-02|   0| M2|  -1|   3|   1|   2|
+----------+----+---+----+----+----+----+

所以假设 coln = col4 - col2 那么

+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|  -1|
|2020-08-02|  -1| M1|   2|   1|   0|  -2|
|2020-08-03|  -2| M1|   3|   1|   2|  -1|
|2020-08-04|  -1| M1|   3|   2|   2|  -1|
|2020-08-01|   0| M2|   1|   1|   3|   2|
|2020-08-02|   2| M2|  -1|   1|   1|   2|
+----------+----+---+----+----+----+----+

这是您可以使用 Spark SQL 内置函数处理的一类问题总计的 https://spark.apache.org/docs/latest/api/sql/index.html#aggregate(要求火花2.4+),下面概述了基本思想:

from pyspark.sql.functions import sort_array, collect_list, struct, to_date

cols = ['Date', 'col1', 'col2', 'col3', 'coln']

df_new = df1.groupby('id') \
    .agg(sort_array(collect_list(struct(*cols))).alias('dta')) \
    .selectExpr("id", """  
      inline( 
        aggregate( 
          /* expr: iterate through the array `dta` from the 2nd to the last items*/
          slice(dta,2,size(dta)-1), 
          /* start: AKA. the zero value which is an array of structs 
           * with a single element dta[0]
           */
          array(dta[0]), 
          /* merge: do the calculations */
          (acc, x) ->   
            concat(acc, array(named_struct( 
              'Date', x.Date, 
              'col1', element_at(acc, -1).coln, 
              'col2', x.col2, 
              'col3', element_at(acc, -1).col3 + x.col2, 
              'coln', x.col3 - x.col2 
            )))  
         )    
       )    
   """)

Output:

df_new.show()
+---+----------+----+----+----+----+ 
| id|      Date|col1|col2|col3|coln|
+---+----------+----+----+----+----+
| M1|2020-08-01|   0|   3|   3|   2|
| M1|2020-08-02|   2|   2|   5|   1|
| M1|2020-08-03|   1|   3|   8|   0|
| M1|2020-08-04|   0|   3|  11|   0|
| M2|2020-08-01|   0|   1|   3|   1|
| M2|2020-08-02|   1|  -1|   2|   4|
+---+----------+----+----+----+----+

Where:

  1. 我们对相同的行进行分组id并对它们进行排序Date,将结果结构数组命名为dta

  2. 在聚合函数中,我们初始化acc带有结构体数组array(dta[0])然后遍历数组dta从第二项到最后一项使用slice https://spark.apache.org/docs/latest/api/sql/index.html#slice功能

  3. in the merge聚合函数的一部分,您可以使用x.col1, x.coln等引用同一日期的值并使用element_at(acc, -1).col1, element_at(acc, -1).coln等来引用前一个日期的值。

  4. 在合并函数中,我们使用concat(acc, array(...))将新元素追加到结构数组中acc

  5. use inline https://spark.apache.org/docs/latest/api/sql/index.html#inline函数来分解上面的结构数组acc

  6. 这里假设日期是连续的,如果存在缺失日期,可以添加一些IF条件。例如计算col3 below:

    IF(datediff(x.Date, element_at(acc, -1).Date) = 1, element_at(acc, -1).coln, 0) + x.col2
    

顺便提一句。我没有使用这个例子coln = col4 - col2, using con3 = col3_prev + col2相反,我认为这是一个更好的例子。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何计算 pyspark dataframe 中的每日基础(时间序列) 的相关文章

随机推荐