第一个问题可以通过一直使用类型化列来解决(KeyValueGroupedDataset.agg
期望TypedColumn(-s)
)
您可以将聚合结果定义为:
val eight = lit(8.0)
.as[Double] // Not necessary
val sumByEight = typedSum[MyClass](_.c4)
.divide(eight)
.as[Double] // Required
.name("div(sum(c4), 8)")
并将其插入以下代码:
val myCaseClass = Seq(
MyClass("a", "b", "c", 2.0),
MyClass("a", "b", "c", 3.0)
).toDS
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(sumByEight)
to get
+-------+---------------+
| key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]| 0.625|
+-------+---------------+
第二个问题是由于使用了不符合数据形状的类而导致的。正确的表示可能是:
case class AnotherClass(key: (String, String, String), sum: Double)
与上面定义的数据一起使用:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(typedSum[MyClass](_.c4).name("sum"))
.as[AnotherClass]
会给出:
+-------+---+
| key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+
but .as[AnotherClass]
如果这里没有必要Dataset[((String, String, String), Double)]
是可以接受的。
你当然可以跳过所有这些,然后mapGroups
(尽管并非没有性能损失):
import shapeless.syntax.std.tuple._ // A little bit of shapeless
val tuples = myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.mapGroups((group, iter) => group :+ iter.map(_.c4).sum)
有结果
+---+---+---+---+
| _1| _2| _3| _4|
+---+---+---+---+
| a| b| c|5.0|
+---+---+---+---+
reduceGroups
可能是一个更好的选择:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))
由此产生的Dataset
:
+-------+-----------+
| _1| _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+