具有当前行条件的 Spark 窗口函数

2024-02-19

我正在尝试计算给定的order_id过去 365 天内有多少订单已付款。这不是问题:我使用窗函数 https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html.

对我来说棘手的是:我不想在这个时间窗口内计算订单,因为payment_date是在之后order_date当前的order_id.

目前,我有这样的事情:

val window: WindowSpec = Window
  .partitionBy("customer_id")
  .orderBy("order_date")
  .rangeBetween(-365*days, -1)

and

df.withColumn("paid_order_count", count("*") over window)

这将统计客户当前订单之前过去 365 天内的所有订单。

现在我怎样才能为需要的计数添加一个条件order_date考虑当前订单?

Example:

+---------+-----------+-------------+------------+
|order_id |order_date |payment_date |customer_id |
+---------+-----------+-------------+------------+
|1        |2017-01-01 |2017-01-10   |A           |
|2        |2017-02-01 |2017-02-10   |A           |
|3        |2017-02-02 |2017-02-20   |A           |

结果表应如下所示:

+---------+-----------+-------------+------------+-----------------+
|order_id |order_date |payment_date |customer_id |paid_order_count |
+---------+-----------+-------------+------------+-----------------+
|1        |2017-01-01 |2017-01-10   |A           |0                |
|2        |2017-02-01 |2017-02-10   |A           |1                |
|3        |2017-02-02 |2017-02-20   |A           |1                |

For order_id = 3 the paid_order_count不应该是2 but 1 as order_id = 2是在之后支付的order_id = 3被放置。

我希望我能很好地解释我的问题并期待您的想法!


很好的问题!!! 一些评论,使用范围之间创建一个基于其中的行数而不是值的固定框架,因此在两种情况下会出现问题:

  1. 客户并非每天都有订单,因此 365 行窗口可能包含 order_date 早于一年前的行
  2. 如果客户每天有多个订单,则会扰乱一年的保修范围
  3. 1 和 2 的组合

Also 范围之间不适用于日期和时间戳数据类型。

为了解决这个问题,可以使用带有列表和 UDF 的窗口函数:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

  val df = spark.sparkContext.parallelize(Seq(
    (1, "2017-01-01", "2017-01-10", "A")
    , (2, "2017-02-01", "2017-02-10", "A")
    , (3, "2017-02-02", "2017-02-20", "A")
  )
  ).toDF("order_id", "order_date", "payment_date", "customer_id")
    .withColumn("order_date_ts", to_timestamp($"order_date", "yyyy-MM-dd").cast("long"))
    .withColumn("payment_date_ts", to_timestamp($"payment_date", "yyyy-MM-dd").cast("long"))

//      df.printSchema()
//      df.show(false)

  val window = Window.partitionBy("customer_id").orderBy("order_date_ts").rangeBetween(Window.unboundedPreceding, -1)

  val count_filtered_dates = udf( (days: Int, top: Long, array: Seq[Long]) => {
      val bottom = top - (days * 60 * 60 * 24).toLong // in spark timestamps are in secconds, calculating the date days ago
      array.count(v => v >= bottom && v < top)
    }
  )

  val res = df.withColumn("paid_orders", collect_list("payment_date_ts") over window)
      .withColumn("paid_order_count", count_filtered_dates(lit(365), $"order_date_ts", $"paid_orders"))

  res.show(false)

Output:

+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|order_id|order_date|payment_date|customer_id|order_date_ts|payment_date_ts|paid_orders             |paid_order_count|
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|1       |2017-01-01|2017-01-10  |A          |1483228800   |1484006400     |[]                      |0               |
|2       |2017-02-01|2017-02-10  |A          |1485907200   |1486684800     |[1484006400]            |1               |
|3       |2017-02-02|2017-02-20  |A          |1485993600   |1487548800     |[1484006400, 1486684800]|1               |
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+

将日期转换为 Spark 时间戳(以秒为单位)可以提高列表的内存效率。

