这个用例可以通过 Spark 的滞后/任何其他功能来完成吗?

2024-03-08

我使用的是spark-2.4.1v。我的项目中有一个用例,对于每个日期(process_date),我需要考虑当天的记录和前一天的记录,并对该数据集执行某些其他操作。 那么如何为此准备数据集呢?我尝试使用滞后函数但没有取得太大成功。

对于上述用例,给出的数据如下:

+----------+----------+----+-------+------------+-----------+
|company_id|  gen_date|year|quarter|total_assets|create_date|
+----------+----------+----+-------+------------+-----------+
| 989856662|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856665|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856667|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856662|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+----------+----+-------+------------+-----------+

这里 gen_date 是关键列。对于每个 gen_date,我需要获取其先前可用的 gen_date 记录。这些将按设置一起处理,即对于 process_date 2019-01-02,它应该具有 2019-01-02 和 2019-01-01 的记录,就像 gen_date 2018-12-30 及其之前的 gen_date 的 process_date 记录一样,即2018-12-29,但这里的 2018-12-29 gen_date 记录不可用,因此应被视为 gen_date 2018-12-30 记录。

在给定的集合中:

对于 process_date 2019-01-02 => ( gen_date 2019-01-02 ) 的记录 + ( gen_date 2019-01-01 ) 的记录
对于 process_date 2019-01-01 => ( gen_date 2019-01-01 ) 的记录 + ( gen_date 2018-12-31 ) 的记录
对于 process_date 2018-12-31 => ( gen_date 2018-12-31 ) 的记录 + ( gen_date 2018-12-30 ) 的记录
对于 process_date 2018-12-30 => ( gen_date 2018-12-30 ) 的记录 + 没有以前的 gen_date 记录。

输出应如下所示:

+----------+------------+----------+----+-------+------------+-----------+
|company_id|process_date|  gen_date|year|quarter|total_assets|create_date|
+----------+------------+----------+----+-------+------------+-----------+
| 989856662|  2019-01-02|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856662|  2019-01-02|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|  2019-01-01|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856662|  2019-01-01|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|  2019-01-01|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856665|  2019-01-01|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|  2019-01-01|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856667|  2019-01-01|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
| 989856662|  2018-12-30|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-30|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-30|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+------------+----------+----+-------+------------+-----------+

如何实现上述输出?

下面是所附的笔记本网址。

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/988191344931748/7035720262824085/latest.html https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/988191344931748/7035720262824085/latest.html


为了获取给定的前一天的详细信息gen_datecompany_id,您可以使用滞后函数,如下所示spec,

val windowSpec  = Window.partitionBy("company_id").orderBy("gen_date") 

val intermediateDF = finDF
  .withColumn("previous_gen_date", lag("gen_date",1).over(windowSpec))

上述步骤将根据company_id和gen_date为您获取上一代日期,您可以将此数据与您的原始数据连接起来以获得相关的前一天数据。

val finalDF = intermediateDF.alias("a")
  .join(finDF.alias("b"), col("a.company_id") === col("b.company_id") &&
    col("a.previous_gen_date") === col("b.gen_date"), "left_outer")
    .select(col("a.*"),
      col("b.year").as("previous_gen_date_year"),
      col("b.quarter").as("previous_gen_date_quarter"),
      col("b.total_assets").as("previous_gen_date_total_assets"),
      col("b.create_date").as("previous_gen_date_create_date")
    )

上述连接将产生前一天的完整数据以及生成日期。

+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|company_id|gen_date  |year|quarter|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|989856662 |2018-12-30|2018|4      |3832.435058 |2019-09-11 |null             |null                  |null                     |null                          |null                         |
|989856662 |2018-12-31|2018|4      |3700.435058 |2019-09-11 |2018-12-30       |2018                  |4                        |3832.435058                   |2019-09-11                   |
|989856662 |2019-01-01|2019|1      |3800.435058 |2019-09-11 |2018-12-31       |2018                  |4                        |3700.435058                   |2019-09-11                   |
|989856662 |2019-01-02|2019|1      |3900.435058 |2019-09-11 |2019-01-01       |2019                  |1                        |3800.435058                   |2019-09-11                   |
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+

在这里你的gen_date也可以充当process_date列,您可以使用此比较任何操作的两天数据。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

这个用例可以通过 Spark 的滞后/任何其他功能来完成吗? 的相关文章

随机推荐