出于好奇,您使用的是哪个版本的 Apache Spark? Apache Spark 2.0+ 中进行了一些修复,其中包括对approxQuantile
.
如果我要运行下面的 pySpark 代码片段:
rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")
与median
计算使用approxQuantile
as:
df.approxQuantile("num", [0.5], 0.25)
or
spark.sql("select percentile_approx(num, 0.5) from df").show()
结果是:
-
火花2.0.0: 0.25
-
火花2.0.1: 1.0
-
火花2.1.0: 1.0
请注意,因为这些是近似数字(通过approxQuantile
)虽然一般来说这应该运作良好。如果您需要精确的中位数,一种方法是使用numpy.median
。下面的代码片段已为此更新df
基于 gench 的 SO 响应的示例如何使用 Python Dataframe API 在 Apache Spark 中查找中位数?:
from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np
def find_median(values):
try:
median = np.median(values) #get the median of values in a list in each row
return round(float(median),2)
except Exception:
return None #if there is anything wrong with the given values
median_finder = F.udf(find_median,FloatType())
df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))
# print out
df2.show()
输出为:
+---+--------------------+------+
| id| nums|median|
+---+--------------------+------+
| 1|[0.0, 0.0, 1.0, 1...| 1.0|
+---+--------------------+------+
更新:使用 RDD 的 Spark 1.6 Scala 版本
如果您使用的是 Spark 1.6,您可以计算median
通过 Eugene Zhulenev 的回复使用 Scala 代码如何使用 Apache Spark 计算准确的中位数。下面是适用于我们的示例的修改后的代码。
import org.apache.spark.SparkContext._
val rdd: RDD[Double] = sc.parallelize(Seq((0.0), (0.0), (1.0), (1.0), (1.0), (1.0)))
val sorted = rdd.sortBy(identity).zipWithIndex().map {
case (v, idx) => (idx, v)
}
val count = sorted.count()
val median: Double = if (count % 2 == 0) {
val l = count / 2 - 1
val r = l + 1
(sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
} else sorted.lookup(count / 2).head.toDouble
输出为:
// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0
请注意,这是使用以下方法计算精确中位数RDDs
- 也就是说,您需要将 DataFrame 列转换为 RDD 才能执行此计算。