我想知道多重处理是如何正确完成的。假设我有一个清单[1,2,3,4,5]
由函数生成f1
这是写到Queue
(左绿色圆圈)。现在我启动两个从该队列中提取的进程(通过执行f2
在过程中)。他们处理数据,例如:将值加倍,并将其写入第二个队列。现在,函数f3
读取该数据并将其打印出来。
在函数内部有一种循环,试图永远从队列中读取。我该如何停止这个过程?
Idea 1
f1
不仅发送列表,还发送None
对象或自定义对象,class PipelineTerminator: pass
或者一些这样的东西正在一路向下传播。f3
现在等待None
来了,当它在那里时,它就脱离了循环。问题:两者之一可能f2
s 读取并传播None
而另一个仍在处理数字。然后最后一个值就丢失了。
Idea 2
f3
is f1
。所以函数f1
生成数据和管道,生成进程f2
并提供所有数据。在产卵和进食后,它会监听第二个管道,简单地计算和处理接收到的对象。因为它知道输入了多少数据,所以它可以终止正在执行的进程f2
。但如果目标是建立处理管道,则不同的步骤应该是可分离的。所以f1
, f2
and f3
是管道的不同元素,并且昂贵的步骤是并行完成的。
Idea 3
管道的每个部分都是一个函数,该函数根据需要生成进程并负责管理它们。它知道有多少数据传入以及有多少数据已返回(使用yield
或许)。所以传播是安全的None
object.
setup child processes
execute thread one and two and wait until both finished
thread 1:
while True:
pull from input queue
if None: break and set finished_flag
else: push to queue1 and increment counter1
thread 2:
while True:
pull from queue2
increment counter2
yield result
if counter1 == counter2 and finished_flag: break
when both threads finished: kill process pool and return.
(除了使用线程之外,也许人们可以想到一种更聪明的解决方案。)
So ...
我已经按照想法 2 实现了一个解决方案,输入并等待结果到达,但它并不是真正的将独立功能插入在一起的管道。它可以完成我必须管理的任务,但很难维护。
我现在想听听您如何实现管道(在一个进程中使用生成器函数等很容易,但在多个进程中?)并通常管理它们。