这是最容易实现的代码,但不是最佳的,因为列表会占用一些内存,自定义 UDAF 是最好的,但需要更多编码,可能稍后会做。如果每个客户有数千个订单,这仍然有效。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有当前行条件的 Spark 窗口函数 的相关文章

  • 如何在 Lift 框架中添加新页面

    如何在 lift 中的 webapp 目录中添加一个可供用户访问的新页面 目前只能通过index html访问http localhost 8080 com http localhost 8080 or http localhost 808
  • 火花内存不足

    我有一个文件夹 里面有 150 G 的 txt 文件 大约 700 个文件 平均每个 200 MB 我使用 scala 来处理文件并最终计算一些汇总统计数据 我认为有两种可能的方法可以做到这一点 手动循环所有文件 对每个文件进行计算并最终合
  • Scala 的“神奇”函数列表

    在哪里可以找到 Scala 的 神奇 函数列表 例如apply unapply update etc 魔法函数是指编译器的某些语法糖使用的函数 例如 o update x y lt gt o x y 我用谷歌搜索了一些组合scala mag
  • 在没有匹配器的情况下如何跳过specs2中的测试?

    我正在尝试使用 scala 中的 specs2 测试一些与数据库相关的内容 目标是测试 db running 然后执行测试 我发现如果数据库关闭 我可以使用 Matcher 类中的 orSkip 问题是 我正在获取一个匹配条件的输出 作为
  • 我可以在没有 Hadoop 的情况下使用 Spark 作为开发环境吗?

    我对大数据和相关领域的概念非常陌生 如果我犯了一些错误或拼写错误 我很抱歉 我想了解阿帕奇火花 http spark apache org 并使用它仅在我的电脑中 在开发 测试环境中 由于Hadoop包含HDFS Hadoop分布式文件系统
  • 你能在 scala 中使用 varargs 柯里化一个函数吗?

    我正在考虑如何用可变参数柯里化一种方法 然后我意识到我什至不知道如何去做 理想情况下 它应该让您可以随时开始使用它 然后以可迭代结束 def concat strs String strs mkString val curriedConca
  • PySpark - 系统找不到指定的路径

    Hy 我已经多次运行 Spark Spyder IDE 今天我收到这个错误 代码是相同的 from py4j java gateway import JavaGateway gateway JavaGateway os environ SP
  • Scala 为了在 JVM 上运行做出了哪些妥协?

    Scala 是一种很棒的语言 但我想知道如果它有自己的运行时 如何改进 IE 由于 JVM 的选择 做出了哪些设计选择 我所知道的两个最重要的妥协是 类型擦除 http java sun com docs books tutorial ja
  • hive - 在值范围之间将一行拆分为多行

    我在下面有一张表 想按从开始列到结束列的范围拆分行 即 id 和 value 应该对开始和结束之间的每个值重复 包括两者 id value start end 1 5 1 4 2 8 5 9 所需输出 id value current
  • 如何记录来自 Akka (Java) 的所有传入消息

    在 Scala 中 您可以使用 LoggingReceive 包装接收函数 如何通过 Java API 实现相同的目标 def receive LoggingReceive case x do something Scala API 有Lo
  • Slick:将操作与 DBIOAction 的 Seq 组合起来

    我有 工作 以下代码 val actions for lt slickUsers insertOrUpdate dbUser loginInfo lt loginInfoAction lt slickUserLoginInfos DBUse
  • 函数式 Scala 中的选择排序

    我正在学习 Scala 编程 并编写了选择排序算法的快速实现 然而 由于我对函数式编程还不太了解 所以在转换为更 Scala 风格时遇到了困难 对于 Scala 程序员来说 如何使用 Lists 和 vals 来做到这一点 而不是回到我的命
  • 在 Windows 7 64 位中删除 Spark 临时目录时出现异常

    我正在尝试在 Windows 7 64 位中运行 Spark 作业的单元测试 我有 HADOOP HOME D winutils winutils path D winutils bin winutils exe 我运行了以下命令 winu
  • Scala 程序中三元运算符的用法[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我有一个需要应用过滤器的对象数组 val filteredList list filter l gt pid true l Pro
  • Java时间转正常格式

    我有 Java 时间1380822000000 我想转换为我可以阅读的内容 import java util Date object Ws1 val a new Date 1380822000000 toString 导致异常 warnin
  • 如何将 Spark DataFrame 以 csv 格式保存在磁盘上?

    例如 这样的结果 df filter project en select title count groupBy title sum 将返回一个数组 如何将 Spark DataFrame 作为 csv 文件保存在磁盘上 Apache Sp
  • 如何使用 zio-test 测试异常情况

    我有以下功能 我想测试 def people id Int RIO R People 如果有 People 则此函数返回 Peopleid 分别 如果没有则失败 例如 IO fail ServiceException s No People
  • “为 Apache Hadoop 2.7 及更高版本预构建”是什么意思?

    Apache Spark 下载页面上的 pre built for Apache Hadoop 2 7 and later 是什么意思 这是否意味着spark中HDFS必须有库 如果是这样 其他存储系统 例如 Cassandra s3 HB
  • SBT Scaladoc 配置

    我正在尝试在 SBT 中配置 Scaladoc 特别是标题 输出目录和类路径 我通过将以下内容添加到 build sbt 来定义标题 scalacOptions in Compile doc Opts doc title Scala Too
  • 使用spark-sql从oracle加载数据时如何增加默认精度和小数位数

    尝试从 oracle 表加载数据 其中我有几列保存浮点值 有时它最多保存 DecimalType 40 20 即点后 20 位数字 目前 当我使用加载其列时 var local ora df DataFrameReader ora df l

随机推荐