我正在尝试根据我的要求执行并行处理,并且代码似乎可以按预期并行处理 4k-5k 元素。但是,一旦要处理的元素开始增加,代码就会处理一些列表,然后在没有抛出任何错误的情况下,程序突然停止运行。
我检查过,程序没有挂起,RAM 可用(我有 16 Gb RAM),CPU 利用率甚至不到 30%。似乎无法弄清楚发生了什么。我有 100 万个元素需要处理。
def get_items_to_download():
#iterator to fetch all items that are to be downloaded
yield download_item
def start_download_process():
multiproc_pool = multiprocessing.Pool(processes=10)
for download_item in get_items_to_download():
multiproc_pool.apply_async(start_processing, args = (download_item, ), callback = results_callback)
multiproc_pool.close()
multiproc_pool.join()
def start_processing(download_item):
try:
# Code to download item from web API
# Code to perform some processing on the data
# Code to update data into database
return True
except Exception as e:
return False
def results_callback(result):
print(result)
if __name__ == "__main__":
start_download_process()
UPDATE -
发现错误 - BrokenPipeError: [Errno 32] Broken pipeline
Trace -
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 125, in worker
put((job, i, result))
File "/usr/lib/python3.6/multiprocessing/queues.py", line 347, in put
self._writer.send_bytes(obj)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe