有没有办法手动设置RDD分区的preferredLocations?
是的,有,但它是 RDD 特定的,因此不同类型的 RDD 有不同的方法来做到这一点。
火花用途RDD.preferredLocations
获取计算每个分区/分割的首选位置列表(例如 HDFS 文件的块位置)。
最终 def PreferredLocations(split: Partition): Seq[String]
获取分区的首选位置,同时考虑 RDD 是否设置了检查点。
正如你所看到的方法是final
这意味着没有人可以超越它。
当你看着源代码 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala?utf8=%E2%9C%93#L273-L277 of RDD.preferredLocations
您将看到 RDD 如何知道其首选位置。它正在使用受保护的RDD.getPreferredLocations https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala?utf8=%E2%9C%93#L137自定义 RDD 可以(但不必)重写以指定放置首选项的方法。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
因此,现在的问题已经“演变”为另一个问题,即哪些 RDD 允许设置其首选位置。找到你的并查看源代码。
我使用数组和“Parallelize”方法从中创建 RDD。
If you parallelize
你的本地数据集不再是分布式的,而且可以是这样的,但是......为什么你想使用 Spark 来处理可以在单个计算机/节点上本地处理的东西?
然而,如果您坚持并且确实想将 Spark 用于本地数据集,那么背后的 RDDSparkContext.parallelize
是...让我们看一下源代码...并行集合RDD https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala?utf8=%E2%9C%93#L715 which 确实允许位置偏好 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L89.
然后让我们将您的问题改写为以下内容(希望我不会丢失任何重要事实):
允许创建的运算符有哪些ParallelCollectionRDD
并明确指定位置偏好?
令我惊讶的是(因为我不知道该功能),有这样一个运算符,即SparkContext.makeRDD https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala?utf8=%E2%9C%93#L804-L814,...接受每个对象的一个或多个位置首选项(Spark 节点的主机名)。
makeRDD[T](seq: Seq[(T, Seq[String])]): RDD[T]分发本地 Scala 集合以形成 RDD,每个对象具有一个或多个位置首选项(Spark 节点的主机名)。为每个集合项创建一个新分区。
换句话说,而不是使用parallelise
你必须使用makeRDD
(Scala 的 Spark Core API 中提供了该功能,但我不确定我将作为家庭练习留给您的 Python :))
我将同样的推理应用于创建某种 RDD 的任何其他 RDD 运算符/转换。