Python 多进程：TypeError: __new__() missing 1 required positional argument: 'path'


我目前正在尝试在 Python 3.5 中使用 joblib 库（multiprocessing 后端）并行运行任务。但是，每次运行时都会收到以下错误：

Process ForkServerPoolWorker-5:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/opt/anaconda3/lib/python3.5/site-packages/joblib/pool.py", line 362, in get
    return recv()
  File "/opt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
TypeError: __new__() missing 1 required positional argument: 'path'

这是我用来运行它的 joblib 代码:

from joblib import Parallel, delayed
tasks = (delayed(func)(item) for item in array)
results = Parallel(n_jobs=6)(tasks)



from multiprocessing import Pool
with Pool(5) as p:
    # Apply func to every item of array across 5 worker processes;
    # Pool.map returns the results in the same order as array.
    results = p.map(func, array)

编辑：下面是我使用 joblib 的一个简化示例。我的实现把 joblib 调用封装在类的方法中。下面的代码可以正常运行，但用实际数据运行同一个函数时就会出现上述错误。数据量很大时，下面的函数会不会出问题？

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

class ABTest(object):
    """Bootstrap confidence intervals for pairwise comparisons of test variants.

    The input frame is split once, in __init__, into one sub-frame per value
    of its 'test_variant' column; each comparison then resamples those
    sub-frames.
    """

    # Quantiles of the bootstrapped difference in means reported per column,
    # in output order.
    QUANTILES = [0.025, 0.05, 0.5, 0.95, 0.975]

    def __init__(self, df, test_comps, test_name):
        """
        In: df (pandas.DataFrame) - data with a 'test_variant' column plus the
                metric columns listed in self.test_cols
            test_comps (list) - pairs [variant_a, variant_b] to compare
            test_name (str) - label copied into every result row
        Creates: variant_dict, test_cols, cpu_count
        Modifies: None
        """
        self.df = df
        self.test_comps = test_comps
        self.test_name = test_name

        # Pre-split by variant so each comparison is a plain dict lookup.
        self.variant_dict = {}
        for variant in self.df['test_variant'].unique():
            self.variant_dict[variant] = self.df[self.df['test_variant'] == variant]

        self.test_cols = ['clicks', 'opens', 'buys']
        self.cpu_count = multiprocessing.cpu_count()

    def bootstrap_ci(self, arguments, n_boot=5000):
        """Find the confidence interval for the difference in means of two
        test groups using the bootstrap.

        In: self
            arguments (list or tuple) - [test_comp, cols], where test_comp is
                a pair of test variants and cols is a list of columns to
                bootstrap means for. Packed into a single argument so the
                method can be mapped with run_parallel.
            n_boot (int) - number of bootstrap resamples per column
                (default 5000)
        Out: results (list of lists) - one row per column in cols:
                [test_name, variant_a, variant_b, col, q2.5, q5, q50, q95, q97.5]
                where the q values are quantiles of (mean_b - mean_a) over
                the bootstrap resamples
        Creates: None
        Modifies: None
        """
        test_comp = arguments[0]
        cols = arguments[1]

        test_a_df = self.variant_dict[test_comp[0]]
        test_b_df = self.variant_dict[test_comp[1]]

        # Each resample has the size of its own group (sampling with
        # replacement); these do not depend on the column, so compute once.
        test_a_len = test_a_df.shape[0]
        test_b_len = test_b_df.shape[0]

        results = []

        print('Getting Confidence Intervals for Test Groups: {}, {}...'.format(test_comp[0], test_comp[1]))

        for col in cols:
            test_a_col = test_a_df[col]
            test_b_col = test_b_df[col]

            # Every resample mean must be kept: the interval is read off the
            # distribution of these means. Without storing them the Series
            # below would be empty and every quantile NaN.
            test_a_sample_mean = []
            test_b_sample_mean = []
            for _ in range(n_boot):
                test_a_sample_mean.append(test_a_col.sample(n=test_a_len, replace=True).mean())
                test_b_sample_mean.append(test_b_col.sample(n=test_b_len, replace=True).mean())

            # Confidence interval for the difference in distribution of means.
            test_diffs = pd.Series(test_b_sample_mean) - pd.Series(test_a_sample_mean)
            z = test_diffs.quantile(self.QUANTILES)

            results.append([self.test_name, test_comp[0], test_comp[1], col] + z.tolist())

        return results

    def _n_jobs(self):
        """Return the worker count: CPU count minus one, capped at 6, at least 1."""
        # The floor matters on single-CPU machines: cpu_count - 1 would be 0,
        # and joblib rejects n_jobs=0.
        return max(1, min(self.cpu_count - 1, 6))

    def run_parallel(self, func, array):
        """Run func on each item in array in parallel and return the results.

        In: self
            func (function that takes one argument) - the function to run in parallel
            array (list or array-like object) - the array to iterate over
        Out: results (list) - the results of running each item in array
                through func, in the same order as array
        Creates: None
        Modifies: None
        """
        # When joblib runs tasks in worker processes, func and each item are
        # pickled and sent to the workers. If func is a bound method (e.g.
        # self.bootstrap_ci) the whole instance -- including self.df and
        # self.variant_dict -- is pickled with it, so every object reachable
        # from self must survive a pickle round trip. NOTE(review): an
        # unpicklable object in the real data is a plausible source of
        # "__new__() missing 1 required positional argument" in workers.
        return Parallel(n_jobs=self._n_jobs())(delayed(func)(i) for i in array)

    def confidence_intervals(self):
        """Bootstrap every comparison in self.test_comps in parallel.

        In: self
        Out: results (numpy.ndarray) - the rows from bootstrap_ci for all
                comparisons, stacked into one 2-D array. Each row mixes
                strings and floats, so numpy infers a string dtype for the
                whole array.
        Creates: None
        Modifies: None
        """
        results = self.run_parallel(self.bootstrap_ci, [(x, self.test_cols) for x in self.test_comps])

        # Flatten: one list of rows per comparison -> a single list of rows.
        return np.array([row for comp_rows in results for row in comp_rows])

if __name__ == '__main__':
    columns = ['id', 'test_variant', 'clicks', 'opens', 'buys']

    # Rows 0-8 are listed explicitly.
    varied_rows = [
        [0, 'control', 10, 60, 2],
        [1, 'test_1', 5, 50, 1],
        [2, 'test_2', 11, 50, 3],
        [3, 'control', 8, 55, 1],
        [4, 'test_1', 5, 40, 0],
        [5, 'test_2', 15, 100, 5],
        [6, 'control', 2, 30, 0],
        [7, 'test_1', 1, 60, 1],
        [8, 'test_2', 11, 50, 3],
    ]
    # Rows 9-20 cycle control -> test_1 -> test_2 with one fixed row each.
    baseline = [
        ['control', 10, 60, 2],
        ['test_1', 5, 50, 1],
        ['test_2', 11, 50, 3],
    ]
    repeated_rows = [[row_id] + baseline[(row_id - 9) % len(baseline)]
                     for row_id in range(9, 21)]
    data = varied_rows + repeated_rows

    df = pd.DataFrame(data, columns=columns)
    test_comps = [['control', 'test_1'], ['control', 'test_2'], ['test_1', 'test_2']]

    ab = ABTest(df, test_comps, test_name='test')
    results = ab.confidence_intervals()



