如何使用 foreachPartition 在 Spark 中为每个分区高效构建一个 ML 模型?

2024-01-08

我正在尝试为数据集的每个分区拟合一个 ML 模型,但我不知道如何在 Spark 中执行此操作。

我的数据集基本上是这样的按公司划分:

Company | Features | Target

A         xxx        0.9
A         xxx        0.8
A         xxx        1.0
B         xxx        1.2
B         xxx        1.0
B         xxx        0.9
C         xxx        0.7
C         xxx        0.9
C         xxx        0.9

我的目标是以并行方式为每家公司训练一个回归器(我有几亿条记录,有 10 万家公司)。 我的直觉是我需要使用foreachPartition并行处理分区(即我的公司)并训练和保存每个公司模型。我的主要问题是如何处理iterator将在调用的函数中使用的类型foreachPartition.

它看起来像这样:

dd.foreachPartition(

    iterator => {var company_df = operator.toDF()
                 var rg = RandomForestRegressor()
                                 .setLabelCol("target")
                                 .setFeaturesCol("features")
                                 .setNumTrees(10)
                 var model = rg.fit(company_df)
                 model.write.save(company_path)
                 }
)

据我了解,试图将iterator into a dataframe不可能,因为 RDD 的概念本身不能存在于foreachPartition陈述。

我知道这个问题很开放,但我真的很困惑。


在 pyspark 中你可以执行如下操作

import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 foreachPartition 在 Spark 中为每个分区高效构建一个 ML 模型? 的相关文章

随机推荐

  • 如何仅切换 *next* .class 或 div(而不是其余的)

    我想知道如何仅切换 子 div 单击在 html 上重复的按钮 如下所示 div class button div class hide toggle Blah1 div div div class button div class hid
  • asp.net mvc 3 中 DataAnnotations 的行为是否发生了变化?

    我有一个带有属性的模型 ReadOnly true public decimal BodyMassIndex get private set 当我打电话时在我看来 Html EditorForModel 我仍然得到该属性的标准可编辑文本框
  • 从 HttpResponseMessage 获取内容/消息

    我正在尝试获取 HttpResponseMessage 的内容 它应该是 message Action does not exist success false 但我不知道如何从 HttpResponseMessage 中获取它 HttpC
  • 使用 log4net 或 NLog 的 WCF 日志记录/跟踪和活动 ID 传播

    我见过很多关于日志记录的其他问题 最佳实践 什么日志平台最好 等等 这里有一些关于 SO 的链接 其中对这个主题进行了很好的讨论 记录最佳实践 https stackoverflow com questions 576185 logging
  • 文件中的 Python 3 unicode 到 utf-8

    我试图解析日志文件 但文件格式始终为 unicode 我想要自动化的通常流程 我在记事本中提取文件 另存为 更改编码unicode to UTF 8 然后在上面运行python程序 这就是我想在 Python 3 4 中自动化的过程 几乎只
  • 谷歌地图 V2“不幸的是应用程序已停止”

    I just begin to learn Android take 4 days to try work maps view but not work that error Unfortunately the app has stoppe
  • 如何使用X509证书和C#进行非对称加密?

    我希望使用公钥和私钥使用 X509 证书加密文件 并将它们发送到远程服务器 我该怎么做 这可能吗 如何生成证书以及公钥和私钥对 See 这个问题 https stackoverflow com questions 1623189 rsacr
  • 在 Python 中使用列表理解查找最小/最大日期

    所以我有这个清单 snapshots 2014 04 05 2014 04 06 2014 04 07 2014 04 08 2014 04 09 我想使用列表理解找到最早的日期 这就是我现在所拥有的 earliest date snaps
  • Apache - 限制 IP 不起作用

    我有一个子域 我只想在内部访问 我试图通过编辑该域的 VirtualHost 块来在 Apache 中实现此目的 有人能看出我哪里出错了吗 注意 我这里的内部IP地址是192 168 10 xxx 我的代码如下
  • 在 PyQt 应用程序上使用 cx_freeze 时出现语法错误

    当尝试使用 PyQt4 从 Python 3 脚本构建 exe 文件时 这是一个非常烦人的问题 我认为这与使用有关uic动态加载模块 ui files cx freeze返回 File E Python32 32 lib site pack
  • 使用 array_multisort() 对多维 PHP 数组进行不区分大小写的排序

    经过大量搜索后 我无法找到有关如何使用 array multisort 按一个字段不区分大小写地对多维数组进行排序的良好解释 我发现在处理数据库查询信息时这是一个非常有用的功能 因此我想分享一下 我应该注意这仅适用于 php 5 4 Exa
  • ANDROID:不同尺寸的ImageView

    我是 android studio 的新手 因此 如果问题很琐碎 请原谅 我的问题是理解布局 我的布局和相应的值文件夹如下所示 我的问题是没有选择正确的布局 例如 对于 Nexus 4 4 7 英寸 768x1280 xhdpi 在横向模式
  • R Shiny:删除 ggplot2 背景以使其透明

    我想让 R Shiny Server 上的 ggplots 透明 我的绘图 ui R 如下 plotOutput malPie width 95 在 server R 中我的绘图函数如下 c lt ggplot dataFrame aes
  • 在pentaho中休息客户端

    我对 pentaho 数据集成工具非常陌生 我想从我的 pentaho 中使用一个安静的服务 post web 服务 为此 我发现我应该使用休息客户端 但是当我给出网址和正文 因为我想使用后期服务 时 它并没有在数据库中进行必要的更改 谁能
  • Moment.js 包含日期格式中间的文本

    我的格式为 2015 年 1 月 27 日上午 8 17 我需要使用 moment js 显示它 我正在使用格式 moment format MMM D YYYY at h mm A z 除了 at 这个词之外 一切都很好 我怎样才能让这个
  • 在oracle中创建密码字段

    安全外部密码存储 有什么用 我可以使用 安全外部密码存储 在 Oracle 表中创建密码字段吗 或者如何在不使用 安全外部密码存储 的情况下在 Oracle 表中创建密码字段 一种不使用 安全外部密码存储 无论是什么 的方法是将 RAW 1
  • 休眠级联持续

    我有一个关于 Hibernate 的一般性问题正在解决 我有 A 类和 B 类 其中 B 依赖于 A 在我的代码中 当我调用 em persist objOfTypeA 时 我希望插入并插入到表 AAA 和 BBB 中 如果我手动保留 A
  • Powershell:递归移动文件

    我正在尝试将所有构建输出文件和文件夹复制到Bin文件夹 输出目录 Bin 除了一些保留在输出目录 The Bin文件夹永远不会被删除 初始条件 Output config log4net file1 txt file2 txt file3
  • 删除 R 图形设备中的所有边距

    所以我在摆脱图形设备的整个边距时遇到了一些麻烦 我已将 mar 设置为 0 但边缘周围仍然存在一些持久空间 例如 plot new par mar c 0 0 0 0 plot window c 0 1 c 0 1 points c 1 1
  • 如何使用 foreachPartition 在 Spark 中为每个分区高效构建一个 ML 模型?

    我正在尝试为数据集的每个分区拟合一个 ML 模型 但我不知道如何在 Spark 中执行此操作 我的数据集基本上是这样的按公司划分 Company Features Target A xxx 0 9 A xxx 0 8 A xxx 1 0 B