我想出了一个使用 pandas_udf 的解决方案。纯 Spark 或 scala 解决方案是首选,但尚未提供。
假设我的数据是
import pandas as pd
df_pd = pd.DataFrame([['cat1',10.],['cat1',20.],['cat1',11.],['cat1',21.],['cat1',22.],['cat1',9.],['cat2',101.],['cat2',201.],['cat2',111.],['cat2',214.],['cat2',224.],['cat2',99.]],columns=['cat','val'])
df_sprk = spark.createDataFrame(df_pd)
首先解决pandas中的问题:
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=2,random_state=0)
def skmean(kmeans,x):
X = np.array(x)
kmeans.fit(X)
return(kmeans.predict(X))
您可以将 skmean() 应用于 panda 数据框(以确保其正常工作):
df_pd.groupby('cat').apply(lambda x:skmean(kmeans,x)).reset_index()
为了将该函数应用于 pyspark 数据帧,我们使用 pandas udf。但首先为输出数据帧定义一个模式:
from pyspark.sql.types import *
schema = StructType(
[StructField('cat',StringType(),True),
StructField('clusters',ArrayType(IntegerType()))])
将上面的函数转换为 pandas_udf:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def skmean_udf(df):
result = pd.DataFrame(
df.groupby('cat').apply(lambda x: skmean(kmeans,x))
result.reset_index(inplace=True, drop=False)
return(result)
您可以按如下方式使用该功能:
df_spark.groupby('cat').apply(skmean_udf).show()