根据最近的关键条件连接 Spark DataFrame

2024-04-20

在 PySpark 中执行模糊连接的高效方法是什么?

我正在寻找社区对在最近的关键条件下加入大型 Spark DataFrame 的可扩展方法的看法。请允许我通过一个有代表性的例子来说明这个问题。假设我们有以下 Spark DataFrame,其中包含在某个时间点发生的事件:

ddf_event = spark.createDataFrame(
    data=[
        [1, 'A'],
        [5, 'A'],
        [10, 'B'],
        [15, 'A'],
        [20, 'B'],
        [25, 'B'],
        [30, 'A']
    ],
    schema=['ts_event', 'event']
)

以及以下包含在某个时间点测量的 GPS 数据的 Spark DataFrame:

ddf_gps = spark.createDataFrame(
    data=[
        [2, '(-46.84635, 173.13674)'],
        [4, '(2.50362, 104.34136)'],
        [8, '(-24.20741, 51.80755)'],
        [15, '(-59.07798, -20.49141)'],
        [18, '(-44.34468, -167.90401)'],
        [24, '(-18.84175, 16.68628)'],
        [27, '(20.48501,58.42423)']
    ],
    schema=['ts_gps', 'gps_coordinates']
)

我们希望加入它以生成以下结果数据帧:

+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates        |
+--------+-----+------+-----------------------+
|1       |A    |2     |(-46.84635, 173.13674) |
|5       |A    |4     |(2.50362, 104.34136)   |
|10      |B    |8     |(-24.20741, 51.80755)  |
|15      |A    |15    |(-59.07798, -20.49141) |
|20      |B    |18    |(-44.34468, -167.90401)|
|25      |B    |24    |(-18.84175, 16.68628)  |
|30      |A    |27    |(20.48501,58.42423)    |
+--------+-----+------+-----------------------+

给定事件时间戳和 GPS 数据时间戳,有效找到最近的 GPS 数据点。

因此,我们遇到了连接最近关键条件的问题,在这种情况下,“最近”被定义为时间戳之间的最小绝对差。

我探索了两种方法来实现此目的:一种基于过滤分箱连接 (FBJ),一种基于过滤排序联合 (FSU)。下面更详细地描述这两种方法。

FBJ 方法取决于参数bin_size,这限制了可以找到匹配 GPS 时间戳的时间窗口。增加bin_size增加计算负载,减少计算负载会降低结果质量。

这两种方法似乎都不随输入 DataFrame 的大小线性缩放。

在实践中,我必须处理由数千万行组成的输入数据,因此我目前找不到解决该问题的可行解决方案。

FBJ方法

FBJ 方法包括以下步骤:

  1. 创建一个ts_bin列,将timestamp列,实施者:
bin_size = 10
ddf_event = ddf_event.withColumn(
    'ts_bin',
    F.round(F.col('ts_event') / bin_size)
)

ddf_gps = ddf_gps.withColumn(
    'ts_bin',
    F.round(F.col('ts_gps') / bin_size)
)
  1. 加入 DataFramests_bin专栏,实施者:
ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
  1. 确定最小时间戳差异,实现方式为:
from pyspark.sql.window import Window

window = Window.partitionBy('ts_event')

ddf = ddf.withColumn(
    'ts_diff',
    F.abs(F.col('ts_gps') - F.col('ts_event'))
)

ddf = ddf.withColumn(
    'min_ts_diff',
    F.min(F.col('ts_diff')).over(window)
)
  1. 过滤并选择相关的行和列,实现方式为:
ddf = (
    ddf
    .where(
        (F.col('ts_diff') == F.col('min_ts_diff')) |
        (F.col('ts_diff').isNull())   
    )
    .select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)

Limit bin_size情况:

  • bin_size >> 1有效地产生完全交叉连接
  • bin_size = 1有效地导致左连接ts_event == ts_gps

前苏联方法

FSU 方法包括以下步骤:

  1. 联合 DataFrames,实现者:
