首先,你需要知道histogram
生成两个独立的连续作业。一种用于检测数据的最小值和最大值,另一种用于计算实际的直方图。您可以使用 Spark UI 进行检查。
我们可以按照相同的方案在您希望的任意数量的列上构建直方图,只需两项工作。然而,我们不能使用histogram
函数仅用于处理一组双精度数。需要我们自己去实现。第一份工作非常简单。
val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head
然后我们在本地计算直方图的范围。请注意,我对所有列使用相同的范围。它允许轻松比较列之间的结果(通过将它们绘制在同一图上)。不过,每列具有不同的范围只是对此代码的一个小修改。
val hist_size = 10
val hist_step = (max_trx - min_trx) / hist_size
val hist_ranges = (1 until hist_size)
.scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
// I add max_trx manually to avoid rounding errors that would exclude the value
这是第一部分。然后,我们可以使用 UDF 来确定每个值的最终范围,并与 Spark 并行计算所有直方图。
val range_index = udf((x : Double) => hist_ranges.lastIndexWhere(x >= _))
val hist_df = df
.withColumn("rangeIndex", range_index('trx))
.groupBy("M1", "rangeIndex")
.count()
// And voilà, all the data you need is there.
hist_df.show()
+---+----------+-----+
| M1|rangeIndex|count|
+---+----------+-----+
| M2| 2| 2|
| M1| 0| 2|
| M2| 5| 1|
| M1| 3| 2|
| M2| 3| 1|
| M1| 7| 1|
| M2| 10| 1|
+---+----------+-----+
作为奖励,您可以使用 RDD API 或通过收集数据帧并在 scala 中修改它来调整数据以在本地(在驱动程序内)使用它。
这是使用 Spark 的一种方法,因为这是一个关于 Spark 的问题;-)
val hist_map = hist_df.rdd
.map(row => row.getAs[String]("M1") ->
(row.getAs[Int]("rangeIndex"), row.getAs[Long]("count")))
.groupByKey
.mapValues( _.toMap)
.mapValues( hists => (1 to hist_size)
.map(i => hists.getOrElse(i, 0L)).toArray )
.collectAsMap
编辑:如何为每列值构建一个范围:
我们不是计算 M1 的最小值和最大值,而是为列的每个值计算它groupBy
.
val min_max_map = df.groupBy("M1")
.agg(min('trx), max('trx))
.rdd.map(row => row.getAs[String]("M1") ->
(row.getAs[Double]("min(trx)"), row.getAs[Double]("max(trx)")))
.collectAsMap // maps each column value to a tuple (min, max)
然后我们调整 UDF 以便它使用这个映射,我们就完成了。
// for clarity, let's define a function that generates histogram ranges
def generate_ranges(min_trx : Double, max_trx : Double, hist_size : Int) = {
val hist_step = (max_trx - min_trx) / hist_size
(1 until hist_size).scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
}
// and use it to generate one range per column value
val range_map = min_max_map.keys
.map(key => key ->
generate_ranges(min_max_map(key)._1, min_max_map(key)._2, hist_size))
.toMap
val range_index = udf((x : Double, m1 : String) =>
range_map(m1).lastIndexWhere(x >= _))
最后只需替换即可range_index('trx)
by range_index('trx, 'M1)
每列值都有一个范围。