无法在 pyspark 数据帧上使用 Sklearn 模型进行预测

2024-01-11

我已成功加载 sklearn 模型,但无法对 pyspark 数据帧进行预测。运行下面给定的代码时,出现下面提到的错误。请帮助我获取在 pyspark 上使用 sklearn 模型进行预测的代码。我也搜索过相关问题,但没有找到解决方案。

sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)
braodcast_model.value


#update prediction method
def predictor(cols):
    #call predict method for model
    return model.value.predict(*cols)

udf_predictor = udf(predictor, FloatType())

#apply the udf to dataframe
df_prediction = df.withColumn("prediction", udf_predictor(df.select(list_of_columns)))

我收到以下错误消息

TypeError: Invalid argument, not a string or column. For column literals, use 'lit', 'array',
'struct' or 'create_map' function.

我认为您在达到预期产出方面处于正确的轨道上。
我设法找到解决此类问题的两种可能的解决方案:一种使用火花UDF,其他用途熊猫 UDF.


火花UDF

from pyspark.sql.functions import udf

@udf('integer')
def predict_udf(*cols):
    return int(braodcast_model.value.predict((cols,)))

list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_udf(*list_of_columns))

熊猫 UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('integer')
def predict_pandas_udf(*cols):
    X = pd.concat(cols, axis=1)
    return pd.Series(braodcast_model.value.predict(X))

list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_pandas_udf(*list_of_columns))

可重现的例子

在这里,我使用了 Databricks Community 集群Spark 3.1.2, pandas==1.2.4 and pyarrow==4.0.0.
broadcasted_model是来自 scikit-learn 的简单逻辑回归,经过训练乳腺癌数据集 https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html.

import pandas as pd
import joblib
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from pyspark.sql.functions import udf, pandas_udf


# load dataset
X, y = load_breast_cancer(return_X_y=True, as_frame=True)

# split in training and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=28)

# create a small pipeline with standardization and model
pipe = make_pipeline(StandardScaler(), LogisticRegression())

# save and reload the model
path = '/databricks/driver/test_model.joblib'
joblib.dump(model, path)
loaded_model = joblib.load(path)

# sample of unseen data
df = spark.createDataFrame(X_test.sample(50, random_state=42))

# create broadcasted model
sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)

