Spark SQL 中是否有执行“INSERT IF NOT EXISTS ELSE UPDATE”的规定。
我有 Spark SQL 表“ABC”,其中有一些记录。
然后我有另一批记录,我想根据它们是否存在于该表中来插入/更新该表中。
我可以在 SQL 查询中使用 SQL 命令来实现这一点吗?
在常规 Spark 中,这可以通过以下方式实现join
随后是一个map
像这样:
import spark.implicits._
val df1 = spark.sparkContext.parallelize(List(("id1", "orginal"), ("id2", "original"))).toDF("df1_id", "df1_status")
val df2 = spark.sparkContext.parallelize(List(("id1", "new"), ("id3","new"))).toDF("df2_id", "df2_status")
val df3 = df1
.join(df2, 'df1_id === 'df2_id, "outer")
.map(row => {
if (row.isNullAt(2))
(row.getString(0), row.getString(1))
else
(row.getString(2), row.getString(3))
})
这产生:
scala> df3.show
+---+--------+
| _1| _2|
+---+--------+
|id3| new|
|id1| new|
|id2|original|
+---+--------+
你也可以使用select
with udfs
代替map
,但在这种带有空值的特殊情况下,我个人更喜欢map
变体。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)