所以我有一个生产者和消费者通过无限大小的队列连接的系统,但是如果消费者重复调用 get 直到抛出 Empty 异常,它不会清除队列。
我相信这是因为一旦套接字缓冲区已满,消费者端队列中将对象序列化到套接字中的线程就会被阻塞,因此它会等待直到缓冲区有空间,但是,对于消费者来说这是可能的调用 get“太快”,因此它认为队列为空,而实际上另一侧的线程有更多数据要发送,但无法足够快地序列化它以防止套接字对消费者显示为空。
我相信如果我可以改变底层套接字上的缓冲区大小(我是基于Windows的),这个问题将会得到缓解。据我所知,我需要做的事情是这样的:
import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q
如果我执行上述操作,这是否意味着当 multirprocssing 初始化队列时,它将使用我在已导入的 multiprocessing.connections 版本中设置的新缓冲区大小?那是对的吗?
另外我相信这只会影响 Windows,因为 BUFSIZE 在 Linux 机器上不使用,因为它们的所有套接字默认设置为 60 KB?
以前有人尝试过这个吗?这会对 Windows 产生副作用吗? Windows 上套接字缓冲区大小的基本限制是什么?
==================演示代码示例==================
# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep
total_length = 10**8
def supplier(q):
print "Starting feeder"
for i in range(total_length) :
q.put(i)
if __name__=="__main__":
queue = mp.Queue()
p = mp.Process(target=supplier, args=(queue,))
p.start()
sleep(120)
returned = []
while True :
try :
returned.append(queue.get(block=False))
except Empty :
break
print len(returned)
print len(returned) == total_length
p.terminate()
sys.exit()
该示例在 Windows 上运行时,通常只会从队列中提取大约 160,000 个项目,因为主线程清空缓冲区的速度比供应商重新填充缓冲区的速度更快,并且最终当缓冲区为空时,它会尝试从队列中提取数据,并且报告说它是空的。
理论上,您可以通过更大的缓冲区大小来改善这个问题。我相信,在 Windows 系统上,顶部的两行将增加管道的默认缓冲区大小。
如果您对它们进行评论,那么该脚本将在退出之前提取更多数据,因为它的 .我的主要问题是:
1)这真的有效吗?
2)有没有办法让这段代码在windows和linux中使用相同大小的底层缓冲区
3) 为管道设置较大的缓冲区大小是否会产生任何意外的副作用。
我知道,一般来说,没有办法知道您是否已从队列中提取所有数据(- 考虑到供应商永久运行并产生非常不均匀的数据),但我正在寻找改进方法尽力而为的基础。