I'm not proud of this solution, since I suspect a more efficient one exists, but I'll leave it here anyway. The idea is to merge, for every device, all accounts reachable through shared devices, and then group the devices that end up with the same account set. Hope it helps.
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDF below (already in scope in spark-shell)

// Flattens an array of arrays, then deduplicates.
// Both flatten and array_distinct require Spark 2.4+.
val flatten_distinct = (array_distinct _) compose (flatten _)
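// If you're stuck on Spark < 2.4, where flatten/array_distinct don't exist,
// a UDF could stand in for the composition above; just a sketch, assuming
// the ids are strings:
// val flatten_distinct = udf((xs: Seq[Seq[String]]) => xs.flatten.distinct)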
// Sample (device id, account id) pairs.
val df = Seq(
("d1","a1"),
("d2","a1"),
("d1","a2"),
("d2","a3"),
("d3","a4"),
("d3","a5"),
("d4","a6")
).toDF("d_id","u_id")
// For each account, collect the devices it was seen on.
val userDevices = df
  .groupBy("u_id")
  .agg(collect_list("d_id").alias("d_id_list"))
//+----+---------+
//|u_id|d_id_list|
//+----+---------+
//| a5| [d3]|
//| a3| [d2]|
//| a4| [d3]|
//| a2| [d1]|
//| a1| [d1, d2]|
//| a6| [d4]|
//+----+---------+
// For each device, collect the accounts seen on it.
val accountsByDevice = df
  .groupBy("d_id")
  .agg(collect_list("u_id").alias("u_id_list"))
//+----+---------+
//|d_id|u_id_list|
//+----+---------+
//| d2| [a3, a1]|
//| d3| [a4, a5]|
//| d1| [a1, a2]|
//| d4| [a6]|
//+----+---------+
// Join the two views so each account's device list picks up every account
// sharing one of those devices, merge the account lists per device group,
// then explode back to one row per device with the merged account set.
val ungroupedDf = userDevices
  .join(accountsByDevice, expr("array_contains(d_id_list,d_id)"))
  .groupBy("d_id_list")
  .agg(collect_set("u_id_list") as "set")
  .select(col("d_id_list") as "d_id", flatten_distinct(col("set")) as "u_id")
  .select(explode(col("d_id")) as "d_id", col("u_id"), size(col("u_id")) as "size")
//+----+------------+----+
//|d_id| u_id|size|
//+----+------------+----+
//| d2| [a1, a3]| 2|
//| d1|[a1, a3, a2]| 3|
//| d2|[a1, a3, a2]| 3|
//| d3| [a4, a5]| 2|
//| d1| [a1, a2]| 2|
//| d4| [a6]| 1|
//+----+------------+----+
// Keep, per device, only its largest merged account set, then group the
// devices sharing the same account set and tag each group with an id.
val finalDf = ungroupedDf
  .join(ungroupedDf.groupBy("d_id").agg(max(col("size")) as "size"), Seq("size", "d_id"))
  .groupBy("u_id")
  .agg(collect_set("d_id") as "d_id")
  .withColumn("unique_id", monotonically_increasing_id())
//+------------+--------+-------------+
//| u_id| d_id| unique_id|
//+------------+--------+-------------+
//|[a1, a2, a3]|[d1, d2]|1228360646656|
//| [a4, a5]| [d3]|1297080123392|
//| [a6]| [d4]|1520418422784|
//+------------+--------+-------------+
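If you then need the group id back on the original rows, one way (just a sketch, reusing the column names above) is to explode the grouped device list and join on d_id:

// Sketch: label every original (d_id, u_id) row with its group's unique_id.
val labeled = df.join(
  finalDf.select(explode(col("d_id")) as "d_id", col("unique_id")),
  Seq("d_id")
)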