在 Apache Spark 中,为什么 RDD.union 不保留分区器?

2024-02-07

众所周知,Spark中的分区器对任何“宽”操作都会产生巨大的性能影响,因此通常在操作中进行定制。我正在尝试以下代码:

val rdd1 =
  sc.parallelize(1 to 50).keyBy(_ % 10)
    .partitionBy(new HashPartitioner(10))
val rdd2 =
  sc.parallelize(200 to 230).keyBy(_ % 13)

val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)

val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)

我看到默认情况下cogroup()总是生成带有自定义分区器的 RDD,但是union()不,它总是会恢复为默认值。这是违反直觉的,因为我们通常假设 PairRDD 应该使用其第一个元素作为分区键。有没有办法“强制”Spark 合并 2 个 PairRDD 以使用相同的分区键?


union是一个非常有效的操作,因为它不会移动任何数据。如果rdd1有 10 个分区rdd2那么有20个分区rdd1.union(rdd2)将有 30 个分区:两个 RDD 的分区放在一起。这只是记账变化,没有洗牌。

但它必然会丢弃分区器。为给定数量的分区构建分区器。生成的 RDD 有许多与两者不同的分区rdd1 and rdd2.

加入工会后你可以运行repartition打乱数据并按键组织它。


上述情况有一个例外。如果rdd1 and rdd2具有相同的分区器(具有相同数量的分区),union行为不同。它将成对连接两个 RDD 的分区,为其提供与每个输入相同数量的分区。这可能涉及移动数据(如果分区不是位于同一位置),但不会涉及洗牌。在这种情况下,分区器被保留。 (此代码位于PartitionerAwareUnionRDD.scala https://github.com/apache/spark/blob/v1.3.1/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala.)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Apache Spark 中,为什么 RDD.union 不保留分区器? 的相关文章

随机推荐

  • 升级时创建表

    我一直在努力解决这个问题 但我不知道我错过了什么 我有一个 Android 应用程序 我希望再添加 1 个表 但是我无法做到这一点 而且我也没有例外 不喜欢这些无声杀手 下面是我的 SQLiteHelper 类的代码 public clas
  • GWT 对单元格表进行排序,可能只是我没有看到的

    在过去的几个小时里 我一直在努力对 GWT CellTable 进行排序 这确实是一个愚蠢的问题 因为它已经在这里完成了http gwt google com samples Showcase Showcase html CwCellTab
  • Operator= 和 C++ 中未继承的函数?

    在我刚刚进行的测试之前 我认为在 C 中只有构造函数不被继承 但显然 任务operator 是不是太 这是什么原因呢 是否有任何解决方法来继承赋值运算符 是否也是如此operator operator 所有其他函数 除了构造函数 运算符 都
  • 暂停JW播放器?

    我有三个标签 每个选项卡的滑块中有两个视频 问题是当我切换任何选项卡时or单击任何单个视频 所有其他视频都应暂停 我可以收集所有 id 然后循环使用 stop 但是还有其他更干净 更简单的方法吗 jwplayer video pub sto
  • Universal Analytics - 第三方支付网关

    我们的网站目前正在通过跟踪代码管理器使用 Universal Analytics 进行跟踪 我们的结账流程包括在前往感谢页面之前重定向至第三方支付网关 所以 它看起来像这样 site com checkout gt site com pay
  • 通过保留顺序,根据 id 列将 Spark DataFrame 拆分为两个 DataFrame(70% 和 30%)

    我有一个 Spark 数据框 就像 id start time feature 1 01 01 2018 3 567 1 01 02 2018 4 454 1 01 03 2018 6 455 2 01 02 2018 343 4 2 01
  • 预先计算多维线性插值的权重

    我有一个沿 D 维度的非均匀矩形网格 网格上的逻辑值 V 矩阵和查询数据点 X 矩阵 网格点的数量在不同维度上有所不同 我对同一网格 G 和查询 X 多次运行插值 但对于不同的值 V 目标是预先计算插值的索引和权重并重用它们 因为它们始终相
  • Haystack more_like_this 返回全部

    我正在使用 Django haystack solr 进行搜索 我已经能够搜索 现在我想使用 more like this 查找类似的项目 当我尝试使用 more like this 功能时 我会返回属于该模型类型的所有对象 而不仅仅是与其
  • 以编程方式创建/运行命令文件

    我正在尝试创建一个 cmd 文件来安装 msi 然后通过 C 代码执行该 cmd 文件 如果我使用 Visual Studio 中的 f5 或 control f5 运行该代码 则该代码可以完美运行 但是 一旦我将代码打包在 msi 文件中
  • 如何在 ecmascript 6 中模拟导入的模块?

    我有一个带有 mocha babel 和 node 的测试设置 旨在测试 ecmascript 6 代码 有人对如何模拟被测模块中的导入有任何建议吗 ES2015 中的导入和导出是语言本身的一部分 并且被设计为可静态分析 因此 它们无法在运
  • 嵌套 CAST 不起作用

    为什么嵌套转换在 MySQL 中不起作用 它确实使用 SQL Server select cast cast myColumn as decimal 5 2 as int from myTable SQLFiddle示例 http sqlf
  • Codeigniter ajax CSRF问题

    我制作了一个简单的自动加载功能 当您在网站上向下滚动时加载内容 但是 当我在 Codeigniter 中启用 CSRF 保护时 似乎存在一些问题 我没有使用表单 所以当我在滚动时执行发布请求时 我不知道如何将令牌从 A 发送到 B 我的 J
  • 静态库的静态成员

    我有带有静态成员的静态库 该库静态链接到主应用程序及其插件之一 看起来像在 main 应用程序 和 dll 插件 中初始化静态变量 Question 如何避免动态库加载时静态变量重新初始化 或者我可能错过了一些简单的事情 更多信息 这是一个
  • Oracle:用户数据中按循环连接

    我了解 Oracle 中何时会发生循环 从理论上讲 如果一条记录既是另一个节点的父节点 又是另一个节点的子节点 那么它就可以进入循环 但我不明白为什么我的这个特定查询会陷入循环 SELECT Empno Ename Job FROM Emp
  • Github API OAuth 令牌验证

    有什么方法可以验证我的 github API 的 OAuth 令牌吗 我所说的 令牌 是指用户登录我的网站后获得的令牌 我使用 cookie 将其存储在客户端计算机上 但仅检查是否有令牌是不够的 我需要实际检查令牌是否有效 目前 这需要我提
  • 如果我访问 UserTransaction 是否意味着我使用 2 阶段提交或 XA?

    UserTransaction ut 查找 ut beginTransaction saveToFooDB statelessEjb transactionSupportedMethod 将一些内容保存到 Foo DB saveToFooD
  • mysql2 gem 出现 Gem::LoadError,但它已经在 Gemfile 中

    Gem LoadError Specified mysql2 for database adapter but the gem is not loaded Add gem mysql2 to your Gemfile 加载以下文件时发生此错
  • 使用nodejs通过ssl使用明文密码连接到MariaDB

    我正在尝试通过 ssl 连接到 mariadb 实例 var mysql require mysql var conn mysql createConnection user user password password debug tru
  • 如何找到列表的模式?

    我有一个清单 List
  • 在 Apache Spark 中,为什么 RDD.union 不保留分区器?

    众所周知 Spark中的分区器对任何 宽 操作都会产生巨大的性能影响 因此通常在操作中进行定制 我正在尝试以下代码 val rdd1 sc parallelize 1 to 50 keyBy 10 partitionBy new HashP