如果适合您的要求,您还可以使用两遍方法。首先,重新分区数据并使用分区表(dataframe.write.partitionBy())进行持久化。然后,在循环中连续连接子分区,“附加”到相同的最终结果表。
Sim 对此进行了很好的解释。请参阅下面的链接
在 pyspark 中加入大数据帧的两遍方法 https://stackoverflow.com/questions/37842595/what-is-an-optimized-way-of-joining-large-tables-in-spark-sql
根据上面解释的情况,我能够在循环中串行连接子分区,然后将连接的数据持久保存到配置单元表中。
这是代码。
from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")
因此,如果您要加入整数 emp_id,则可以按 ID 模某个数字进行分区,这样您就可以在 Spark 分区之间重新分配负载,并且具有相似键的记录将被分组在一起并驻留在同一分区上。
然后,您可以读取并循环每个子分区数据,并将两个数据帧连接起来并将它们保存在一起。
counter =0;
paritioncount = 4;
while counter<=paritioncount:
query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
EMP_DF1 =spark.sql(query1)
EMP_DF2 =spark.sql(query2)
df1 = EMP_DF1.alias('df1')
df2 = EMP_DF2.alias('df2')
innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
innerjoin_EMP.show()
innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
counter = counter +1
我已经尝试过了,效果很好。这只是演示两遍方法的示例。您的连接条件可能会有所不同,分区数量也取决于您的数据大小。