我将接近print
首先是函数问题,因为它是理解 Spark 的基础。然后limit
vs sample
. Then repartition
vs coalesce
.
其原因有print
函数以这种方式花费这么长时间是因为coalesce
是一个惰性转换。 Spark 中的大多数转换都是惰性的,直到action被叫。
行动是做事的事情并且(大部分)dont返回一个新的数据帧作为结果。喜欢count
, show
。它们返回一个数字和一些数据,而coalesce
返回具有 1 个分区的数据帧(有点,见下文)。
发生的情况是您正在重新运行 sql 查询并且coalesce
每次调用操作时调用tiny
数据框。这就是为什么他们每次调用都使用 25k 映射器。
为了节省时间,请添加.cache()
方法到第一行(对于你的print
无论如何代码)。
然后,数据帧转换实际上在第一行执行,结果保留在 Spark 节点的内存中。
这不会对第一行的初始查询时间产生任何影响,但至少您不会再运行该查询两次,因为结果已被缓存,然后操作可以使用该缓存的结果。
要将其从内存中删除,请使用.unpersist()
方法。
现在对于您尝试执行的实际查询...
这实际上取决于数据的分区方式。例如,它是否按特定字段等进行分区...
你在问题中提到了这一点,但是sample
可能是正确的方法。
为什么是这样?
limit
必须搜索 500 个first行。除非您的数据按行号(或某种递增 id)进行分区,否则前 500 行可以存储在 25k 分区中的任何一个中。
因此 Spark 必须搜索所有这些值,直到找到所有正确的值。不仅如此,它还必须执行一个额外的步骤,对数据进行排序以获得正确的顺序。
sample
只获取 500 个随机值。这样做更容易,因为所涉及的数据没有顺序/排序,并且不必在特定分区中搜索特定行。
While limit
可以更快,但它也有它的,呃,局限性。我通常只将它用于非常小的子集,例如 10/20 行。
现在进行分区....
我认为的问题coalesce
is it 几乎更改分区。现在我对此不确定,所以有点盐。
根据pyspark
docs:
此操作会导致狭窄的依赖性,例如如果从 1000 个分区增加到 100 个分区,则不会出现随机播放,而是 100 个新分区中的每一个都会占用当前分区中的 10 个。
因此,您的 500 行实际上仍然位于 25k 个物理分区中,这些分区被 Spark 视为 1 个虚拟分区。
引起洗牌(通常是坏的)并保留在火花内存中.repartition(1).cache()
这里可能是个好主意。因为当您write
,它应该只会导致 1 个映射器查看 Spark 内存中的内容。然后write
变得容易。您还要处理一小部分,因此任何洗牌都应该(希望)是可控的。
显然,这通常是不好的做法,并且不会改变 Spark 在执行原始 sql 查询时可能需要运行 25k 个映射器的事实。希望sample
照顾这个。
编辑以澄清洗牌,repartition
and coalesce
您在 4 节点集群的 16 个分区中有 2 个数据集。您想要将它们加入并写入 16 个分区中的新数据集。
数据 1 的第 1 行可能位于节点 1 上,数据 2 的第 1 行可能位于节点 4 上。
为了将这些行连接在一起,spark 必须身体上的移动其中一个或两个,然后写入新分区。
这是一种洗牌,在集群中物理移动数据。
所有内容都按 16 分区并不重要,重要的是数据位于集群中的位置。
data.repartition(4)
会将数据从每个节点的每 4 组分区物理移动到每个节点的 1 个分区中。
Spark 可能会将所有 4 个分区从节点 1 移动到其他 3 个节点,在这些节点上的一个新的单个分区中,反之亦然。
我不认为它会这样做,但这是一个证明这一点的极端案例。
A coalesce(4)
调用虽然不移动数据,但它更聪明。相反,它会识别“我已经每个节点有 4 个分区,总共有 4 个节点……我只是将每个节点的所有 4 个分区称为单个分区,然后我总共就有 4 个分区!”
因此它不需要移动任何数据,因为它只是将现有分区组合成一个连接分区。