PySpark - RDD 中对象的时间重叠

2023-12-15

我的目标是根据时间重叠对对象进行分组。

我的每个对象rdd包含一个start_time and end_time.

我可能效率很低,但我计划做的是根据每个对象是否与任何其他对象有任何时间重叠来为每个对象分配一个重叠 id。我有时间重叠的逻辑。然后,我希望以此分组overlap_id.

所以首先,

mapped_rdd = rdd.map(assign_overlap_id)
final_rdd = mapped_rdd.reduceByKey(combine_objects)

现在我的问题来了。我该如何编写 allocate_overlap_id 函数?

def assign_overlap_id(x):
  ...
  ...
  return (overlap_id, x)

使用 Spark SQL 和数据帧的简单解决方案:

Scala:

import org.apache.spark.sql.functions.udf

case class Interval(start_time: Long, end_time: Long)

val rdd = sc.parallelize(
    Interval(0, 3) :: Interval(1, 4) ::
    Interval(2, 5) :: Interval(3, 4) ::
    Interval(5, 8) :: Interval(7, 10) :: Nil
)

val df = sqlContext.createDataFrame(rdd)

// Simple check if a given intervals overlap
def overlaps(start_first: Long, end_first: Long,
        start_second: Long, end_second: Long):Boolean = {
    (start_second > start_first & start_second < end_first) |
    (end_second > start_first & end_second < end_first) 
}

// Register udf and data frame aliases
// It look like Spark SQL doesn't support
// aliases in FROM clause [1] so we have to
// register df twice
sqlContext.udf.register("overlaps", overlaps)
df.registerTempTable("df1")
df.registerTempTable("df2")

// Join and filter
sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show

使用 PySpark 也能完成同样的事情

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

rdd = sc.parallelize([
    (0, 3), (1, 4), 
    (2, 5), (3, 4),
    (5, 8), (7, 10)
])

df = sqlContext.createDataFrame(rdd, ('start_time', 'end_time'))

def overlaps(start_first, end_first, start_second, end_second):
    return ((start_first < start_second < end_first) or
        (start_first < end_second < end_first))

sqlContext.registerFunction('overlaps', overlaps, BooleanType())
df.registerTempTable("df1")
df.registerTempTable("df2")

sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show()

按窗口分组的低级转换

更聪明的方法是使用某个指定宽度的窗口生成候选对。这是一个相当简化的解决方案:

Scala:

// Generates list of "buckets" for a given interval
def genRange(interval: Interval) = interval match {
    case Interval(start_time, end_time) => {
      (start_time / 10L * 10L) to (((end_time / 10) + 1) * 10) by 1
    }
}


// For each interval generate pairs (bucket, interval)
val pairs = rdd.flatMap( (i: Interval) => genRange(i).map((r) => (r, i)))

// Join (in the worst case scenario it is still O(n^2)
// But in practice should be better than a naive
// Cartesian product
val candidates = pairs.
    join(pairs).
    map({
        case (k, (Interval(s1, e1), Interval(s2, e2))) => (s1, e1, s2, e2)
   }).distinct


// For each candidate pair check if there is overlap
candidates.filter { case (s1, e1, s2, e2) => overlaps(s1, e1, s2, e2) }

Python:

def genRange(start_time, end_time):
    return xrange(start_time / 10L * 10L, ((end_time / 10) + 1) * 10)

pairs = rdd.flatMap(lambda (s, e): ((r, (s, e)) for r in genRange(s, e)))
candidates = (pairs
    .join(pairs)
    .map(lambda (k, ((s1, e1), (s2, e2))): (s1, e1, s2, e2))
    .distinct())

candidates.filter(lambda (s1, e1, s2, e2): overlaps(s1, e1, s2, e2))

虽然对于某些数据集来说它足以用于生产就绪的解决方案,但您应该考虑实现一些最先进的算法,例如NCList.

  1. http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark - RDD 中对象的时间重叠 的相关文章

