我正在尝试为数据集的每个分区拟合一个 ML 模型,但我不知道如何在 Spark 中执行此操作。
我的数据集基本上是这样的按公司划分:
Company | Features | Target
A xxx 0.9
A xxx 0.8
A xxx 1.0
B xxx 1.2
B xxx 1.0
B xxx 0.9
C xxx 0.7
C xxx 0.9
C xxx 0.9
我的目标是以并行方式为每家公司训练一个回归器(我有几亿条记录,有 10 万家公司)。
我的直觉是我需要使用foreachPartition
并行处理分区(即我的公司)并训练和保存每个公司模型。我的主要问题是如何处理iterator
将在调用的函数中使用的类型foreachPartition
.
它看起来像这样:
dd.foreachPartition(
iterator => {var company_df = operator.toDF()
var rg = RandomForestRegressor()
.setLabelCol("target")
.setFeaturesCol("features")
.setNumTrees(10)
var model = rg.fit(company_df)
model.write.save(company_path)
}
)
据我了解,试图将iterator
into a dataframe
不可能,因为 RDD 的概念本身不能存在于foreachPartition
陈述。
我知道这个问题很开放,但我真的很困惑。
在 pyspark 中你可以执行如下操作
import statsmodels.api as sm
# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
group_key = pdf[group_column].iloc[0]
y = pdf[y_column]
X = pdf[x_columns]
X = sm.add_constant(X)
model = sm.OLS(y, X).fit()
return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]], columns=[group_column] + x_columns)
beta = df.groupby(group_column).apply(ols)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)