Python 多处理:类型错误:__new__() 缺少 1 个必需的位置参数:'path'

2024-02-28

我目前正在尝试使用 joblib 库和多处理后端在 python 3.5 中运行并行进程。但是,每次运行时我都会收到此错误:

Process ForkServerPoolWorker-5:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/opt/anaconda3/lib/python3.5/site-packages/joblib/pool.py", line 362, in get
    return recv()
  File "/opt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
TypeError: __new__() missing 1 required positional argument: 'path'

这是我用来运行它的 joblib 代码:

from joblib import Parallel, delayed
results = Parallel(n_jobs=6) (
             delayed(func)(i) for i in array)

默认情况下,后端是多处理的。当我将后端更改为“线程”时,代码运行良好,但对于我的用例来说,与使用多处理相比,线程效率低下。

我也尝试过直接使用以下代码使用多处理,但仍然遇到相同的错误:

from multiprocessing import Pool
with Pool(5) as p:
    results = p.map(func, array)

EDIT:这是我如何尝试使用 joblib 的一个较小示例。我的实现将 joblib 函数包装在类函数内。下面的代码有效,但是当我使用实际数据运行相同的函数时,出现上述错误。如果使用大量数据,下面的函数会导致问题吗?

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

class ABTest(object):


    def __init__(self, df, test_comps, test_name):
        self.df = df
        self.test_comps = test_comps
        self.test_name = test_name

        self.variant_dict = {}
        for variant in self.df['test_variant'].unique():
            self.variant_dict[variant] = self.df[self.df['test_variant'] == variant]


        self.test_cols = ['clicks', 'opens', 'buys']
        self.cpu_count = multiprocessing.cpu_count()


    def bootstrap_ci(self, arguments):
        '''
        Finds the confidence interval for the difference in means for
        two test groups using bootstrap

        In: self
            arugments (list) - A list with elements [test_comp, cols], where test_comp
                                is a tuple of two test variants and cols is a list
                                of columns to bootstrap means for.  A single argument
                                must be used for parallel processing
        Out: results (matrix) - confidence interval information for the difference
                        in means of the two groups
        Creates: None
        Modifies: None
        '''
        test_comp = arguments[0]
        cols = arguments[1]

        test_a_df = self.variant_dict[test_comp[0]]
        test_b_df = self.variant_dict[test_comp[1]]

        results = []

        print('Getting Confidence Intervals for Test Groups: {}, {}...'.format(test_comp[0], test_comp[1]))

        for col in cols:
            test_a_sample_mean = []
            test_b_sample_mean = []

            test_a_len = test_a_df.shape[0]
            test_b_len = test_b_df.shape[0]

            for j in range(5000):
                # Get sample means for both test variants
                test_a_bs_mean = test_a_df[col].sample(n=test_a_len, replace=True).mean()
                test_a_sample_mean.append(test_a_bs_mean)

                test_b_bs_mean = test_b_df[col].sample(n=test_b_len, replace=True).mean()
                test_b_sample_mean.append(test_b_bs_mean)

            test_a_s = pd.Series(test_a_sample_mean)
            test_b_s = pd.Series(test_b_sample_mean)

            # Gets confidence interval for the difference in distribution of means
            test_diffs = test_b_s-test_a_s
            z = test_diffs.quantile([.025, 0.05, 0.5, 0.95, 0.975])

            results.append([self.test_name, test_comp[0], test_comp[1], col, z.iloc[0], z.iloc[1], z.iloc[2], z.iloc[3], z.iloc[4]])

        return results


    def run_parallel(self, func, array):
        '''
        Runs a function (func) on each item in array and returns the results

        In:
            func (function that takes one argument) - the function to run in parallel
            array (list or array-like object) - the array to iterate over
        Out: results (list) - The results of running each item in array through func
        Creates: None
        Modifies: None
        '''
        # Never uses more than 6 cores
        n_jobs = min(self.cpu_count - 1, 6)
        results = Parallel(n_jobs=n_jobs) ( \
                            delayed(func) (i) for i in array)

        return results


    def confidence_intervals(self):
        results = self.run_parallel(self.bootstrap_ci, [(x, self.test_cols) for x in self.test_comps])

        results = np.array([y for x in results for y in x])

        return results

