Spark join 基本原理
Spark join的基本实现流程如下图所示,Spark将参与Join的两张表抽象为流式表(StreamTable)和查找表(BuildTable),通常系统会默认设置StreamTable为大表,BuildTable为小表。流式表的迭代器为streamItr,查找表迭代器为BuidIter。Join操作就是遍历streamIter中每条记录,然后从buildIter中查找相匹配的记录。
SortMergeJoin
SortMergeJoin是spark默认的join方式。
步骤:
-
对两张表分别进行shuffle重分区,之后将相同key的记录分到对应分区,每个分区内的数据在join之前都要进行排序,这一步对应Exchange节点和sort节点。也就是spark 的sort merge shuffle过程。
-
遍历流式表,对每条记录都采用顺序查找的方式从查找表中搜索,每遇到一条相同的key就进行join关联。每次处理完一条记录,只需从上一次结束的位置开始继续查找。
BroadcastJoin
BroadcastJoin也叫map join,适用于存在小表的情况。其将小表进行广播,避免shuffle的产生。web ui的sql图可以看到driver collect的时间,build建表压缩时间,broadcast广播时间。需要注意的是:在Outer类型的Join中,基表不能被广播,比如A left outer join B时,只能广播右表B。
触发场景:
ShuffledHashJoin
ShuffledHashJoin避免将小表分发到各个executor上,可以减少driver和executor端的压力。
步骤:
-
对两张表分别进行shuffle重分区,将相同key的记录分到对应分区中,这一步对应Exchange节点
-
将查找表分区构造一个HashMap,然后在流式表中一行行对应查找。
要将来自BuildTable每个分区的记录放到hash表中,那么BuildTable就不能太大,否则就存不下,默认情况下hash join的实现是关闭状态,如果要使用hash join,原生spark必须满足以下四个条件:
-
查找表总体估计大小超过spark.sql.autoBroadcastJoinThreshold
设定的值,即不满足BroadcastJoin 条件
-
关闭优先使用SortMergeJoin开关,spark.sql.join.preferSortMergeJoin=false
-
每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold
设定的值,查找表数据量 < 广播数据阈值 * shuffle的partition数。
-
streamIter
的大小是buildIter
三倍以上
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)