如果您使用的是 pyspark 2.x,则可以使用分位数离散化器 https://spark.apache.org/docs/latest/ml-features.html#quantilediscretizer来自使用的 ml lib近似分位数() https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameStatFunctions.approxQuantile and 铲斗机 https://spark.apache.org/docs/latest/ml-features.html#bucketizer在引擎盖下。
但是,由于您使用的是 pyspark 1.6.x,您需要:
1. 查找列的分位数值
您可以通过两种方式找到分位数值:
通过计算列的百分位数百分比_排名() https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.functions.percent_rank并提取百分位值接近您想要的分位数的列值
Follow 这个答案中的方法 https://stackoverflow.com/questions/31432843/how-to-find-median-and-quantiles-using-spark这解释了如何使用 pyspark
这是我的近似分位数值的示例实现:
from pyspark.sql import functions as F
from pyspark.sql import Window
def compute_quantiles(df, col, quantiles):
quantiles = sorted(quantiles)
# 1. compute percentile
df = df.withColumn("percentile", F.percent_rank().over(Window.orderBy(col)))
# 2. categorize quantile based on the desired quantile and compute errors
df = df.withColumn("percentile_cat1", F.lit(-1.0))
df = df.withColumn("percentile_err1", F.lit(-1.0))
df = df.withColumn("percentile_cat2", F.lit(-1.0))
df = df.withColumn("percentile_err2", F.lit(-1.0))
# check percentile with the lower boundaries
for idx in range(0, len(quantiles)-1):
q = quantiles[idx]
df = df.withColumn("percentile_cat1", F\
.when( (F.col("percentile_cat1") == -1.0) &
(F.col("percentile") <= q), q)\
.otherwise(F.col("percentile_cat1")))
df = df.withColumn("percentile_err1", F\
.when( (F.col("percentile_err1") == -1.0) &
(F.col("percentile") <= q),
F.pow(F.col("percentile") - q, 2))\
.otherwise(F.col("percentile_err1")))
# assign the remaining -1 values in the error to the largest squared error of 1
df = df.withColumn("percentile_err1", F\
.when(F.col("percentile_err1") == -1.0, 1)\
.otherwise(F.col("percentile_err1")))
# check percentile with the upper boundaries
for idx in range(1, len(quantiles)):
q = quantiles[idx]
df = df.withColumn("percentile_cat2", F\
.when((F.col("percentile_cat2") == -1.0) &
(F.col("percentile") <= q), q)\
.otherwise(F.col("percentile_cat2")))
df = df.withColumn("percentile_err2",F\
.when((F.col("percentile_err2") == -1.0) &
(F.col("percentile") <= q),
F.pow(F.col("percentile") - q, 2))\
.otherwise(F.col("percentile_err2")))
# assign the remaining -1 values in the error to the largest squared error of 1
df = df.withColumn("percentile_err2", F\
.when(F.col("percentile_err2") == -1.0, 1)\
.otherwise(F.col("percentile_err2")))
# select the nearest quantile to the percentile
df = df.withColumn("percentile_cat", F\
.when(F.col("percentile_err1") < F.col("percentile_err2"),
F.col("percentile_cat1"))\
.otherwise(F.col("percentile_cat2")))
df = df.withColumn("percentile_err", F\
.when(F.col("percentile_err1") < F.col("percentile_err2"),
F.col("percentile_err1"))\
.otherwise(F.col("percentile_err2")))
# 3. approximate quantile values by choosing the value with the lowest error at each percentile category
df = df.withColumn("approx_quantile", F\
.first(col).over(Window\
.partitionBy("percentile_cat")\
.orderBy(F.asc("percentile_err"))))
return df
def extract_quantiles(df):
df_quantiles = df.select("percentile_cat", "approx_quantile").distinct()
rows = df_quantiles.collect()
quantile_values = [ row.approx_quantile for row in rows ]
return quantile_values
我想要从上面实现的是计算列中每行的百分位数,并将其分类到最近的分位数。将百分位数分类到最接近的分位数可以通过选择与百分位数差异(平方误差)最小的最低分位数类别来完成。
1. Computing Percentile
首先,我使用以下方法计算列的百分位数百分比_排名() https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.functions.percent_rank, a 窗函数 https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html在 pyspark 中。您可以将 Window 视为数据的分区规范。自从percent_rank()
是一个Window函数,所以需要传入Window。
2. Categorize percentile to quantile boundaries and compute errors
最接近百分位数的分位数类别可以是以下,等于 or above it。因此,我需要计算误差两次:第一次将百分位数与下分位数界限进行比较,第二次将其与上分位数界限进行比较。注意 ≤ 运算符用于检查百分位数是否小于或equal to边界。在知道百分位数的直接上分位数边界和下分位数边界后,我们可以通过选择误差最低的低于或等于或高于或等于类别的分位数,将百分位数分配给最近的分位数类别。
3. Approximate quantile values
一旦我们知道每个百分位数的所有最接近的分位数类别,我们就可以近似分位数值:它是每个分位数类别具有最低误差的值。可以使用以下方法计算该近似分位数值first() http://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.first每个类别分区的函数使用Window https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html。接下来,要提取分位数值,我们只需从数据框中选择唯一的percentileCategory-approxQuantileValue 对。
测试我的数据(~10000 行)后desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
,我发现我的示例实现非常接近approxQuantile
结果。当我减少提供给的误差时,两个结果值变得更加接近approxQuantile
.
Using extract_quantiles(compute_quantile(df, col, quantiles))
:
Using approxQuantile
:
2.使用分桶器
找到分位数值后,您可以使用 pyspark 的 Bucketizer 根据分位数对值进行分桶。 Bucketizer 在 pyspark 1.6.x 中均可用[1] https://spark.apache.org/docs/1.6.0/ml-features.html#bucketizer[2] https://spark.apache.org/docs/1.6.0/api/python/pyspark.ml.html#pyspark.ml.feature.Bucketizer和 2.x[3] https://spark.apache.org/docs/2.4.0/ml-features.html#bucketizer[4] https://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html#pyspark.ml.feature.Bucketizer
以下是如何执行分桶化的示例:
from pyspark.ml.feature import Bucketizer
bucketedData = df
desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] # must be sorted
for col in df.columns:
quantile_values = extract_quantiles(compute_quantiles(df, col, desired_quantiles))
splits = [ boundary_values ] # replace this with quantile_values
bucketizer = Bucketizer()\
.setInputCol(col)\
.setOutputCol("{}_quantile".format(col))\
.setSplits(splits)
bucketedData = bucketizer.transform(bucketedData)
您可以更换value_boundaries
使用您在步骤 1 中找到的分位数值或您想要的任何存储桶分割范围。当您使用铲斗机时,整个列值范围必须包含在分割内。否则,指定分割之外的值将被视为错误。无限值,例如-float("inf")
, float("inf")
如果您不确定数据的值边界,则必须显式提供以覆盖所有浮动值。