随机推荐

  • Android Activity 在 4.3 设备上运行缓慢

    我开发了一个可以从网络传输视频的应用程序 我有一个活动列出了视频 包括它们的图标 标题和状态 新更新 每行都有一个视频缩略图 视频标题 然后有一个 新鲜 图标来指示它是新上传的 在模拟器中 这工作得相当好 在 2 3 和 4 0 3 HTC
  • 使用 Python 与其他程序交互

    我有一个想法 使用 Python 编写一个程序 该程序将找到我提供的歌曲的歌词 我认为整个过程应该归结为以下几件事 这些是我希望程序在运行时执行的操作 提示我输入歌曲名称 复制那个名字 打开网络浏览器 例如谷歌浏览器 将该名称粘贴到地址栏中
  • 是否还可以自定义STL向量的“参考”类型?

    是否可以定制reference of a std vector 直到 C 11 似乎可以通过Allocator模板参数 但现在不再了吗 根据文档 http en cppreference com w cpp container vector
  • 如何确定 gfortran 正在矢量化什么

    我正在尝试编写一个大规模并行蒙特卡罗代码 其中一部分将导出到 Xeon phi 协处理器 为了确保我有效地使用协处理器 我想看看编译器 当前为 gfortran 能够对代码的哪些部分进行矢量化 我知道我可以使用 ifort commane
  • Android 10 MediaStore 文件权限

    我仅在 Android 10 中遇到图像权限问题 我声明该问题仅适用于 Android 10 事实上 Android 11 和 Android 9 及更早版本都启用了写入和读取权限 在清单中我有
  • 剥离可执行文件 (Windows)

    我听说 strip 是一个可以使可执行文件变小的程序 我尝试从我的编译器 针对 Python 打开它 但是当运行 strip 时 我只是在命令提示符中看到 strip 未被识别为命令或程序 错误 那么我在哪里可以获得 Windows 版 s
  • 如何让图像看起来好像站在平台上(如果它“降落”在平台上)

    所以我正在创建我的第一个 2d java 游戏 我想知道如何让玩家看起来好像站在一个平台上 如果它落在平台上 问题在于 在我的游戏中 NINJA 始终位于屏幕中央并且从不移动 但只有背景和平台移动 关于如何解决问题有什么想法吗 r back
  • 如何调整 JTextField 的大小? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心以获得指导 如何调整 JTextFie
  • ASP.NET:向不同的 Web 表单添加限制

    我目前正在寻求一些建议和帮助 以了解如何避免人们访问页面 除非他们 1 已登录 2 具有访问该页面的正确角色 到目前为止 我已经完成了登录页面 注册页面和其他一些页面 我还有一个链接到这些页面的数据库 用于存储用户及其各自的角色 当前在注册
  • Flutter - 如何切换flutter通道而不需要每次都下载flutter & dart sdk

    目前我正在尝试 flutter web 为此我需要在 flutter master 频道上工作 但是 然后我需要处理其他项目 在他们身上 我正在开发颤振稳定通道 但是 每次我使用命令 flutter channel stable 或 flu
  • 将组合框字符串值转换为 int

    我有一个关于转换类型的问题 我想将当前选定的组合框值字符串更改为 int 但出现错误 My code int Parse age SelectedItem ToString 对于这个问题我能做什么 好的 现在我们知道错误了 您可以在尝试解析
  • xpath 查找特定根下具有特定名称的所有属性

    为了找到所有具有名称的属性myAttr在文档中我可以这样做 myAttr但是如果我想指定根并仍然在文档中查找具有该名称的所有属性怎么办 就像是 root whatever or nothing myAttribute 这样怎么样 root
  • SQL Server 中按 x 排序,然后按 y 列排序

    考虑一个像这样的表 debit credit code 0 10 5 5 0 3 0 11 2 0 15 1 7 0 6 6 0 2 5 0 1 我需要生成这样的结果集 首先借记 然后按代码列排序 debit credit code 5 0
  • 如何在 Flutter 中更改主题?

    所以我在这里尝试获取当前主题 无论是浅色还是深色 所以我可以相应地改变小部件颜色 但是 它不起作用 我使用 if 语句来知道何时是黑暗模式 但它总是 False 这是代码 顺便说一句 它在深色和浅色主题之间切换 但是当我尝试获取当前主题时
  • Subversion E160004 X的根节点的前身是Y但应该是Z

    我继承了一个大型 Subversion 存储库 74010 修订版 并且我正在尝试执行转储 加载以将存储库升级到 1 8 版本 以利用节省空间的功能 在尝试这个过程之前我跑了svnadmin verify对有问题的存储库进行检查 以确保该存
  • 在 Google 商店中将多个 Chrome 扩展程序作为单个项目发布

    Chrome 扩展程序和 Chrome 应用程序具有我需要实现某些功能的 API 但我无法仅使用扩展程序或仅使用应用程序或使用本机代码来实现此目的 所以我制作了一个扩展程序和一个应用程序 并使它们通过消息相互通信 一切正常 但现在我必须发布
  • 将表单提交到操作 php 文件

    我有一个表单 当用户单击 提交 时 我需要运行一个 php 文件 下面是表单和 php 文件
  • Spirit X3,如何让属性类型匹配规则类型?

    对于 Spirit X3 解析器的开发 我想使用语义操作 脚注 1 对我来说 控制如何将属性存储到 STL 容器中非常重要 这个问题是关于如何控制解析器属性 attr ctx 与规则类型 val ctx 匹配 以便可以正确分配它 也许这个问
  • 如何构建电影数据库和用户选择?

    我想创建电影数据库 用户可以在其中标记他 她观看和喜欢的电影 class Movies ndb Model watched ndb UserProperty liked ndb UserProperty 那行得通吗 我使用谷歌帐户 以后我应
  • PySpark - RDD 中对象的时间重叠

    我的目标是根据时间重叠对对象进行分组 我的每个对象rdd包含一个start time and end time 我可能效率很低 但我计划做的是根据每个对象是否与任何其他对象有任何时间重叠来为每个对象分配一个重叠 id 我有时间重叠的逻辑 然