我有一个正在操作的 DataFrame,我想按一组列进行分组,并按组对其余列进行操作。正常情况下RDD
-land 我认为它看起来像这样:
rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
groupByKey().
forEachPartition( iter => doSomeJob(iter) )
In DataFrame
-land我会这样开始:
df.groupBy("col1", "col2", "col3") // Reference by name
但如果我的操作比提供的平均值/最小/最大/计数更复杂,我不确定如何对组进行操作分组数据 https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.GroupedData.
例如,我想为每个项目构建一个 MongoDB 文档("col1", "col2", "col3")
组(通过迭代关联的Row
组中的 s),缩小到N
分区,然后将文档插入 MongoDB 数据库。这N
limit 是我想要的最大同时连接数。
有什么建议吗?
您可以自行加入。首先获取组:
val groups = df.groupBy($"col1", $"col2", $"col3").agg($"col1", $"col2", $"col3")
然后你可以将其连接回原始 DataFrame:
val joinedDF = groups
.select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3)
.join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3")
虽然这会为您提供与原来完全相同的数据(并且带有 3 个额外的冗余列),但您可以执行另一个联接,为与该行关联的 (col1、col2、col3) 组添加具有 MongoDB 文档 ID 的列。
无论如何,根据我的经验,连接和自连接是处理 DataFrame 中复杂内容的方式。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)