您可以使用Pebble https://pypi.python.org/pypi/Pebble为此的图书馆。
from pebble import concurrent
from concurrent.futures import TimeoutError
TIMEOUT_IN_SECONDS = 10
@concurrent.process(timeout=TIMEOUT_IN_SECONDS)
def function(foo, bar=0):
return foo + bar
future = function(1, bar=2)
try:
result = future.result() # blocks until results are ready or timeout
except TimeoutError as error:
print "Function took longer than %d seconds" % error.args[1]
result = 'timeout'
The 文档 https://pebble.readthedocs.io有更完整的例子。
如果超时,库将终止该函数,因此您无需担心 IO 或 CPU 被浪费。
EDIT:
如果您正在做作业,您仍然可以查看its https://github.com/noxdafox/pebble执行。
简短的例子:
from multiprocessing import Pipe, Process
def worker(pipe, function, args, kwargs):
try:
results = function(*args, **kwargs)
except Exception as error:
results = error
pipe.send(results)
pipe = Pipe(duplex=False)
process = Process(target=worker, args=(pipe, function, args, kwargs))
if pipe.poll(timeout=5):
process.terminate()
process.join()
results = 'timeout'
else:
results = pipe.recv()
Pebble 提供了简洁的 API,处理极端情况并使用更强大的机制。但这或多或少就是它在幕后所做的事情。