连接 Spark 数据帧时相当于 pandas merge_asof,具有合并最近和容差

2024-03-19

我正在尝试复制熊猫merge_asof https://pandas.pydata.org/pandas-docs/version/0.25.0/reference/api/pandas.merge_asof.html加入 Spark 数据帧时的行为。

假设我有两个数据框,df1 and df2:

import pandas as pd
df1 = pd.DataFrame([{"timestamp": 0.5 * i, "a": i * 2} for i in range(66)])
df2 = pd.DataFrame([{"timestamp": 0.33 * i, "b": i} for i in range(100)])

# use merge_asof to merge df1 and df2
merge_df = pd.merge_asof(df1, df2, on='timestamp', direction='nearest', tolerance=df.timestamp.diff().mean() - 1e-6)

结果merge_df将会:

timestamp a b
0.0 0 0
0.5 2 2
1.0 4 3
1.5 6 5
2.0 8 6
... ... ..
30.5 122 92
31.0 124 94
31.5 126 95
32.0 128 97
32.5 130 98

现在在 Spark 中给出类似的数据帧:

df1_spark = spark.createDataFrame([{"timestamp": 0.5 * i, "a": i * 2} for i in range(66)])
df2_spark = spark.createDataFrame([{"timestamp": 0.33 * i, "b": i} for i in range(100)])

如何连接两个 Spark 数据帧以产生与 pandas 类似的结果,并且可配置direction and tolerance?

[Edit]
根据类似帖子的建议,将功能应用于Window会产生类似的行为direction范围。但是,我仍然不知道如何应用函数来查找最近的行(比如如何nearest会表现)并且在一定范围内(tolerance).


下面提供了可配置的direction and tolerance论据。使用窗口函数(last对于“落后”,first代表“向前”,两者都代表“最近”)。

另外,根据我的经验,by熊猫的论点merge_asof经常需要。所以我也努力将此参数添加到函数中。论据by将是有益的,因为它可以通过创建分区进一步提高性能。

from pyspark.sql import functions as F, Window as W

def merge_asof(df_left, df_right, on: str, by=None, tolerance=None, direction: str='backward'):
    def backward():
        return add_diff(F.last(stru1, True).over(w0))
    def forward():
        return add_diff(F.first(stru1, True).over(w0.rowsBetween(0, W.unboundedFollowing)))
    def nearest():
        return F.array_sort(F.array(backward(), forward()))[0]
    def add_diff(col):
        return F.struct(
            F.abs(F.col(on) - col[on]).alias('diff'),
            col[on].alias(on),
            col[c].alias(c)
        )

    df_r = df_right if by else df_right.withColumn('_by', F.lit(1))
    df_l = df_left if by else df_left.withColumn('_by', F.lit(1))
    df_l = df_l.withColumn('_df_l', F.lit(True))
    by = [by] if isinstance(by, str) else by or ['_by']

    join_on = [on] + by
    df = df_l.join(df_r, join_on, 'full')

    w0 = W.partitionBy(*by).orderBy(on)
    for c in set(df_right.columns) - set(join_on):
        stru1 = F.when(~F.isnull(c), F.struct(on, c))
        stru2 = eval(f'{direction}()')
        if tolerance:
            stru2 = stru2.withField(c, F.when(stru2['diff'] <= tolerance, stru2[c]))
        df = df.withColumn(c, stru2[c])
    df = df.filter('_df_l').drop('_df_l', '_by')

    return df

一些解释

首先,函数的参数被稍微修改,并且基于两者执行完全连接,on and by论据。

df_r = df_right if by else df_right.withColumn('_by', F.lit(1))
df_l = df_left if by else df_left.withColumn('_by', F.lit(1))
df_l = df_l.withColumn('_df_l', F.lit(True))
by = [by] if isinstance(by, str) else by or ['_by']

join_on = [on] + by
df = df_l.join(df_r, join_on, 'full')

然后,对于右侧数据框中的每一列(除了on and by列),正在计算一个新值direction and tolerance.

