I'm doing some file parsing, which is a CPU-bound task. No matter how many files I throw at the process, it uses no more than about 50MB of RAM.
The task is parallelisable, and I've set it up to use concurrent futures as below, parsing each file in a separate process:
from concurrent import futures

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary mapping each future (key) to its filename (value).
    jobs = {}
    # Loop through the files and submit the parse function for each one,
    # passing it the filename. The results can come back in any order.
    for this_file in files_list:
        job = executor.submit(parse_function, this_file, **parser_variables)
        jobs[job] = this_file

    # Collect the completed jobs as they finish.
    for job in futures.as_completed(jobs):
        # Fetch the result (job.result()) and the filename the job was
        # based on (jobs[job]).
        results_list = job.result()
        this_file = jobs[job]
        # Delete the entry from the dict as we don't need to store it.
        del jobs[job]
        # Post-processing (putting the results into a database).
        post_process(this_file, results_list)
The problem is that when I run this with futures, RAM usage rockets, and before long I've run out and Python has crashed. This is probably in large part because the results from parse_function are several MB in size. Once the results have been through post_process, the application no longer needs them. As you can see, I'm attempting del jobs[job] to clear items out of jobs, but this has made no difference: memory usage remains unchanged and seems to increase at the same rate.
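For what it's worth, the growth is easy to see by printing the process's resident set size inside the loop. A minimal sketch, assuming the third-party psutil package (which the program above doesn't otherwise use):

import os
import psutil

def log_rss(label):
    # Print this process's resident set size in MB.
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
    print('{}: {:.1f} MB'.format(label, rss_mb))

Calling something like log_rss('after post_process') at the bottom of the as_completed loop shows the number climbing steadily even after the del.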
I've also confirmed that it isn't because it's waiting on the post_process function, by running with only a single process plus a time.sleep(1).
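That check looked roughly like this (a sketch, with time.sleep(1) standing in for the database work and max_workers=1 forcing a single worker process):

import time

with futures.ProcessPoolExecutor(max_workers=1) as executor:
    jobs = {executor.submit(parse_function, f, **parser_variables): f
            for f in files_list}
    for job in futures.as_completed(jobs):
        results_list = job.result()
        del jobs[job]
        time.sleep(1)  # stand-in for post_process; RAM usage still climbed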
There's nothing in the futures docs about memory management, and while a brief search indicates it has come up before in real-world applications of futures (Clear memory in python loop https://stackoverflow.com/questions/31720674/clear-memory-in-python-loop and http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures), the answers don't translate to my use case: they're all concerned with timeouts and the like.
So, how can I use concurrent futures without running out of RAM?
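One idea I've wondered about, though I don't know whether it's the idiomatic fix, is throttling submission so that only a bounded number of futures (and therefore undelivered multi-MB results) exist at any moment. A rough, untested sketch of what I mean:

import itertools

files_iter = iter(files_list)
with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # Start with a small window of in-flight jobs (here 2x the worker count).
    jobs = {executor.submit(parse_function, f, **parser_variables): f
            for f in itertools.islice(files_iter, 12)}
    while jobs:
        # Wait for at least one job, process it, then top the window back up.
        done, _ = futures.wait(jobs, return_when=futures.FIRST_COMPLETED)
        for job in done:
            this_file = jobs.pop(job)
            post_process(this_file, job.result())
        for f in itertools.islice(files_iter, len(done)):
            jobs[executor.submit(parse_function, f, **parser_variables)] = f

Is something along these lines necessary, or is there a cleaner way?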
(Python 3.5)