def union(df1, df2):
    cols = list(set(df1.columns).union(set(df2.columns)))
    for col in cols:
        if col not in df1.columns:
            df1 = df1.withColumn(col, F.lit(None))
        if col not in df2.columns:
            df2 = df2.withColumn(col, F.lit(None))
    return df1.select(cols).union(df2.select(cols))

ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))
ddf = union(ddf_event, ddf_gps)
  1. 对生成的 DataFrame 进行排序并获取相邻的 GPS 时间戳,实现方式为:
from sys import maxsize

last_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(0, maxsize)

ddf = (
    ddf.withColumn(
        'prev_time',
        F.last(F.col('ts_gps'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'prev_coordinates',
        F.last(F.col('gps_coordinates'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'next_time',
        F.first(F.col('ts_gps'), ignorenulls=True)
         .over(first_window)
    ).withColumn(
        'next_coordinates',
        F.first(F.col('gps_coordinates'), ignorenulls=True)
         .over(first_window)
    )
)
  1. 过滤并选择相关的行和列,实现方式为:
condition = (F.col('timestamp') - F.col('prev_time')
             < F.col('next_time') - F.col('timestamp'))

ddf = (
    ddf
    .where(F.col('event').isNotNull())
    .withColumn(
        'ts_gps',
        F.when(condition | F.col('next_time').isNull(), F.col('prev_time')).otherwise(F.col('next_time'))
    ).withColumn(
        'gps_coordinates',
        F.when(condition | F.col('next_time').isNull(),
               F.col('prev_coordinates'))
         .otherwise(F.col('next_coordinates'))
    ).select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)

您正在寻找的是时间连接。 查看时间序列Spark库Flint(原名HuoHua,中文为Spark):https://github.com/twosigma/flint https://github.com/twosigma/flint

使用此库,对于 2 个给定的时间序列数据帧(文档解释了这些对象),您可以在 PySpark(或 Scala Spark)中执行:

ddf_event = ...
ddf_gps = ...
result = ddf_event.leftJoin(ddf_gps, tolerance = "1day")

您的时间戳不清楚,因此请根据您的需要设置容差。 如果需要,您还可以进行“未来连接”。

查看他们的 Spark Summit 演示,了解更多解释和示例:https://youtu.be/g8o5-2lLcvQ https://youtu.be/g8o5-2lLcvQ

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

根据最近的关键条件连接 Spark DataFrame 的相关文章

  • 以类似字典的方式将新项目添加到某些结构化数组中

    我想扩展 numpy 中的结构化数组对象 以便我可以轻松添加新元素 例如 对于一个简单的结构化数组 gt gt gt import numpy as np gt gt gt x np ndarray 2 dtype names A B fo
  • 可视化时间序列时标记特定日期

    我有一个包含几年数据的时间序列 例如 ts pd Series np random randn 1000 index pd date range 1 1 2000 periods 1000 ts ts cumsum ts plot 我还有两
  • 从 SQL Server 中调用 Python 文件

    我的文件名中有 Python 脚本 C Python HL py 在此 Python 脚本中 有预测模型以及对 SQL 数据库中某些表的更新 我想将此文件称为 SQL 作业 我怎样才能做到这一点 这个问题不一样 如何在 SQL Server
  • Flask-httpauth: get_password 装饰器如何为 basic-auth 工作?

    我想知道有没有人用过这个烧瓶延伸 https github com miguelgrinberg flask httpauth简化 http basic auth 基本上我不明白这个example https github com migu
  • 可重用的 Tensorflow 卷积网络

    我想重用来自Tensorflow 专业人士的 MNIST CNN 示例 http www tensorflow org tutorials mnist pros index md 我的图像尺寸为 388px X 191px 只有 2 个输出
  • python中remove方法的安全使用

    我从列表继承了一个 UserList 类并实现了以下方法来删除标记为已删除的条目 def purge deleted self for element in list iter self if ele mark deleted lt 1 s
  • 从文件中读取单词并放入列表中

    本质上 我有一个巨大的文件 所有文件包含每行多个单词 每个单词用空格分隔 有点像这样 WORD WORD WORD WORD ANOTHER WORD SCRABBLE BLAH YES NO 我想要做的是将文件中的所有单词放入一个巨大的列
  • 设置区域设置和字符串模块

    这个简单的脚本 from locale import LC ALL setlocale print setlocale LC ALL from string import letters print letters 给我这个输出 tr TR
  • Python 中 Goto 标签的替代方案?

    我知道我不能使用 Goto 我也知道 Goto 不是答案 我读过类似的问题 但我只是想不出解决我的问题的方法 所以 我正在编写一个程序 你必须在其中猜测一个数字 这是我遇到问题的部分的摘录 x random randint 0 100 I
  • 如何将 Jinja 与 Twisted 一起使用?

    我正在计划使用 Python 与 Twisted Storm 和 Jinja 一起开发一个讨论软件 问题是 Jinja 不是为 Twisted 或异步套接字库而设计的 并且使用 Twisted 提供的性能是我不打算使用 Flask 的原因
  • Python 对象属性 - 访问方法

    假设我有一个具有某些属性的类 在 Pythonic OOP 中 如何访问这些属性是最好的 就像obj attr 或者也许编写 get 访问器 此类事物可接受的命名风格是什么 Edit 您能否详细说明使用单下划线或双前导下划线命名属性的最佳实
  • 在 Python 中将 int 转换为 ASCII 并返回

    我正在为我的网站制作一个 URL 缩短器 我当前的计划 我愿意接受建议 是使用节点 ID 来生成缩短的 URL 因此 理论上 节点 26 可能是short com z 节点 1 可能是short com a 节点 52 可能是short c
  • Java中使用final关键字会提高性能吗?

    在 Java 中 我们看到很多地方final可以使用关键字 但其使用并不常见 例如 String str abc System out println str 在上述情况下 str can be final但这通常被忽略 当一个方法永远不会
  • 按分区“内”键进行高效分组

    我正在尝试调整一个流程来激发火花 基本上 该过程分析来自 JDBC 数据源的批量数据 每条记录都有一个batchId 还有一个更高级别的groupId 批次数量较大 提前未知 组数约为 100 RAM 中可以容纳每个批次的记录数 实际的分析
  • 按多索引的一级对 pandas DataFrame 进行排序

    我有一个多索引 pandas DataFrame 需要按索引器之一进行排序 这是数据片段 gene VIM treatment dose time TGFb 0 1 2 0 158406 1 2 0 039158 10 2 0 052608
  • 如何点击 Google Trends 中的“加载更多”按钮并通过 Selenium 和 Python 打印所有标题

    这次我想单击一个按钮来加载更多实时搜索 这是网站的链接 该按钮位于页面末尾 代码如下 div class feed load more button Load more div 由于涉及到一些 AngularJS 我不知道该怎么做 有什么提
  • NumPy 数组不可 JSON 序列化

    创建 NumPy 数组并将其保存为 Django 上下文变量后 加载网页时收到以下错误 array 0 239 479 717 952 1192 1432 1667 dtype int64 is not JSON serializable
  • 是否可以使用 Python 中的密码安全地加密然后解密数据?

    我在 python 程序中有一些数据 我想在使用密码写入文件之前对其进行加密 然后在使用它之前读取并解密它 我正在寻找一些可以根据密码进行加密和解密的安全对称算法 这个问题 https stackoverflow com questions
  • 网站性能衡量

    我需要一个免费的工具来测量网站的性能 并且不需要对代码 jsp asp 页面 进行任何更改 感谢所有帮助 对于绩效衡量 我建议您YSlow http developer yahoo com yslow 它是一个 Firefox 插件 集成了
  • 如何访问模板缓存? - 姜戈

    I am 缓存 HTML在几个模板内 例如 cache 900 stats stats endcache 我可以使用以下方式访问缓存吗低级图书馆 例如 html cache get stats 我确实需要对模板缓存进行一些细粒度的控制 有任

随机推荐