您可以通过以下方法来执行此操作,而无需更改您的worker
功能。需要两个步骤:
- Use the
maxtasksperchild
您可以传递给的选项multiprocessing.Pool
确保池中的工作进程在每次任务执行后重新启动。
- 将现有的工作函数包装在另一个函数中,该函数将调用
worker
在守护线程中,然后等待该线程的结果timeout
秒。使用守护线程很重要,因为进程在退出之前不会等待守护线程完成。
如果超时到期,您将退出(或中止 - 这取决于您)包装器函数,这将结束任务,并且因为您已经设置maxtasksperchild=1
,导致Pool
终止工作进程并启动一个新的进程。这将意味着执行实际工作的后台线程也会被中止,因为它是一个守护线程,并且它所在的进程被关闭。
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
def worker(x, y, z):
pass # Do whatever here
def collectMyResult(result):
print("Got result {}".format(result))
def abortable_worker(func, *args, **kwargs):
timeout = kwargs.get('timeout', None)
p = ThreadPool(1)
res = p.apply_async(func, args=args)
try:
out = res.get(timeout) # Wait timeout seconds for func to complete.
return out
except multiprocessing.TimeoutError:
print("Aborting due to timeout")
raise
finally:
p.close()
p.join()
if __name__ == "__main__":
pool = multiprocessing.Pool(maxtasksperchild=1)
featureClass = [[1000,k,1] for k in range(start,end,step)] #list of arguments
for f in featureClass:
abortable_func = partial(abortable_worker, worker, timeout=3)
pool.apply_async(abortable_func, args=f,callback=collectMyResult)
pool.close()
pool.join()
任何会引发超时的函数multiprocessing.TimeoutError
。请注意,这意味着发生超时时您的回调将不会执行。如果这不可接受,只需更改except
块的abortable_worker
返回一些东西而不是调用raise
.
另请记住,每次任务执行后重新启动工作进程都会对性能产生负面影响Pool
,由于开销增加。您应该针对您的用例进行衡量,看看是否值得为了能够中止工作而进行权衡。如果出现问题,您可能需要尝试其他方法,例如合作打断worker
如果它运行的时间太长,而不是试图从外部杀死它。 SO 有很多涉及这个主题的问题。