高效的 p​​yspark join

2024-02-10

我读过很多关于如何在 pyspark 中进行高效连接的文章。我发现实现高效连接的方法基本上有:

  • 如果可以的话,使用广播连接。 (我通常不能因为数据框太大)
  • 考虑使用非常大的集群。 (我宁愿不因为$$$).
  • Use the 相同的分区器.

最后一个是我宁愿尝试的一个,但我找不到在 pyspark 中做到这一点的方法。我试过了:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])

但这没有帮助,我仍然需要很长时间才能停止它,因为火花被困在最后几项工作中。

那么,我如何在 pyspark 中使用相同的分区器并加速我的连接,甚至摆脱永远需要的洗牌?我需要使用哪个代码?

PD: 我查过其他文章,甚至堆栈溢出 https://stackoverflow.com/questions/43831387/how-to-avoid-shuffles-while-joining-dataframes-on-unique-keys,但我仍然看不到代码。


如果适合您的要求,您还可以使用两遍方法。首先,重新分区数据并使用分区表(dataframe.write.partitionBy())进行持久化。然后,在循环中连续连接子分区,“附加”到相同的最终结果表。 Sim 对此进行了很好的解释。请参阅下面的链接

在 pyspark 中加入大数据帧的两遍方法 https://stackoverflow.com/questions/37842595/what-is-an-optimized-way-of-joining-large-tables-in-spark-sql

根据上面解释的情况,我能够在循环中串行连接子分区,然后将连接的数据持久保存到配置单元表中。

这是代码。

from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")

因此,如果您要加入整数 emp_id,则可以按 ID 模某个数字进行分区,这样您就可以在 Spark 分区之间重新分配负载,并且具有相似键的记录将被分组在一起并驻留在同一分区上。 然后,您可以读取并循环每个子分区数据,并将两个数据帧连接起来并将它们保存在一起。

counter =0;
paritioncount = 4;
while counter<=paritioncount:
    query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
    query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
    EMP_DF1 =spark.sql(query1)
    EMP_DF2 =spark.sql(query2)
    df1 = EMP_DF1.alias('df1')
    df2 = EMP_DF2.alias('df2')
    innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
    innerjoin_EMP.show()
    innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
    counter = counter +1

我已经尝试过了,效果很好。这只是演示两遍方法的示例。您的连接条件可能会有所不同,分区数量也取决于您的数据大小。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

高效的 p​​yspark join 的相关文章

随机推荐