Spark如何执行join+filter?它具有可扩展性吗?

2024-03-01

假设我有两个大型 RDD,A 和 B,包含键值对。我想使用密钥连接 A 和 B,但是在匹配的 (a,b) 对中,我只想要一小部分“好”的。所以我进行连接并随后应用过滤器:

A.join(B).filter(isGoodPair)

where isGoodPair是一个布尔函数,它告诉我一对 (a,b) 是否良好。

为了很好地扩展,Spark 的调度程序最好避免在A.join(B)明确地。即使在大规模分布式的基础上,这也可能导致耗时的磁盘溢出,甚至耗尽某些节点上的所有内存和磁盘资源。为了避免这种情况,Spark 应该在每个分区内生成对 (a,b) 时应用过滤器。

我的问题:

  1. Spark 真的这样做吗?
  2. 其架构的哪些方面可以实现或阻止所需的行为?
  3. 我应该使用cogroup反而?在 PySpark 中,它返回一个迭代器,因此我可以将过滤器应用于迭代器,对吧?

我在 PySpark shell(运行 Spark 1.2.1)中进行了一个实验来回答这些问题。结论如下:

  1. 不幸的是,Spark 确实not当连接生成对时应用过滤器。它在继续过滤连接对之前显式生成整个连接对集。
  2. 这可能是因为 Spark 一次运行一次 RDD 转换。它通常无法执行这种微妙的链接优化。
  3. 通过使用cogroup代替join,我们可以手动实现想要的优化。

实验

我制作了一个包含 100 个组的 RDD,每个组包含 1 到 10,000 的整数,并且在每个组中我计算了最多相距 1 的整数的数量:

import itertools as it
g = int(1e2) # number of groups
n = int(1e4) # number of integers in each group
nPart = 32 # standard partitioning: 8 cores, 4 partitions per core
A = sc.parallelize(list(it.product(xrange(g),xrange(n))),nPart) 

def joinAndFilter(A):
    return A.join(A).filter(lambda (k,(x1,x2)): abs(x1 - x2) <= 1)

def cogroupAndFilter(A):
    def fun(xs):
        k,(xs1,xs2) = xs
        return [(x1,x2) for (x1,x2) in it.product(xs1,xs2) if abs(x1 - x2) <= 1]
    return A.cogroup(A).flatMap(fun)

cogroupAndFilter(A).count()
joinAndFilter(A).count() 

我没有简单的方法来分析代码,所以我只是在我的 Mac 上的“活动监视器”中观察它的运行情况:

当我使用时,内存使用量激增joinAndFilter,大概是因为它在应用相差一过滤器之前生成了所有对。事实上,我不得不杀死 PySpark,因为它耗尽了我所有的内存,并且即将导致系统崩溃。和cogroupAndFilter,这些对在生成时就被过滤,因此内存保持在控制之下。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark如何执行join+filter?它具有可扩展性吗? 的相关文章

