TL;DR可能会出现一些性能下降或损失,但可以忽略不计。
你能解释一下为什么吗?
看到你的问题带有“explain”,这很有趣,这正是用来查看 Spark SQL 背后发生的情况以及它如何执行查询的方法的名称:)
So, use Dataset.explain
甚至更详细的版本Dataset.explain(extended = true)
查看所有优化(以及可能的性能下降)。
def inc = udf( (i: Double) => i + 1)
def double = udf( (i: Double) => i * 2)
val df = Seq(1,2,3).toDF("c")
val q = df.withColumn("r", double(inc($"c")))
由两个 UDF 组成的计划如下所示。
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [c#3, UDF(UDF('c)) AS r#10]
+- AnalysisBarrier Project [value#1 AS c#3]
== Analyzed Logical Plan ==
c: int, r: double
Project [c#3, if (isnull(if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double)))) null else UDF(if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double))) AS r#10]
+- Project [value#1 AS c#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
LocalRelation [c#3, r#10]
== Physical Plan ==
LocalTableScan [c#3, r#10]
让我们看看一个 UDF(两个 UDF 的组合)的计划是什么样子的。
def incAndDouble = udf( (i: Double) => (i + 1) * 2)
val q = df.withColumn("r", incAndDouble($"c"))
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [c#3, UDF('c) AS r#16]
+- AnalysisBarrier Project [value#1 AS c#3]
== Analyzed Logical Plan ==
c: int, r: double
Project [c#3, if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double)) AS r#16]
+- Project [value#1 AS c#3]
+- LocalRelation [value#1]
== Optimized Logical Plan ==
LocalRelation [c#3, r#16]
== Physical Plan ==
LocalTableScan [c#3, r#16]
在这种特殊情况下,差异是没有的,因为跨查询的物理计划是相同的,即LocalTableScan
.
它可能与文件或 JDBC 等其他数据源不同,但我个人的建议是开发尽可能小的 UDF,因为它们是 Spark 优化器的黑匣子。
总是如此吗?
不,一点也不,因为这在很大程度上取决于您在 UDF 中所做的事情(但这更多地与是否首先编写 UDF 有关)。
在以下 UDF 为谓词的情况下(即返回布尔值):
def filter = udf((s: Seq[String]) => s.startsWith("A"))
Spark 可以优化 UDF 的使用(如果是的话)not一个 UDF 但很简单filter
操作)并将其下推到数据源以加载更少的数据。这可能会对性能产生巨大影响。