我曾经认为 Spark 物理计划和 Spark Web UI SQL 选项卡中显示为 DAG 的两者应该完全相同,只是 SQL 选项卡中的 DAG 将填充实际运行时的统计信息。因此,Web UI 中 SQL 选项卡的内容甚至仅在调用操作后才可见(因为它需要这些运行时统计信息)
然而,这种情况并非如此:
端到端可运行示例:
import pandas as pd
import numpy as np
df1= pd.DataFrame(np.arange(1_000).reshape(-1,10))
df1.index = np.random.choice(range(10),size=100)
df1.to_csv("./df1.csv",index_label = "index")
############################################################################
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField
spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.adaptive.enabled","false").getOrCreate()
schema = StructType([StructField('index', StringType(), True),
StructField('0', StringType(), True),
StructField('1', StringType(), True),
StructField('2', StringType(), True),
StructField('3', StringType(), True),
StructField('4', StringType(), True),
StructField('5', StringType(), True),
StructField('6', StringType(), True),
StructField('7', StringType(), True),
StructField('8', StringType(), True),
StructField('9', StringType(), True)])
df1 = spark.read.csv("./df1.csv", header=True, schema = schema)
df2 = df1.select("index","0","1")
df3 = df1.select("index","2","3")
df4 = df1.join(df2,on='index').join(df3,on="index")
df4.explain(mode="formatted")
df4.count()
输出(物理计划):
== Physical Plan ==
* Project (16)
+- * SortMergeJoin Inner (15)
:- * Project (10)
: +- * SortMergeJoin Inner (9)
: :- * Sort (4)
: : +- Exchange (3)
: : +- * Filter (2)
: : +- Scan csv (1)
: +- * Sort (8)
: +- Exchange (7)
: +- * Filter (6)
: +- Scan csv (5)
+- * Sort (14)
+- Exchange (13)
+- * Filter (12)
+- Scan csv (11)
上面的物理计划中显示了 3 次单独的数据扫描。
然而,the SQLWeb UI 中的选项卡如下所示,仅扫描一次数据:
Spark 物理计划与 Spark Web UI SQL 选项卡中显示为 DAG 的计划有何区别?