这好像是Pool.imap_unordered()
启动一个新线程来迭代步骤 1 生成的输入序列,因此我们需要从运行步骤 3 的主线程中限制该线程。Semaphore class https://docs.python.org/2/library/threading.html#semaphore-objects旨在限制一个线程与另一个线程的连接,因此我们调用acquire()
在我们生产每一行之前,以及release()
当我们消耗每一行时。如果我们以某个任意值(例如 100)启动信号量,那么它将在阻塞并等待消费者赶上之前生成 100 行的缓冲区。
import logging
import os
import multiprocessing
from threading import Semaphore
from time import sleep
logging.basicConfig(level=logging.INFO,
format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()
def process_step1(semaphore):
data = 'a' * 100000
for i in xrange(10000):
semaphore.acquire()
sleep(.001) # Faster than step 3.
yield data
if i % 1000 == 0:
logger.info('Producing %d.', i)
logger.info('Finished producing.')
def process_step2(data):
return data.upper()
def process_step3(up_data, semaphore):
assert up_data == 'A' * 100000
sleep(.005) # Slower than step 1.
semaphore.release()
def main():
pool = multiprocessing.Pool(processes=10)
semaphore = Semaphore(100)
logger.info('Starting.')
loader = process_step1(semaphore)
processed = pool.imap_unordered(process_step2, loader)
for i, up_data in enumerate(processed):
process_step3(up_data, semaphore)
if i % 500 == 0:
logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
logger.info('Done.')
def get_memory():
""" Look up the memory usage, return in MB. """
proc_file = '/proc/{}/status'.format(os.getpid())
scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
with open(proc_file, 'rU') as f:
for line in f:
if 'VmSize:' in line:
fields = line.split()
size = int(fields[1])
scale = fields[2].upper()
return size*scales[scale]/scales['MB']
return 0.0 # Unknown
main()
现在内存使用量稳定了,因为生产者并没有领先消费者太多。
2016-12-01 15:52:13,833:6695:140124578850560:Starting.
2016-12-01 15:52:13,835:6695:140124535109376:Producing 0.
2016-12-01 15:52:13,841:6695:140124578850560:Consuming 0, using 255.0 MB.
2016-12-01 15:52:16,424:6695:140124578850560:Consuming 500, using 255.0 MB.
2016-12-01 15:52:18,498:6695:140124535109376:Producing 1000.
2016-12-01 15:52:19,015:6695:140124578850560:Consuming 1000, using 255.0 MB.
2016-12-01 15:52:21,602:6695:140124578850560:Consuming 1500, using 255.0 MB.
2016-12-01 15:52:23,675:6695:140124535109376:Producing 2000.
2016-12-01 15:52:24,192:6695:140124578850560:Consuming 2000, using 255.0 MB.
2016-12-01 15:52:26,776:6695:140124578850560:Consuming 2500, using 255.0 MB.
2016-12-01 15:52:28,846:6695:140124535109376:Producing 3000.
2016-12-01 15:52:29,362:6695:140124578850560:Consuming 3000, using 255.0 MB.
2016-12-01 15:52:31,951:6695:140124578850560:Consuming 3500, using 255.0 MB.
2016-12-01 15:52:34,022:6695:140124535109376:Producing 4000.
2016-12-01 15:52:34,538:6695:140124578850560:Consuming 4000, using 255.0 MB.
2016-12-01 15:52:37,128:6695:140124578850560:Consuming 4500, using 255.0 MB.
2016-12-01 15:52:39,193:6695:140124535109376:Producing 5000.
2016-12-01 15:52:39,704:6695:140124578850560:Consuming 5000, using 255.0 MB.
2016-12-01 15:52:42,291:6695:140124578850560:Consuming 5500, using 255.0 MB.
2016-12-01 15:52:44,361:6695:140124535109376:Producing 6000.
2016-12-01 15:52:44,878:6695:140124578850560:Consuming 6000, using 255.0 MB.
2016-12-01 15:52:47,465:6695:140124578850560:Consuming 6500, using 255.0 MB.
Update
如果您正在使用multiprocessing.Pool
,考虑升级到concurrent.futures.process.ProcessPoolExecutor
,因为它处理杀害工人 https://stackoverflow.com/q/61492362/4794更好的。它不影响这个问题描述的问题。