Spark:如何将 RDD[T]` 拆分为 Seq[RDD[T]] 并保留顺序

2024-02-01

如何有效拆分RDD[T] into a Seq[RDD[T]] / Iterable[RDD[T]] with n元素并保留原始顺序?

我希望能够写出这样的东西

RDD(1, 2, 3, 4, 5, 6, 7, 8, 9).split(3)

这应该会导致类似的结果

Seq(RDD(1, 2, 3), RDD(4, 5, 6), RDD(7, 8, 9))

Spark有提供这样的功能吗?如果不是,什么是实现这一目标的高效方法?

val parts = rdd.length / n
val rdds = rdd.zipWithIndex().map{ case (t, i) => (i - (i % parts), t)}.groupByKey().values.map(iter => sc.parallelize(iter.toSeq)).collect

看起来不是很快。。


从技术上讲,您可以按照您的建议进行操作。然而,在利用计算集群来执行大数据的分布式处理的背景下,它确实没有意义。它首先违背了 Spark 的整个观点。如果您执行 groupByKey 然后尝试将它们提取到单独的 RDD 中,那么您实际上是将 RDD 中分布的所有数据拉到驱动程序上,然后将每个数据重新分布回集群。如果驱动程序无法加载整个数据文件,它也将无法执行此操作。

您不应将大型数据文件从本地文件系统加载到驱动程序节点上。您应该将文件移动到 HDFS 或 S3 等分布式文件系统上。然后,您可以通过以下方式将单个大数据文件加载到集群上:val lines = SparkContext.textFile(...)成一个 RDD 行。当您执行此操作时,集群中的每个工作线程将仅加载文件的一部分,这是可以完成的,因为数据已经分布在分布式文件系统中的集群中。

如果您随后需要将数据组织成对数据的功能处理很重要的“批次”,则可以使用适当的批次标识符来键入数据,例如:val batches = lines.keyBy( line => lineBatchID(line) )

然后,每个批次可以缩减为批次级摘要,并且这些摘要可以缩减为单个总体结果。

为了测试 Spark 代码,可以加载small将数据文件样本保存到一台机器上。但是,当涉及完整数据集时,您应该利用分布式文件系统与 Spark 集群结合来处理这些数据。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:如何将 RDD[T]` 拆分为 Seq[RDD[T]] 并保留顺序 的相关文章

  • 如何初始化子类型中特征的值?

    如果我写 trait T val t 3 val u 1 t Nil class U extends T override val t 2 new U u 它表明了这一点 List 1 0 我应该如何更改上面的代码以使其显示以下内容 Lis
  • 如何设置 jacoco4sbt 来处理 Play 中主模块和子模块中的类?

    我有一些问题要解决雅可可4sbt https github com sbt jacoco4sbt正在使用我的 Play 2 3 4 项目 我的项目由 3 个子模块组成 common api and frontend并且没有代码app根文件夹
  • 创建自定义 scala 集合,其中映射默认返回自定义集合?

    特质TraversableLike A Repr 允许人们在其中进行收藏some函数将返回一个Repr 而其他人则继续返回类型参数That在功能上 有没有办法定义一个CustomCollection A 其中函数如map 其他的默认That
  • 如何从java程序的main方法调用Scala程序的main方法?

    假设我在 Java 项目中有一个 Scala 类和一个 Java 类 scala 类如下所示 class Sam def main args Array String Unit println Hello 如何从同一项目中存在的 java
  • 错误:无法在 scala 中找到或加载主类

    安装 eclipse scala 插件和 eclipse maven scala 插件后 我是 scala 新手 所以我尝试确保在测试 scala hello world 项目后环境正常工作 它按预期工作 但我在尝试执行我从公司存储库中签出
  • Apache Spark 和 scikit_learn 之间的 KMeans 结果不一致

    我正在使用 PySpark 对数据集执行聚类 为了找到簇的数量 我对一系列值 2 20 进行了聚类 并找到了wsse 簇内平方和 每个值的值k 在这里我发现了一些不寻常的东西 根据我的理解 当你增加集群数量时 wsse单调递减 但我得到的结
  • Apache Spark 何时发生混洗?

    我正在优化 Spark 中的参数 并且想确切地了解 Spark 是如何对数据进行洗牌的 准确地说 我有一个简单的字数统计程序 并且想知道spark shuffle file buffer kb如何影响运行时间 现在 当我将此参数设置得非常高
  • 将 Apache Zeppelin 连接到 Hive

    我尝试将我的 apache zeppelin 与我的 hive 元存储连接起来 我使用 zeppelin 0 7 3 所以没有 hive 解释器 只有 jdbc 我已将 hive site xml 复制到 zeppelin conf 文件夹
  • 为什么我不需要在 Databricks 中创建 SparkSession?

    为什么我不需要在 Databricks 中创建 SparkSession 集群设置的时候会自动创建一个SparkSession吗 还是其他人帮我做的 这仅在笔记本中完成 以简化用户的工作并避免他们指定不同的参数 其中许多参数不会产生任何效果
  • 如何在 Spark 数据帧 groupBy 中执行 count(*)

    我的目的是做相当于基本sql的事情 select shipgrp shipstatus count cnt from shipstatus group by shipgrp shipstatus 我见过的 Spark 数据帧的示例包括其他列
  • 使用 pyspark awsglue 时显示 DataFrame

    如何使用 awsglue 的 job etl 显示 DataFrame 我尝试了下面的代码 但没有显示任何内容 df show code datasource0 glueContext create dynamic frame from c
  • xsbt 插件 1.0.0-M7 和 scalatra

    我尝试在我的 scalatra 项目中将 xsbt 插件升级到 1 0 0 M7 但 scalatra 似乎与此版本不兼容 当我尝试重新加载项目时 出现以下错误 我尝试过 scalatra 2 3 0 版本 问候 德斯 java lang
  • 承诺的反面是什么?

    承诺代表将来可能可用 或无法实现 的值 我正在寻找的是一种数据类型 它表示将来可能变得不可用的可用值 可能是由于错误 Promise a b TransitionFromTo
  • Scala 隐式转换范围问题

    采取这个代码 class Register var value Int 0 def getZeroFlag Boolean value 0x80 0 object Register implicit def reg2int r Regist
  • 为什么 Spark 退出并显示 exitCode: 16?

    我将 Spark 2 0 0 与 Hadoop 2 7 一起使用 并使用纱线集群模式 每次 我都会收到以下错误 17 01 04 11 18 04 INFO spark SparkContext Successfully stopped S
  • 在 Scala 中反转地图的优雅方法

    目前正在学习Scala 需要反转Map 来进行一些反转值 gt 键查找 我一直在寻找一种简单的方法来做到这一点 但只想到了 Map origMap map kvp gt kvp 2 gt kvp 1 有人有更优雅的方法吗 假设值是唯一的 则
  • Map 和 Set 的实际类(不是抽象类,也不是特征类)是什么?

    在 Scala 中 映射和集合文字可以通过以下方式创建 val m Map 1 gt a 以及引用的类型m字面意思都是Map Int String 然而 scala文档表明Map实际上是一个特征 具有需要实现才能实例化的抽象成员 scala
  • Spark scala:大量列上的简单 UDF 会导致性能下降

    我有一个包含 1 亿行和约 10 000 列的数据框 这些列有两种类型 标准 C i 和动态 X i 这个dataframe是经过一些处理后得到的 性能很快 现在只剩下2步了 Goal 需要使用 C i 列的相同子集对每个 X i 执行特定
  • 如何在scala中生成n-gram?

    我正在尝试在 scala 中编写基于 n gram 的分离新闻算法 如何为大文件生成 n gram 例如 对于包含 蜜蜂是蜜蜂中的蜜蜂 的文件 首先它必须选择一个随机的 n 元语法 例如 蜜蜂 然后它必须寻找以 n 1 个单词开头的 n 元
  • Spark Dataframe 中的分析

    在这个问题中 我们有两个经理 M1 和 M2 在经理 M1 的团队中有两个员工 e1 和 e2 在 M2 的团队中有两个员工 e4 和 e5 以下是经理和员工的层次结构 1 M1 a e1 b e2 2 M2 a e4 b e5 我们有以下

随机推荐