将 Spark 数据帧写入单个 Parquet 文件

2024-01-11

我想做一些非常简单的事情,但我遇到了一些非常愚蠢的挣扎。我认为这一定与对 Spark 所做的事情的根本误解有关。我将非常感谢任何帮助或解释。

我有一个非常大的表(~3 TB,~300MM 行,25k 分区),在 s3 中保存为 parquet,我想将其作为单个 parquet 文件的小样本提供给某人。不幸的是,这需要很长时间才能完成,我不明白为什么。我已经尝试过以下方法:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

然后当那不起作用时我尝试了这个,我thought应该是一样的,但我不确定。 (我添加了print正在努力调试。)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

当我观看 Yarn UI 时,both打印报表and the write使用 25k 映射器。这count花了3分钟,show花了 25 分钟,write花了大约 40 分钟,虽然最后did写出我正在寻找的单个文件表。

在我看来,第一行应该获取前 500 行并将它们合并到单个分区,然后其他行应该非常快地发生(在单个映射器/减速器上)。有人能看到我在这里做错了什么吗?有人告诉我也许我应该使用sample代替limit但据我了解limit应该快得多。是对的吗?

预先感谢您的任何想法!


我将接近print首先是函数问题,因为它是理解 Spark 的基础。然后limit vs sample. Then repartition vs coalesce.

其原因有print函数以这种方式花费这么长时间是因为coalesce是一个惰性转换。 Spark 中的大多数转换都是惰性的,直到action被叫。

行动是做事的事情并且(大部分)dont返回一个新的数据帧作为结果。喜欢count, show。它们返回一个数字和一些数据,而coalesce返回具有 1 个分区的数据帧(有点,见下文)。

发生的情况是您正在重新运行 sql 查询并且coalesce每次调用操作时调用tiny数据框。这就是为什么他们每次调用都使用 25k 映射器。

为了节省时间,请添加.cache()方法到第一行(对于你的print无论如何代码)。

然后,数据帧转换实际上在第一行执行,结果保留在 Spark 节点的内存中。

这不会对第一行的初始查询时间产生任何影响,但至少您不会再运行该查询两次,因为结果已被缓存,然后操作可以使用该缓存的结果。

要将其从内存中删除,请使用.unpersist()方法。

现在对于您尝试执行的实际查询...

这实际上取决于数据的分区方式。例如,它是否按特定字段等进行分区...

你在问题中提到了这一点,但是sample可能是正确的方法。

为什么是这样?

limit必须搜索 500 个first行。除非您的数据按行号(或某种递增 id)进行分区,否则前 500 行可以存储在 25k 分区中的任何一个中。

因此 Spark 必须搜索所有这些值,直到找到所有正确的值。不仅如此,它还必须执行一个额外的步骤,对数据进行排序以获得正确的顺序。

sample只获取 500 个随机值。这样做更容易,因为所涉及的数据没有顺序/排序,并且不必在特定分区中搜索特定行。

While limit可以更快,但它也有它的,呃,局限性。我通常只将它用于非常小的子集,例如 10/20 行。

现在进行分区....

我认为的问题coalesce is it 几乎更改分区。现在我对此不确定,所以有点盐。

根据pyspark docs:

此操作会导致狭窄的依赖性,例如如果从 1000 个分区增加到 100 个分区,则不会出现随机播放,而是 100 个新分区中的每一个都会占用当前分区中的 10 个。

因此,您的 500 行实际上仍然位于 25k 个物理分区中,这些分区被 Spark 视为 1 个虚拟分区。

引起洗牌(通常是坏的)并保留在火花内存中.repartition(1).cache()这里可能是个好主意。因为当您write,它应该只会导致 1 个映射器查看 Spark 内存中的内容。然后write变得容易。您还要处理一小部分,因此任何洗牌都应该(希望)是可控的。

显然,这通常是不好的做法,并且不会改变 Spark 在执行原始 sql 查询时可能需要运行 25k 个映射器的事实。希望sample照顾这个。

编辑以澄清洗牌,repartition and coalesce

您在 4 节点集群的 16 个分区中有 2 个数据集。您想要将它们加入并写入 16 个分区中的新数据集。

数据 1 的第 1 行可能位于节点 1 上,数据 2 的第 1 行可能位于节点 4 上。

为了将这些行连接在一起,spark 必须身体上的移动其中一个或两个,然后写入新分区。

这是一种洗牌,在集群中物理移动数据。

所有内容都按 16 分区并不重要,重要的是数据位于集群中的位置。

data.repartition(4)会将数据从每个节点的每 4 组分区物理移动到每个节点的 1 个分区中。

Spark 可能会将所有 4 个分区从节点 1 移动到其他 3 个节点,在这些节点上的一个新的单个分区中,反之亦然。

我不认为它会这样做,但这是一个证明这一点的极端案例。

A coalesce(4)调用虽然不移动数据,但它更聪明。相反,它会识别“我已经每个节点有 4 个分区,总共有 4 个节点……我只是将每个节点的所有 4 个分区称为单个分区,然后我总共就有 4 个分区!”

因此它不需要移动任何数据,因为它只是将现有分区组合成一个连接分区。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 Spark 数据帧写入单个 Parquet 文件 的相关文章

随机推荐