随机推荐

  • Java:三个字符串,字典顺序

    初学者 Java 程序员在这里 我试图将三个字符串相互比较 并让系统按字典顺序吐出第二个 中间的单词 import java util public class Ordered2 public static void main String
  • Elm - 生成随时间变化的随机数列表

    我试图使一列随机数每秒发生变化 但我收到不同的错误消息 import Random main flow down asText Random range 0 100 every second asText Random range 0 10
  • 标准库函数在 C 中如何工作? [复制]

    这个问题在这里已经有答案了 在浏览 C 标准库函数时 glibc 我找到printf 实际上打电话puts 功能 IO puts 但我无法找出 put 函数实际上是如何写入的stdout 是否使用write 系统调用定义在unistd h或
  • 带有虚线图案的渐变线

    我需要创建一条具有线性渐变的虚线 我设法使用创建了一条虚线 hr 以及以下样式 line border 0px border bottom 2px dashed 我也知道要实现渐变 我需要这样做 background webkit grad
  • 在 ec2 中启动 minikube 显示“X 抱歉,Kubernetes v1.18.0 需要将 conntrack 安装在 root 路径中”

    我正在尝试启动 Minikube 所以我跑了 minikube start vm driver none 但它在控制台中显示以下行 Amazon 2 Xen amd64 上的 minikube v1 9 2 根据用户配置使用无驱动程序 X
  • 使用 Google Apps 脚本 (GAS) V8 定义私有类字段

    自从 Google 推出 V8 引擎以来 我正在将一些代码迁移到新引擎 ES6 允许定义私有类 但是在 Google App Script 上运行时 我收到错误 Example class IncreasingCounter count 0
  • SharePlum 错误:“无法获取用户信息列表”

    我正在尝试使用分享梅花 https pypi python org pypi SharePlum 0 1 1这是 SharePoint 的 Python 模块 但是当我尝试连接到我的 SharePoint 时 SharePlum 会向我抛出
  • 对象拥有 QObject 派生类集合的正确方法是什么?

    我正在尝试创建一个类 公开 QObject 派生类 具有其自己的 qt 属性 的集合 或多个 我可以在 qml 中使用 qt 属性 根据http qt project org doc qt 5 0 qtcore qobject html n
  • 如何从wpf中的代码隐藏更改控件的Grid.Row和Grid.Column

    我已将控件放置在DataGrid像这样
  • python结构解包长度错误

    我有一个长度为 41 的字节对象 我尝试用以下方法解压它 struct unpack 2B2B32sBi data 但我收到一个错误 struct error 解包需要长度为 44 的字节对象 我认为长度2B2B32sBi应该2 1 2 1
  • 使c++程序在windows中的特定核心上运行

    我想知道如何强制 Visual Studio 中的 C 程序在特定的核心上运行 在拥有多个核心的计算机上 i found this https stackoverflow com questions 8326427 how to force
  • 以编程方式设置Android动画列表

    我正在尝试将 gif 动画添加到我的应用程序中 1 我可以从服务器下载 gif 动画 2 我能够解码动画 gif 使用我的自定义解码器 并拥有与其帧相对应的单独位图 现在我想使用逐帧动画来制作它的动画 正如我所读到的 要执行逐帧动画 首先需
  • 如何设置camel处理器或其他路由成分的id

    Camel 自动生成处理器和其他内容的 ID processor1 processor25 有没有办法设置这个名字 我们需要通过 jmx 识别某些处理器来获取遥测数据 我想要设置的名称是通过属性给出的 它们在开始时是已知的 因此 我需要在定
  • jQuery 交换图像未加载

    我正在尝试使用 jQuery attr 将图像从 Images origImage 的原始图像源位置交换到 Images newImage 的新图像源位置 当我单击 div 时 如果右键单击图像并查看地址 URL 属性 图像 Url 属性会
  • 如何将不同类型的列插入到numpy数组中?

    我想附加两个类型的 numpy 数组np datetime64 and int到另一个 这会导致错误 我需要做什么来纠正这个问题 如果我将向量附加到自身上 即 np append c c axis 1 or np append a a ax
  • 如何设置gdb的默认选项?

    我每次打开 GDB 时都会设置几个选项 例如 set print thread events off 有没有办法默认设置这些选项 也许类似于 gdb rc 文件 初始化文件为gdb叫做 gdbinit 您可以将所需的选项放入此文件中 它们将
  • 当 div 滚动到视口时淡入

    好的 所以我一直在寻找simple当用户将其滚动到视图中时淡入 div 的方法 但我找不到直接的解决方案 HTML div class container div class topdiv This is a 100 height div
  • C# - 无法在方法内声明委托

    我这里真的是一片空白 我在想why我无法在方法中声明委托类型 但我必须在类级别声明委托类型 namespace delegate learning class Program Works fine public delegate void
  • 静态内存实例中的字符串计数

    据我所知 编译时类似 C 的字符串仅作为一个实例保存在静态内存中 例如我两者都有true在 gcc 4 6 上运行下面的示例 但我想知道它是否总是如此并且可以便携 C 和 C 上的行为都很有趣 include
  • Spark如何执行join+filter?它具有可扩展性吗?

    假设我有两个大型 RDD A 和 B 包含键值对 我想使用密钥连接 A 和 B 但是在匹配的 a b 对中 我只想要一小部分 好 的 所以我进行连接并随后应用过滤器 A join B filter isGoodPair where isGo