为什么我的构建挂起/需要很长时间才能生成包含许多联合的查询计划?

2023-12-21

我注意到当我运行与示例相同的代码时here https://stackoverflow.com/questions/68474926/why-do-i-see-repeated-materializations-of-a-dataframe-in-my-build但有一个union or unionByName or unionAll而不是join,我的查询计划需要显著地更长,并可能导致驱动程序 OOM。

此处包含的代码仅供参考,与内部发生的内容略有不同for() loop.

from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("measure_1", T.FloatType(), False),
  T.StructField("measure_2", T.FloatType(), False),
])
data = [
  {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
  {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]

df = spark.createDataFrame(data, schema)

right_schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
  {"col_1": 1},
  {"col_1": 1},
  {"col_1": 2},
  {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)

df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))
for filter_col in filter_union_cols:
  stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
  df = df.unionByName(
    stats.select(
      "*",
      F.lit(filter_col).alias("found_filter")
    )
  )

df.show()

"""
+-----+-----+---------+---------+------------+                                  
|col_1|col_2|measure_1|measure_2|found_filter|
+-----+-----+---------+---------+------------+
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
+-----+-----+---------+---------+------------+
"""

df.explain()

# REALLY long query plan.....

"""
== Physical Plan ==
Union
:- *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, null AS found_filter#1855]
:  +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7637]
:     :     +- Union
:     :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:           +- *(4) Scan ExistingRDD[col_1#1808]
:- *(12) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_1 AS found_filter#1860]
:  +- *(12) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(9) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7654]
:     :     +- Union
:     :        :- *(7) Filter (col_1#1800 < 1)
:     :        :  +- *(7) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(8) Filter (col_1#1800 < 1)
:     :           +- *(8) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(11) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:           +- *(10) Filter (col_1#1808 < 1)
:              +- *(10) Scan ExistingRDD[col_1#1808]
:- *(18) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#1880]
:  +- *(18) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(15) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7671]
:     :     +- Union
:     :        :- *(13) Filter (measure_1#1802 < 1.0)
:     :        :  +- *(13) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(14) Filter (measure_1#1802 < 1.0)
:     :           +- *(14) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(17) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(24) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#2022]
:  +- *(24) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(21) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7688]
:     :     +- Union
:     :        :- *(19) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :        :  +- *(19) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(20) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :           +- *(20) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(23) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(30) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#1900]
:  +- *(30) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(27) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7705]
:     :     +- Union
:     :        :- *(25) Filter (col_2#1801 < 1)
:     :        :  +- *(25) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(26) Filter (col_2#1801 < 1)
:     :           +- *(26) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(29) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(36) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2023]
:  +- *(36) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(33) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7722]
:     :     +- Union
:     :        :- *(31) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :        :  +- *(31) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(32) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :           +- *(32) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(35) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(42) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2024]
:  +- *(42) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(39) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7739]
:     :     +- Union
:     :        :- *(37) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :        :  +- *(37) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(38) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :           +- *(38) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(41) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(48) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2028]
:  +- *(48) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(45) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7756]
:     :     +- Union
:     :        :- *(43) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :        :  +- *(43) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(44) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :           +- *(44) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(47) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(54) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#1920]
:  +- *(54) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(51) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7773]
:     :     +- Union
:     :        :- *(49) Filter (measure_2#1803 < 1.0)
:     :        :  +- *(49) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(50) Filter (measure_2#1803 < 1.0)
:     :           +- *(50) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(53) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(60) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2025]
:  +- *(60) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(57) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7790]
:     :     +- Union
:     :        :- *(55) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(55) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(56) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(56) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(59) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(66) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2026]
:  +- *(66) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(63) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7807]
:     :     +- Union
:     :        :- *(61) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :        :  +- *(61) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(62) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :           +- *(62) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(65) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(72) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2029]
:  +- *(72) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(69) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7824]
:     :     +- Union
:     :        :- *(67) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(67) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(68) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :           +- *(68) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(71) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(78) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2027]
:  +- *(78) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(75) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7841]
:     :     +- Union
:     :        :- *(73) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(73) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(74) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(74) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(77) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(84) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2030]
:  +- *(84) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(81) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7858]
:     :     +- Union
:     :        :- *(79) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(79) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(80) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(80) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(83) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(90) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2031]
:  +- *(90) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(87) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7875]
:     :     +- Union
:     :        :- *(85) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(85) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(86) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(86) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(89) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
+- *(96) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2032]
   +- *(96) SortMergeJoin [col_1#1800], [col_1#1808], Inner
      :- *(93) Sort [col_1#1800 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7892]
      :     +- Union
      :        :- *(91) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :        :  +- *(91) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      :        +- *(92) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :           +- *(92) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      +- *(95) Sort [col_1#1808 ASC NULLS FIRST], false, 0
         +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
"""

我在这里看到一个明显更长的查询计划,特别是随着for()循环增加,性能急剧下降。

我怎样才能提高我的表现?


这是 Spark 中迭代算法的一个已知限制。目前,循环的每次迭代都会导致内部节点重新评估并堆叠在外部节点上df多变的。

这意味着您的查询规划过程正在进行O(exp(n))其中 n 是循环的迭代次数。

Palantir Foundry 中有一个名为 Transforms Verbs 的工具可以帮助解决这个问题。

只需导入即可transforms.verbs.dataframes.union_many并根据您希望实现的数据帧的总集调用它(假设您的逻辑允许这样做,即循环的一次迭代不依赖于循环的前一迭代的结果。

上面的代码应该修改为:

from pyspark.sql import types as T, functions as F, SparkSession
from transforms.verbs.dataframes import union_many

spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("measure_1", T.FloatType(), False),
  T.StructField("measure_2", T.FloatType(), False),
])
data = [
  {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
  {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]

df = spark.createDataFrame(data, schema)

right_schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
  {"col_1": 1},
  {"col_1": 1},
  {"col_1": 2},
  {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)

df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))
union_dfs = []
for filter_col in filter_union_cols:
  stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
  union_df = stats.select(
    "*",
    F.lit(filter_col).alias("found_filter")
  )
  union_dfs += [union_df]

df = df.unionByName(
  union_many(union_dfs)
)

这将优化您的工会并显着减少时间。

底线:谨防使用任何union在 for/while 循环内部调用。如果您必须使用此行为,请使用transforms.verbs.dataframes.union_many动词来优化您的最终数据帧集

查看您的平台文档以获取更多信息和更有用的动词。

专业提示:使用包含的优化here https://stackoverflow.com/questions/68474926/why-do-i-see-repeated-materializations-of-a-dataframe-in-my-build进一步提高您的表现

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么我的构建挂起/需要很长时间才能生成包含许多联合的查询计划? 的相关文章

随机推荐