恐怕你不能以一种好的方式做到这一点。想想它在幕后是如何工作的——它将要计数的数据分割成块并将其发送到不同的进程,每个进程计算它的块,然后单个减速器最后将它们全部加起来。虽然每个进程都在计数,但它不知道整个大小,因此无法添加该字段。唯一的方法是在知道整个大小后返回并将其添加到数据中(即连接)。
如果每个组都适合内存(并且您可以配置内存),您可以:
Tsv(args("input"), ('id1, 'id2))
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list))
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) {
case (list, size) => list.map(record => (record._1, record._2, size))
}
.write(Tsv(args("output")))
但如果您的系统没有足够的内存,您将不得不使用昂贵的连接。
评论:
您可以使用 Tsv 代替 TextLine,然后使用 mapTo 和拆分。