我找不到更好的方法来描述我面临的错误,但每次我尝试对循环调用实现多重处理时,似乎都会出现此错误。
我使用了 sklearn.externals.joblib 和 multiprocessing.Process 但错误相似但不同。
想要应用多重处理的原始循环,其中一次迭代在单线程/进程中执行
for dd in final_col_dates:
idx1 = final_col_dates.tolist().index(dd)
dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
data2 = dataObj.fit()
dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
data3 = dataObj.fit()
if idx1 > 0:
data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
iter += 1
days_out_vars.append(data3)
为了将上面的代码片段实现为多处理,我创建了一个方法,其中上面的代码除了for loop.
使用 Joblib,以下是我的代码片段。
Parallel(n_jobs=2)(
delayed(self.ParallelLoopTest)(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list)
for dd in final_col_dates)
变量返回列表是在方法 ParallelLoopTest 内执行的共享变量。它被声明为:
manager = Manager()
return_list = manager.list()
使用上面的代码片段,我遇到以下错误:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\dkanhar\Anaconda3\lib\site-packages\sklearn\externals\joblib\pool.py", line 359, in get
return recv()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\connection.py", line 251, in recv
return ForkingPickler.loads(buf.getbuffer())
TypeError: function takes at most 0 arguments (1 given)
我还尝试了多处理模块来执行上述代码,但仍然遇到类似的错误。以下代码用于使用多处理模块运行:
for dd in final_col_dates:
# multiprocessing.Pipe(False)
p = multiprocessing.Process(target=self.ParallelLoopTest, args=(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
而且,我面临以下错误回溯:
File "<string>", line 1, in <module>
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 106, in spawn_main
exitcode = _main(fd)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 116, in _main
self = pickle.load(from_parent)
TypeError: function takes at most 0 arguments (1 given)
Traceback (most recent call last):
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 457, in <module>
print(obj.fit())
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 39, in fit
return self.__driver__()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 52, in __driver__
final = self.process_()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 135, in process_
sch_dat = self.inline_apply_(all_dates_schd, d1, d2, a)
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 297, in inline_apply_
p.start()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe
所以,我尝试取消注释该行多处理.Pipe(False)认为这可能是因为使用了 Pipe,我禁用了它,但问题仍然存在,我面临同样的错误。
如果有任何帮助,以下是我的方法 ParallerLoopTest:
def ParallelLoopTest(self, dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars):
idx1 = final_col_dates.tolist().index(dd)
dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
data2 = dataObj.fit()
dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
data3 = dataObj.fit()
if idx1 > 0:
data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
print("Iter ", iter)
iter += 1
days_out_vars.append(data3)
我之所以说类似的错误是因为如果你查看这两个错误的 Traceback,它们之间都有类似的错误行:
类型错误:函数最多接受 0 个参数(给定 1 个)从 Pickle 加载时,我不知道为什么会发生这种情况。
另请注意,我之前已经在其他项目中成功实现了这两个模块,但从未遇到过问题,所以我不知道为什么这个问题现在开始出现,也不知道这个问题到底意味着什么。
任何帮助将不胜感激,因为三天以来我一直在浪费时间来调试它。
Thanks
在上次回答后编辑 1
回答后,我尝试了以下内容。
添加装饰器@静态方法,删除 self,并使用 DataPrep.ParallelLoopTest(args) 调用该方法。
另外,将该方法移出类 DataPrep,并通过 ParallelLoopTest(args) 简单地调用,
但在这两种情况下,错误仍然相同。
PS:我尝试在这两种情况下使用 joblib 。
因此,这两种解决方案都不起作用。
新方法定义:
def ParallelLoopTest(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars, DC, start_hour):
idx1 = final_col_dates.tolist().index(dd)
dataObj = GetPrevDataByDate(d1, a, dd, start_hour_of_day)
data2 = dataObj.fit()
dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, DC, frgt_typ_filter)
data3 = dataObj.fit()
if idx1 > 0:
data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
print("Iter ", iter)
iter += 1
days_out_vars.append(data3)
Edit 2:
我遇到了错误,因为 Python 无法腌制一些大型数据帧。我的参数/参数中有 2 个 DataFrame,其中一个大约 20MB,另外 200MB 采用 pickle 格式。但这不应该是一个问题吧?我们应该能够传递 Pandas DataFrame。如我错了请纠正我。
另外,解决方法是我在使用随机名称调用方法之前将 DataFrame 保存为 csv,传递文件名并读取 csv,但这是一个缓慢的过程,因为它涉及推理巨大的 csv 文件。有什么建议么?