if __name__ == '__main__':
    columns = ['id', 'test_variant', 'clicks', 'opens', 'buys']
    data = [[0, 'control', 10, 60, 2], \
            [1, 'test_1', 5, 50, 1], \
            [2, 'test_2', 11, 50, 3], \
            [3, 'control', 8, 55, 1], \
            [4, 'test_1', 5, 40, 0], \
            [5, 'test_2', 15, 100, 5], \
            [6, 'control', 2, 30, 0], \
            [7, 'test_1', 1, 60, 1], \
            [8, 'test_2', 11, 50, 3], \
            [9, 'control', 10, 60, 2], \
            [10, 'test_1', 5, 50, 1], \
            [11, 'test_2', 11, 50, 3], \
            [12, 'control', 10, 60, 2], \
            [13, 'test_1', 5, 50, 1], \
            [14, 'test_2', 11, 50, 3], \
            [15, 'control', 10, 60, 2], \
            [16, 'test_1', 5, 50, 1], \
            [17, 'test_2', 11, 50, 3], \
            [18, 'control', 10, 60, 2], \
            [19, 'test_1', 5, 50, 1], \
            [20, 'test_2', 11, 50, 3]]


    df = pd.DataFrame(data, columns=columns)
    test_comps = [['control', 'test_1'], ['control', 'test_2'], ['test_1', 'test_2']]

    ab = ABTest(df, test_comps, test_name='test')
    results = ab.confidence_intervals()
    print(results)

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 多处理:类型错误:__new__() 缺少 1 个必需的位置参数:'path' 的相关文章

  • 多处理与 gevent

    目前我正在使用带有发布 订阅模式的 Zeromq 我有一个要发布的工作人员和许多 8 个订阅者 所有人都会订阅 相同的模式 现在我尝试使用多处理来生成订阅者 它可以工作 我错过了一些消息 我使用多重处理的原因是在每条消息到达时对其进行处理
  • Spark 中的广播 Annoy 对象(对于最近邻居)?

    由于 Spark 的 mllib 没有最近邻居功能 我正在尝试使用Annoy https github com spotify annoy为近似最近邻 我尝试广播 Annoy 对象并将其传递给工人 然而 它并没有按预期运行 下面是可重复性的
  • 我知道 scipy curve_fit 可以做得更好

    我使用 python numpy scipy 来实现此算法 用于根据地形坡向和坡度对齐两个数字高程模型 DEM 用于量化冰川厚度变化的卫星高程数据集的联合配准和偏差校正 C Nuth 和 A K b doi 10 5194 tc 5 271
  • 在 python 中读取具有恶意字节 0xc0 的文件,导致 utf-8 和 ascii 出错

    尝试将制表符分隔的文件读入 pandas 数据帧 gt gt gt df pd read table fn na filter False error bad lines False 它会出错 如下所示 b Skipping line 58
  • Python:球体的交集

    我对编程非常陌生 但我决定承担一个有趣的项目 因为我最近学会了如何以参数形式表示球体 当三个球体相交时 有两个不同的交点 除非它们仅在一个奇点处重叠 球体的参数表示 我的代码是根据答案修改的Python matplotlib 绘制 3d 立
  • 在Python中获取目录基名的优雅方法?

    我有几个脚本将目录名称作为输入 并且我的程序在这些目录中创建文件 有时我想获取给程序的目录的基本名称 并用它在目录中创建各种文件 例如 directory name given by user via command line output
  • 在散景中隐藏轴

    如何在散景图中隐藏 x 轴和 y 轴 我已经根据此进行了检查和尝试 p1 figure visible None p1 select type Axis visible 0 xaxis Axis plot p1 visible 0 和喜欢h
  • matplotlib pyplot:子图大小

    如果我绘制如下所示的单个图 它将具有 x y 大小 import matplotlib pyplot as plt plt plot 1 2 1 2 但是 如果我在同一行中绘制 3 个子图 则每个子图的大小均为 x 3 y fig ax p
  • 如何以最大窗口形式保存 matplotlib 图而不是默认大小?

    有人知道我应该如何解决这个问题吗 我知道有一个保存按钮 我可以手动执行此操作 但我正在绘制 100 多个图表 所以我希望有一种方法可以自动执行此操作 我正在使用 TkAgg 后端 并寻找任何可能的解决方案 通过在我的绘图函数末尾使用以下内容
  • Python:如何使用 struct.pack_into 将不同类型的数据打包到字符串缓冲区中

    我正在尝试将一些无符号 int 数据打包到使用创建的字符串缓冲区中ctypes create string buffer 这是以下代码段 以及显示错误的运行示例在键盘上 http codepad org S8nUWMcW import st
  • 使用 nditer 进行浅层迭代

    我有这样一个数组 gt gt gt y np random randint 0 255 2 2 3 gt gt gt array 242 14 211 198 7 0 235 60 81 164 64 236 我必须迭代每个triplet元
  • pandas:如何将嵌套 JSON 解包为数据帧?

    我有这样的 JSON 输出 json json SeriousDlqin2yrs prediction 0 prediction probs 0 0 95 1 0 04 SeriousDlqin2yrs prediction 0 predi
  • 在组织内部分发我的 python 模块

    我用 python 制作了一些模块 我想将它们分发到我的组织内 这些模块已经存储在BitBucket中 例如 有什么方法可以使用 pip install 来分发它们吗 正确的方法是什么 您可以从 GitHub 进行 pip 安装 并且应该能
  • 填充 MultiIndex Pandas Dataframe 中的日期空白

    我想修改 pandas MultiIndex DataFrame 以便每个索引组都包含指定范围内的日期 我希望每个组都用值 0 或NaN Group A Group B Date Value loc a group a 2013 06 11
  • Python 类:通过传递值实现单例还是非单例?

    我有一个 Python 3 类 目前是使用 a 定义的单例 singleton装饰器 但有时需要not成为单身人士 问题 是否可以在从类实例化对象时执行类似于传递参数的操作 并且该参数确定该类是否是单例 我试图找到一种替代方法来复制类并使其
  • 如何在给定目标大小的情况下在 python 中调整图像大小,同时保留纵横比?

    首先 我觉得这是一个愚蠢的问题 对此感到抱歉 目前 我发现计算最佳缩放因子 目标像素数的最佳宽度和高度 同时保留纵横比 的最准确方法是迭代并选择最佳缩放因子 但是必须有更好的方法来做到这一点 一个例子 import cv2 numpy as
  • 从Python中的URL中提取域[重复]

    这个问题在这里已经有答案了 我有一个像这样的网址 http abc hostname com somethings anything 我想得到 hostname com 我可以使用什么模块来完成此任务 我想在python2中使用相同的模块和
  • 使用 python 提取 MP3 URL 的 ID3 标签并进行部分下载

    我需要提取远程 mp3 文件的 ID3 标签和元数据 我写了几行可以获取本地文件的ID3标签 from mutagen mp3 import MP3 import urllib2 audio MP3 Whistle mp3 songtitl
  • 在 Pandas 中按索引分组

    如何使用 groupby by 索引 1 2 3 它们的顺序相同 并获得属于每个索引范围的列分数的总和 基本上我有这个 index score 1 2 2 2 3 2 1 3 2 3 3 3 我想要的是 index score sum 1
  • 使用 PyDrive 将图像上传到 Google Drive

    我有一个关于 PyDrive 的愚蠢问题 我尝试使用 FastAPI 制作一个 REST API 它将使用 PyDrive 将图像上传到 Google Drive 这是我的代码 from fastapi import FastAPI Fil

随机推荐