这是一个使用工作进程集合的版本。每个工人得到source, target
从队列中配对,并将路径收集到列表中。当找到所有路径后,结果将放入输出队列中,并由主进程进行整理。
import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging
logger = mp.log_to_stderr(logging.DEBUG)
def worker(inqueue, output):
result = []
count = 0
for pair in iter(inqueue.get, sentinel):
source, target = pair
for path in nx.all_simple_paths(G, source = source, target = target,
cutoff = None):
result.append(path)
count += 1
if count % 10 == 0:
logger.info('{c}'.format(c = count))
output.put(result)
def test_workers():
result = []
inqueue = mp.Queue()
for source, target in IT.product(sources, targets):
inqueue.put((source, target))
procs = [mp.Process(target = worker, args = (inqueue, output))
for i in range(mp.cpu_count())]
for proc in procs:
proc.daemon = True
proc.start()
for proc in procs:
inqueue.put(sentinel)
for proc in procs:
result.extend(output.get())
for proc in procs:
proc.join()
return result
def test_single_worker():
result = []
count = 0
for source, target in IT.product(sources, targets):
for path in nx.all_simple_paths(G, source = source, target = target,
cutoff = None):
result.append(path)
count += 1
if count % 10 == 0:
logger.info('{c}'.format(c = count))
return result
sentinel = None
seed = 1
m = 1
N = 1340//m
G = nx.gnm_random_graph(N, int(1.7*N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340//m)]
targets = [random.randrange(N) for i in range(1000//m)]
output = mp.Queue()
if __name__ == '__main__':
test_workers()
# test_single_worker()
# assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))
test_workers
使用多处理,test_single_worker
使用单一进程。
Running test.py
不会引发 AssertionError,因此看起来两个函数返回相同的结果(至少对于我运行的有限测试而言)。
以下是时间结果:
% python -mtimeit -s'import test as t' 't.test_workers()'
10 loops, best of 3: 6.71 sec per loop
% python -mtimeit -s'import test as t' 't.test_single_worker()'
10 loops, best of 3: 12.2 sec per loop
因此,在这种情况下,test_workers 在 2 核系统上能够比 test_single_worker 实现 1.8 倍的加速。希望该代码也能很好地解决您的实际问题。我很想知道结果。
一些兴趣点:
- Calling
pool.apply_async
在一个短暂的函数上非常慢,
因为花费了太多的时间来传递参数并导致结果
通过队列而不是使用 CPU 进行有用的计算。
- 最好将结果收集在列表中并将完整结果放入
这
output
排队而不是将结果放入output
一处
时间。放入Queue中的每个对象都会被pickle,并且速度更快
pickle 一个大列表而不是许多小列表。
- 我认为仅通过一个过程进行打印更安全,因此打印
语句不会相互影响(导致输出混乱)。