w0 = W.partitionBy(*by).orderBy(on)
for c in set(df_right.columns) - set(join_on):
    stru1 = F.when(~F.isnull(c), F.struct(on, c))
    stru2 = eval(f'{direction}()')
    if tolerance:
        stru2 = stru2.withField(c, F.when(stru2['diff'] <= tolerance, stru2[c]))
    df = df.withColumn(c, stru2[c])

stru1列(的struct类型)被创建,持有on and c价值观。eval(f'{direction}()')执行一个基于的函数direction。函数是为每个direction值(“向后”、“向前”、“最近”)。这些函数向结构列添加另一字段(“diff”)。那么,如果“diff”在上面tolerancelevel,列的值null.

一些例子

df1_spark = spark.createDataFrame([{"timestamp": 0.5 * i, "a": i * 2} for i in range(66)])
df2_spark = spark.createDataFrame([{"timestamp": 0.33 * i, "b": i} for i in range(100)])

merge_asof(df1_spark, df2_spark, on='timestamp', direction='backward').show(3)
# +---------+---+---+
# |timestamp|  a|  b|
# +---------+---+---+
# |      0.0|  0|  0|
# |      0.5|  2|  1|
# |      1.0|  4|  3|
# +---------+---+---+

merge_asof(df1_spark, df2_spark, on='timestamp', direction='forward').show(3)
# +---------+---+---+
# |timestamp|  a|  b|
# +---------+---+---+
# |      0.0|  0|  0|
# |      0.5|  2|  2|
# |      1.0|  4|  4|
# +---------+---+---+

merge_asof(df1_spark, df2_spark, on='timestamp', direction='nearest').show(3)
# +---------+---+---+
# |timestamp|  a|  b|
# +---------+---+---+
# |      0.0|  0|  0|
# |      0.5|  2|  2|
# |      1.0|  4|  3|
# +---------+---+---+

merge_asof(df1_spark, df2_spark, on='timestamp', tolerance=0.05, direction='nearest').show()
# +---------+---+----+
# |timestamp|  a|   b|
# +---------+---+----+
# |      0.0|  0|   0|
# |      0.5|  2|null|
# |      1.0|  4|   3|
# |      1.5|  6|null|
# |      2.0|  8|   6|
# |      2.5| 10|null|
# |      3.0| 12|   9|
# |      3.5| 14|null|
# |      4.0| 16|  12|
# |      4.5| 18|null|
# |      5.0| 20|  15|
# |      5.5| 22|null|
# |      6.0| 24|null|
# |      6.5| 26|null|
# |      7.0| 28|null|
# |      7.5| 30|null|
# |      8.0| 32|null|
# |      8.5| 34|null|
# |      9.0| 36|null|
# |      9.5| 38|null|
# +---------+---+----+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

连接 Spark 数据帧时相当于 pandas merge_asof,具有合并最近和容差 的相关文章

