为什么火花计数动作分三个阶段执行

2024-04-01

我已经加载了一个 csv 文件。将其重新分区为 4,然后对 DataFrame 进行计数。当我查看 DAG 时,我发现此操作分 3 个阶段执行。

为什么这个简单的动作要分三个阶段执行。我想第一阶段是加载文件,第二阶段是查找每个分区的计数。

那么第三阶段发生了什么?

这是我的代码

val sample = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", ";").load("sample_data.csv")

sample.repartition(4).count()

  1. 第一阶段=读取文件。由于重新分区(因为它是需要洗牌的广泛转换),它无法通过partial_count(第二阶段)加入到单个阶段中

  2. 第二阶段=本地计数(计算每个分区的计数)

  3. 第三阶段 = 驱动程序上的结果聚合。

Spark 为每个操作或广泛的转换生成单独的阶段。要了解有关窄/宽转换的更多详细信息以及为什么宽转换需要单独的阶段,请查看“宽依赖与窄依赖、高性能 Spark、Holden Karau” https://learning.oreilly.com/library/view/high-performance-spark/9781491943199/ch02.html#narrow_wide_intro or 本文 https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-1/.

让我们在本地测试这个假设。首先您需要创建一个数据集:

数据集/测试数据.json

[
  { "key":  1, "value":  "a" },
  { "key":  2, "value":  "b" },
  { "key":  3, "value":  "c" },
  { "key":  4, "value":  "d" },
  { "key":  5, "value":  "e" },
  { "key":  6, "value":  "f" },
  { "key":  7, "value":  "g" },
  { "key":  8, "value":  "h" }
]

然后运行以下代码:

    StructType schema = new StructType()
            .add("key", DataTypes.IntegerType)
            .add("value", DataTypes.StringType);

    SparkSession session = SparkSession.builder()
            .appName("sandbox")
            .master("local[*]")
            .getOrCreate();

    session
            .read()
            .schema(schema)
            .json("file:///C:/<you_path>/dataset")
            .repartition(4) // comment on the second run
            .registerTempTable("df");

    session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();

输出将是:

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
      +- Exchange RoundRobinPartitioning(4)
         +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

但如果您注释/删除 .repartition(4) 字符串,请注意 TableScan 和partial_count 是在单个阶段内完成的,输出将如下所示:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) FileScan json [] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

附:请注意,额外的阶段可能会对性能产生重大影响,因为它需要磁盘 I/O(看看here https://stackoverflow.com/questions/58699907/spark-disk-i-o-on-stage-boundaries)并且是某种影响并行化的同步障碍,意味着在大多数情况下 Spark 在第 1 阶段完成之前不会启动第 2 阶段。还是如果repartition提高并行度可能是值得的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么火花计数动作分三个阶段执行 的相关文章

随机推荐