据我所知,没有可以直接从 Glue 到 Redshift 执行的直接 UPSERT 查询。是否可以在粘合脚本本身中实现临时表概念?
所以我的期望是创建临时表,将其与目标表合并,最后删除它。可以在Glue脚本中实现吗?
通过将“postactions”选项传递给 JDBC 接收器,可以使用 Glue 中的临时表将 upsert 实现到 Redshift 中:
val destinationTable = "upsert_test"
val destination = s"dev_sandbox.${destinationTable}"
val staging = s"dev_sandbox.${destinationTable}_staging"
val fields = datasetDf.toDF().columns.mkString(",")
val postActions =
s"""
DELETE FROM $destination USING $staging AS S
WHERE $destinationTable.id = S.id
AND $destinationTable.date = S.date;
INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
DROP TABLE IF EXISTS $staging
"""
// Write data to staging table in Redshift
glueContext.getJDBCSink(
catalogConnection = "redshift-glue-connections-test",
options = JsonOptions(Map(
"database" -> "conndb",
"dbtable" -> staging,
"overwrite" -> "true",
"postactions" -> postActions
)),
redshiftTmpDir = s"$tempDir/redshift",
transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
确保用于写入 Redshift 的用户有足够的权限在暂存架构中创建/删除表。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)