然后我使用了上面说明的两种方法,你会看到输出df_prediction在两种情况下都是相同的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法在 pyspark 数据帧上使用 Sklearn 模型进行预测 的相关文章

  • 学习Python中的解析器

    我记得我读过有关解析器的内容 您只需提供一些示例行 它就知道如何解析某些文本 它只是确定两条线之间的差异 以了解可变部分是什么 我以为它是用 python 写的 但我不确定 有谁知道那是什么图书馆吗 可能你的意思是模板制作器 http co
  • 有什么好的适用于 Google App Engine 应用程序的 AJAX 框架吗? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在尝试在我的 Google App Engine 应用程序中实现 AJAX 因此我正在寻找一个好的
  • 使用ideone时如何传入命令行参数?

    我正在使用 ideone 在线解释器 http ideone com http ideone com 来测试一些 C 和 Python 程序 如何指定命令行参数而不是使用 STDIN 输入 看起来你不能 但是快速破解应该做的伎俩 stati
  • 在 Python 中延迟转置列表

    所以 我有一个延迟生成的可迭代的三元组 我试图弄清楚如何将其转换为 3 个可迭代对象 分别由元组的第一个 第二个和第三个元素组成 然而 我希望这件事能懒惰地完成 所以 举例来说 我希望 1 2 3 4 5 6 7 8 9 将变成 1 4 7
  • 如何在plotly(python)中的刻度标签和图形之间添加空格?

    如果我使用绘图创建水平条形图 则每个条形的标签都与图表相对应 我想在标签和图表之间添加一些空间 填充 边距 我怎样才能做到这一点 Example import plotly offline as py import plotly graph
  • 获取字符串模板中所有标识符列表的函数(Python)

    对于标准库string template在Python中 有没有一个函数可以获取所有标识符的列表 例如 使用以下 xml 文件
  • pandas 数据框的最大大小

    我正在尝试使用读取一个有点大的数据集pandas read csv or read stata功能 但我不断遇到Memory Errors 数据帧的最大大小是多少 我的理解是 只要数据适合内存 数据帧就应该没问题 这对我来说不应该是问题 还
  • 使用 python 从 CSV 创建字典

    我有一个 CSV 格式的文件 其中 A B 和 C 是标题 我如何以Python方式将此CSV转换为以下形式的字典 A 1 B 4 C 7 A 2 B 5 C 8 A 3 B 6 C 9 到目前为止我正在尝试以下代码 import csv
  • matplotlib vlines 图中未应用 y 轴的最小值

    我正在 matplotlib 中绘制 vlines 图 数据集中的所有 y 值如下 gt 0 我希望 y 轴最底部的刻度能够读取0 但相反 我得到 500 这是代码 usr bin env python import numpy as np
  • Python GTK3 Treeview 向上或向下移动选择

    如何在树视图中向上或向下移动所选内容 我的想法是 我可以使用向上和向下按钮将选择向上移动一行或向下移动一行 我的 Treeview 使用 ListStore 不确定这是否重要 首先 我将使用我熟悉的 C 代码 如果您在将其翻译为 Pytho
  • 将 str.contains 映射到 pandas DataFrame

    python 初学者 我正在寻找创建字符串的字典映射以及关联的值 我有一个数据框 想要创建一个新列 如果字符串匹配 则会将该列标记为 x df pd DataFrame comp dell notebook dell notebook S3
  • 在Python中随机交错2个数组

    假设我有两个数组 a 1 2 3 4 b 5 6 7 8 9 我想将这两个数组交错为变量 c 注意 a 和 b 不一定具有相同的长度 但我不希望它们以确定性的方式交错 简而言之 仅仅压缩这两个数组是不够的 我不想要 c 1 5 2 6 3
  • 为什么我会在 Python 字符串格式中使用除 %r 之外的其他内容?

    我偶尔会使用 Python 字符串格式 这可以像这样完成 print int i Float f String s 54 34 434 some text 但是 这也可以这样做 print int r Float r String r 54
  • python 中的 F 字符串前缀给出语法错误[重复]

    这个问题在这里已经有答案了 我有一个名为 method 的变量 它的值是 POST 但是当我尝试运行时print f method method is used 它不断在最后一个双引号处给出语法错误 我找不到它这样做的原因 我正在使用 py
  • Django 中使用外键的抽象基类继承

    我正在尝试在 Django 支持的网站上进行模型继承 以遵守 DRY 我的目标是使用一个名为 BasicCompany 的抽象基类来为三个子类提供通用信息 Butcher Baker CandlestickMaker 它们位于各自的应用程序
  • 尝试 numba 时出现巨大错误

    我在使用 numba 时遇到了大量错误 讽刺的是 正确的结果是在错误之后打印的 我正在使用最新的 Anaconda python 并安装了 numba conda install numba 一次在 Ubuntu 13 64 位和 anac
  • JSONDecodeError:额外数据:Python [重复]

    这个问题在这里已经有答案了 我使用以下代码从文件加载 json file file name obj list with open file as f for json obj in f obj list append loads json
  • 如何仅读取 CSV 文件每行的第一列 [重复]

    这个问题在这里已经有答案了 如何在Python中读取CSV文件每行的第一列 我的数据是这样的 1 abc 2 bcd 3 cde 我只需要循环第一列的值 另外 当我在 calc 中打开 csv 文件时 每行中的数据都在同一个单元格中 这正常
  • 如何使用 keras.backend.gradients() 获取梯度值

    我试图获得 Keras 模型的输出相对于模型输入 x 而不是权重 的导数 似乎最简单的方法是使用 keras backend 中的 梯度 它返回梯度张量 https keras io backend https keras io backe
  • 在Python中使用os.makedirs创建目录时出现权限问题

    我只是想处理上传的文件并将其写入工作目录中 该目录的名称是系统时间戳 问题是我想以完全权限创建该目录 777 但我不能 使用以下代码创建的目录755权限 def handle uploaded file upfile cTimeStamp

随机推荐