随机推荐

  • Android NDK - NativeActivity 与 JNI 库

    两年前 我开发了一个增强现实框架android 7 闪电泡芙 http developer android com about versions android 2 1 html 由于 AR 应用程序是计算密集型任务 因此我开发了一个 JN
  • 我应该使用类还是字典?

    我有一个只包含字段而不包含方法的类 如下所示 class Request object def init self environ self environ environ self request method environ get R
  • 如何创建通用存储库?

    我想知道是否有人有关于制作通用存储库的任何好的教程 或者甚至可能是已经制作好的并且有详细记录的库 我当前正在使用 linq to sql 但它可能会发生变化 所以我不知道您是否可以创建一个通用存储库 如果我说切换到实体框架 则几乎不需要任何
  • 以编程方式展开/折叠 CoordinatorLayout 中的底部导航视图

    我有一个CoordinatorLayout其中包含一个BottomNavigationView and an AppBarLayout with a ToolBar在它里面 这BottomNavigationView不在里面AppBarLa
  • 部署不渲染 Kendo UI

    VS2012 asp net MVC4 c 带有 KendoUI 实现的互联网应用程序 最简单的说法是我的网站看起来像这样 开发机器上的本地 像这样部署 我检查了服务器 Kendo UI 论坛 所有论坛都指向图像丢失 我检查了甚至复制了我的
  • 授予 EC2 实例对 S3 存储桶的访问权限

    我想授予我的 ec2 实例对 s3 存储桶的访问权限 在此 ec2 实例上 启动了一个包含我的应用程序的容器 现在我没有获得 s3 存储桶的许可 这是我的存储桶政策 Version 2012 10 17 Id Policy146280822
  • 如何使用iPhone SDK下载文本文件?

    我是开发基于视图的 iPhone 应用程序的新手 我需要下载 这个 txt 文件来自互联网 并将其保存到应用程序的文档文件夹中 谁能简单地告诉我如何做到这一点 txt 文件很小 所以我不会 需要任何用户界面对象 Thanks Kevin N
  • 如何使用CSS仅在移动设备上显示文本?

    我有一个文本 在 div 中 显示在桌面和移动屏幕上 Expected 我希望文本仅显示在 media only screen and max width 768px How to 隐藏 div 与display none or 还有其他解
  • Django-rest-framework 多个 url 参数

    如何将 示例对象 映射到 url website com api
  • 编辑 PrimeNG 组件的 CSS

    我目前正在使用 Angular 4 Angular Materials 和 PrimeNG 组件开发用户界面 我正在处理的最新组件是来自 PrimeNG 的 MultiSelect 组件 https www primefaces org p
  • iOS 禁用横向 LaunchScreen.storyboard

    我有一个LaunchScreen storybaord显示徽标 文本 因此与方向无关 该应用程序始终以纵向启动 但它有某些允许横向模式的视图控制器 因此不能选择仅使应用程序纵向 我想要的是启动屏幕始终以纵向显示 因此 在应用程序启动期间将手
  • 通过模拟器发送电话号码

    我正在制作一个应用程序 我想检索设备电话号码并将其发送到服务器上 但我正在 Android 模拟器上测试这个应用程序 谁能告诉我如何在模拟器和实际设备中设置或获取电话号码 Thanks 如果我们使用电话管理器 我们可以在模拟器中获取电话号码
  • 如何在 Entity Framework Core 中删除多行? [复制]

    这个问题在这里已经有答案了 我需要使用 Entity Framework Core 从数据库中删除多行 此代码不起作用 foreach var item in items myCollection Remove item 因为我在第一个对象
  • 如何测试 Ruby on Rails 功能测试的 JSON 结果?

    我该如何维护我的Ajax http en wikipedia org wiki Ajax 28programming 29请求并测试 Ruby on Rails 功能测试的 JSON 输出 在 Rails gt 5 中 Use Action
  • Jena PrefixMapping:当模型是从数据集中获取的命名模型时,基本命名空间缺失

    这是我用来加载的代码OntModel to a Dataset作为命名模型 然后我尝试检索PrefixMapping以两种不同的方式实现相同的目的 public static void loadDatasetwithNamedModels
  • 获取“exec”调用中最后一个表达式的值

    假设我在字符串中有一些 python 代码 code a 42 a and I exec那串代码 result exec code Then result一直会None 有没有办法获得最后一个表达式的值 在这种情况下 那就是5 since
  • VueJS 读取 Dom 属性

    我想获取按钮单击事件的 href 属性 a href user all 2 i class fa fa edit i span Get Data span a 主要 JS 文件 new Vue el body methods func fu
  • 实体框架 4 存储过程调用超时

    我有一个导入到 EF4 中的存储过程 当我在 30 秒后使用某些参数调用它时 它会抛出超时错误 在 SQL Server Profiler 中 我可以看到使用正确参数的存储过程调用仅花费了 30 秒多一点 这是我的应用程序的超时时间 然而
  • Spring MVC:当未指定内容类型时@RequestBody

    我有一个 Spring MVC 应用程序 它以 JSON 字符串的形式从外部系统接收 HTTP 请求 其响应的返回方式与 JSON 字符串类似 我的控制器正确注释为 RequestBody and ResponseBody我有集成测试 它实
  • 连接 Spark 数据帧时相当于 pandas merge_asof,具有合并最近和容差

    我正在尝试复制熊猫merge asof https pandas pydata org pandas docs version 0 25 0 reference api pandas merge asof html加入 Spark 数据帧时