我有兴趣了解 Spark 在从本地文件系统加载文件时如何创建分区。
我正在使用 Databricks 社区版来学习 Spark。当我使用 sc.textfile 命令加载一个大小只有几千字节(大约 300 kb)的文件时,spark 默认情况下会创建 2 个分区(如partitions.length 给出的)。当我加载大约 500 MB 的文件时,它会创建 8 个分区(等于机器中的核心数量)。
在此输入图像描述 https://i.stack.imgur.com/7Lvwi.png
这里的逻辑是什么?
另外,我从文档中了解到,如果我们从本地文件系统加载并使用集群,则该文件必须位于属于该集群的所有计算机上的同一位置。这不会创建重复项吗? Spark 如何处理这种场景?如果你能指出一些文章来阐明这一点,那将会有很大的帮助。
Thanks!
当 Spark 读取时本地文件系统默认分区数(由defaultParallelism标识)是所有可用核心的数量.
sc.textFile 将分区数计算为 defaultParallelism(本地 FS 情况下的可用核心数)和 2 之间的最小值。
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
参考自:火花代码 https://github.com/apache/spark/blob/e9f983df275c138626af35fd263a7abedf69297f/core/src/main/scala/org/apache/spark/SparkContext.scala#L2329
第一种情况:文件大小 - 300KB
由于文件大小非常小,因此分区数计算为 2。
第二种情况:文件大小 - 500MB
分区数等于默认并行度。在你的例子中,它是 8。
从 HDFS 读取时,sc.textFile 将采用 minPartitions 和基于 hadoop 输入分割大小除以块大小计算得出的分割数之间的最大值。
但是,当将 textFile 与压缩文件(file.txt.gz 而不是 file.txt 或类似文件)一起使用时,Spark 会禁用拆分,从而导致 RDD 仅具有 1 个分区(因为对 gzip 压缩文件的读取无法并行化)。
对于有关从集群中的本地路径读取数据的第二个查询:
文件需要在集群中的所有机器上可用,因为 Spark 可能会在集群中的机器上启动执行器,并且执行器将使用 (file://) 读取文件。
为了避免将文件复制到所有机器,如果您的数据已经位于 NFS、AFS 和 MapR 的 NFS 层等网络文件系统之一中,那么您只需指定 file:// 路径即可将其用作输入;只要文件系统安装在每个节点上的相同路径上,Spark 就会处理它。每个节点都需要有相同的路径。
请参阅:https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)