更改 multiprocessing.Queue 中的缓冲区大小

2024-02-09

所以我有一个生产者和消费者通过无限大小的队列连接的系统,但是如果消费者重复调用 get 直到抛出 Empty 异常,它不会清除队列。

我相信这是因为一旦套接字缓冲区已满,消费者端队列中将对象序列化到套接字中的线程就会被阻塞,因此它会等待直到缓冲区有空间,但是,对于消费者来说这是可能的调用 get“太快”,因此它认为队列为空,而实际上另一侧的线程有更多数据要发送,但无法足够快地序列化它以防止套接字对消费者显示为空。

我相信如果我可以改变底层套接字上的缓冲区大小(我是基于Windows的),这个问题将会得到缓解。据我所知,我需要做的事情是这样的:

import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q

如果我执行上述操作,这是否意味着当 multirprocssing 初始化队列时,它将使用我在已导入的 multiprocessing.connections 版本中设置的新缓冲区大小?那是对的吗?

另外我相信这只会影响 Windows,因为 BUFSIZE 在 Linux 机器上不使用,因为它们的所有套接字默认设置为 60 KB?

以前有人尝试过这个吗?这会对 Windows 产生副作用吗? Windows 上套接字缓冲区大小的基本限制是什么?

==================演示代码示例==================

# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep

total_length = 10**8

def supplier(q):
    print "Starting feeder"
    for i in range(total_length) :
        q.put(i)


if __name__=="__main__":

    queue = mp.Queue()

    p = mp.Process(target=supplier, args=(queue,))

    p.start()

    sleep(120)

    returned = []
    while True :
        try :
            returned.append(queue.get(block=False))
        except Empty :
            break

    print len(returned)
    print len(returned) == total_length

    p.terminate()
    sys.exit()

该示例在 Windows 上运行时,通常只会从队列中提取大约 160,000 个项目,因为主线程清空缓冲区的速度比供应商重新填充缓冲区的速度更快,并且最终当缓冲区为空时,它会尝试从队列中提取数据,并且报告说它是空的。

理论上,您可以通过更大的缓冲区大小来改善这个问题。我相信,在 Windows 系统上,顶部的两行将增加管道的默认缓冲区大小。

如果您对它们进行评论,那么该脚本将在退出之前提取更多数据,因为它的 .我的主要问题是: 1)这真的有效吗? 2)有没有办法让这段代码在windows和linux中使用相同大小的底层缓冲区 3) 为管道设置较大的缓冲区大小是否会产生任何意外的副作用。

我知道,一般来说,没有办法知道您是否已从队列中提取所有数据(- 考虑到供应商永久运行并产生非常不均匀的数据),但我正在寻找改进方法尽力而为的基础。


Update:

Windows Pipe 的有用链接,供将来需要它的人使用(该链接由 OP 提供,菲尔_20686 https://stackoverflow.com/users/2882136/phil-20686): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

原来的:

BUFSIZE 仅在平台为 win32 时才起作用。

multiprocessing.Queue 构建在 Pipe 之上,如果更改 BUFSIZE,生成的 Queue 将使用更新后的值。见下文:

class Queue(object):

    def __init__(self, maxsize=0):
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)

当平台为win32时,Pipe代码将调用以下代码:

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    address = arbitrary_address('AF_PIPE')
    if duplex:
        openmode = win32.PIPE_ACCESS_DUPLEX
        access = win32.GENERIC_READ | win32.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        openmode = win32.PIPE_ACCESS_INBOUND
        access = win32.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    h1 = win32.CreateNamedPipe(
        address, openmode,
        win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
        win32.PIPE_WAIT,
        1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
        )

你可以看到,当duplex为 False,输出缓冲区大小为 0,输入缓冲区大小为 BUFSIZE。

inbuffer 是为输入缓冲区保留的字节数。 2**16=65536,是一次操作可以无阻塞写入的最大字节数,但是不同系统的缓冲区大小的容量不同,即使在同一个系统上也不同,所以很难说侧面当您将 Pipe 设置为最大数量时的效果。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

更改 multiprocessing.Queue 中的缓冲区大小 的相关文章

  • 从 Django 调用 Postgres SQL 存储过程

    我正在开发一个带有 Postgresql 数据库的 Django 项目 我编写了一个可以在 Postgres 上完美运行的存储过程 现在我想从 Django 1 5 调用该存储过程 我已经编写了代码 但它提示错误 CREATE FUNCTI
  • 从 SHAP 值中获取特征重要性

    我想要获得重要功能的数据框 通过下面的代码 我得到了 shap values 但我不确定这些值的含义是什么 在我的 df 中有 142 个特征和 67 个实验 但得到了一个带有 ca 的数组 2500 个值 explainer shap T
  • 如何使用 colorchecker 在 opencv 中进行颜色校准?

    我有数码相机获取的色彩检查器图像 我如何使用它来使用 opencv 校准图像 按照以下颜色检查器图像操作 您是想问如何进行颜色校准或如何使用 OpenCV 进行校准 为了进行颜色校准 您可以使用校准板的最后一行 灰色调 以下是您应该逐步进行
  • numpy python 中的“AttributeError:'matrix'对象没有属性'strftime'”错误

    我有一个维度为 72000 1 的矩阵 该矩阵涉及时间戳 我想使用 strftime 如下所示 strftime d m y 为了得到像这样的输出 11 03 02 我有这样一个矩阵 M np matrix timestamps 我使用了
  • 无故运行测试时 PyCharm 抛出“AttributeError: 'module' object has no attribute”

    因此 我有一个 Django REST Framework 项目 有一天它无法在 PyCharm 中运行测试 从命令行我可以使用它们来运行它们paver or the manage py直接地 曾经有一段时间 当我们没有在文件顶部导入类的超
  • PyTorch 给出 cuda 运行时错误

    我对我的代码做了一些小小的修改 以便它不使用 DataParallel and DistributedDataParallel 代码如下 import argparse import os import shutil import time
  • 小部件之间的自定义信号

    尝试将信号从一个 gtk EventBox 子级发送到另一个 在 init HeadMode 第 75 行 上出现错误 类型错误 未知信号名称 消息发送 why usr bin env python coding utf8 import p
  • Pandas:如何将数据框插入 Clickhouse

    我正在尝试将 Pandas 数据框插入 Clickhouse 这是我的代码 import pandas import sqlalchemy as sa uri clickhouse default localhost default ch
  • 如何在Python中按AaB而不是ABa顺序对字符串进行排序

    我正在尝试对字符串进行排序 为 punnetsquare 制作基因型 我目前的实现是 unsorted genotype ABaB sorted genotype sorted list unsorted genotype sorted s
  • Python、subprocess、call()、check_call 和 returncode 来查找命令是否存在

    我已经弄清楚如何使用 call 让我的 python 脚本运行命令 import subprocess mycommandline lumberjack sleep all night work all day subprocess cal
  • 迭代列表的奇怪速度差异

    我创建了两个重复两个不同值的长列表 在第一个列表中 值交替出现 在第二个列表中 一个值出现在另一个值之前 a1 object object 10 6 a2 a1 2 a1 1 2 然后我迭代它们 不对它们执行任何操作 for in a1 p
  • 为什么我应该使用 WSGI?

    使用 mod python 一段时间了 我读了越来越多关于 WSGI 有多好的文章 但没有真正理解为什么 那么我为什么要切换到它呢 有什么好处 这很难吗 学习曲线值得吗 为了用 Python 开发复杂的 Web 应用程序 您可能会使用更全面
  • 使用 NLP 进行地址分割

    我目前正在开发一个项目 该项目应识别地址的每个部分 例如来自 str Jack London 121 Corvallis ARAD ap 1603 973130 输出应如下所示 street name Jack London no 121
  • 从 python 检测 macOS 中的暗模式

    我正在编写一个 PyQt 应用程序 我必须添加一个补丁 以便在启用暗模式的 Macos 上可以读取字体 app QApplication Fix for the font colours on macos when running dark
  • Flask 应用程序的测试覆盖率不起作用

    您好 想在终端的 Flask 应用程序中测试 删除路由 我可以看到测试已经过去 它说 test user delete test app LayoutTestCase ok 但是当我打开封面时 它仍然是红色的 这意味着没有覆盖它 请有人向我
  • PIL - 需要抖动,但限制调色板会导致问题

    我是 Python 新手 正在尝试使用 PIL 来执行 Arduino 项目所需的解析任务 这个问题涉及到Image convert 方法以及调色板 抖动等选项 我有一些硬件能够一次仅显示 16 种颜色的图像 但它们可以指定为 RGB 三元
  • Python问题:打开和关闭文件返回语法错误

    大家好 我发现了这个有用的 python 脚本 它允许我从网站获取一些天气数据 我将创建一个文件和其中的数据集 有些东西不起作用 它返回此错误 File
  • 通过 Web 界面执行 python 单元测试

    是否可以通过 Web 界面执行单元测试 如果可以 如何执行 EDIT 现在我想要结果 对于测试 我希望它们是自动化的 可能每次我对代码进行更改时 抱歉我忘了说得更清楚 EDIT 这个答案此时已经过时了 Use Jenkins https j
  • 使用Multiprocessing和Pool时如何访问全局变量?

    我试图避免将变量冗余地传递到dataList e g 1 globalDict 2 globalDict 3 globalDict 并在全球范围内使用它们 global globalDict然而 在下面的代码中并不是这样做的解决方案 是否有
  • python 中的 after() 与 update()

    我是 python 新手 开始使用 tkinter 作为画布 到目前为止 我使用 update 来更新我的画布 但还有一个 after 方法 谁能给我解释一下这个函数 请举个例子 两者之间有什么区别 root after integer c

随机推荐

  • 如何使用命令行将 TFS 工作项链接到另一个工作项

    如何使用命令提示符将现有 TFS 工作项链接到另一个工作项 TFS 中有此命令行选项吗 我知道我可以使用 tfpt exe 创建工作项或修改它 但我找不到将工作项链接到另一个工作项的选项 假设您将使用 相关 链接类型 这应该链接您的工作项目
  • 查找连续的两行

    我正在尝试编写一个查询 该查询将从 Bill 表中拉回 Estimated 标志为 true 的最新两行 问题是这些需要是连续的账单 简而言之 如果在过去两个账单周期中估算了账单 我需要在另一个表中输入一行 如果可能的话 我想在没有游标的情
  • PHP/CSS 在字符串中查找单词,更改其颜色

    PHP CSS 在字符串中查找单词 更改其显示颜色 遇到问题 找不到解决方案 有什么建议吗 谢谢 pre span style color red span pre
  • 将变量类型的 json 转换为字符串

    我正在从 API 响应中读取 json 但遇到了一个问题 因为 json 值中有多种数据类型 字符串 null bool 此外 某些键的值可以是字符串或 null 这使得将数据读入类型变得更加困难 我想将所有内容都转换为字符串以便于处理 我
  • .net 应用程序无法连接到 DB2 数据库

    错误 08001 IBM SQL30081N 检测到通信错误 使用的通信协议 TCP IP 使用的通信 API SOCKETS 检测到错误的位置 10 66 180 30 通信功能检测到错误 连接 协议特定错误代码 10061 SQLSTA
  • 忽略 Cordova iOS 启动画面配置

    我刚刚将我的 iOS Cordova 初始屏幕重新配置为启动 Storyboard 以支持 iPhone X 这可行 但现在我在启动 Storyboard 和应用程序的第一个屏幕之间出现白色闪光 我认为这是因为在视口完全加载之前闪屏被隐藏
  • 如何在 ASP.NET 中使用 Profile?

    我尝试学习 asp net 配置文件管理 但我在下面添加了 xml 名字 姓氏等 但我不能写个人资料 如果我尝试编写 Profile 属性 卓尔我的编辑简介 错误 1 当前上下文中不存在名称 配置文件 C Documents and Set
  • 根据 GUID 获取站点 URL? (SharePoint)

    有没有代码示例向我展示如何在我知道 guid 的情况下获取站点的 url 目前 我有此代码来获取网站集中的所有网站 private void getSites SPSite oSiteCollection SPContext Current
  • 如何将“for”循环的结果保存到单个变量中? [复制]

    这个问题在这里已经有答案了 我有一个for loop for x in range 1 13 print This was the average temperature in month number str x in Boston 20
  • jQuery Chosen div 落后于 Twitter Bootstrap 手风琴

    我在 Twitter Bootstrap 手风琴中使用 jQuery Chosen 插件 我遇到的问题是所选插件的下拉菜单出现在div的手风琴菜单 我尝试设置z index到更高的值 但这并没有达到目的 我举了一个我的问题的例子 http
  • VBScript WScript.Shell Run() - 系统找不到指定的文件

    我正在尝试编写一个使用 WScript Shell 的 VBScript vbs 脚本Run 方法 但 Run 似乎找不到我传入的文件 我已将脚本简化为以下代码 该代码将重现结果 可以将其复制到文本编辑器 另存为test vbs然后跑了 T
  • scikit-learn中predict与predict_proba的区别

    假设我创建了一个模型 并且我的目标变量是0 1 or 2 看来如果我使用predict 答案是 0 1 或 2 但是如果我使用predict proba 我得到一行 每行 3 列 如下所示 例如 model Classifier It co
  • 动态 JSP 编译中导入失败

    我们有一个大型 Web 应用程序安装 使用 Apache Tomcat Jasper 和 jboss 在开发环境中 JSP 是动态编译的 不幸的是 有一个包特别是即时编译似乎无法导入 对于某些类 使用完全限定的引用而不是导入是可行的 但不适
  • Masonry JS 重叠项目

    我这里有一个非常奇怪的问题 推荐链接已删除 第一行产品项目与第二行中的项目重叠 砌体项目位于主页下方页脚上方 如您所见 Chrome 看起来有所不同 在火狐浏览器中 看起来不错 这是它在我的 chrome 中的样子 http clip2ne
  • Python属性错误:类型对象'_socketobject'没有属性'gethostbyname'

    我正在尝试在我的程序中执行此操作 dest socket gethostbyname host 我已经包括了这一行 from socket import 在文件的开头 我收到此错误 属性错误 类型对象 socketobject 没有属性 g
  • 在控制台中更改 Google Cloud Compute Engine 实例的内部静态 IP 地址

    我有一个现有的云计算引擎实例 该实例被错误地分配了错误的静态内部 IP 我无法找到使用控制台或其他方式将内部 IP 地址修改为正确值的方法 我尝试保留一个新的静态内部 IP 这很容易 但无法将其分配给现有资源 我说的是 Google Con
  • numpy.digitize 返回的值超出范围?

    我使用以下代码将数组数字化为 16 个容器 numpy digitize array bins numpy histogram array bins 16 1 我预计输出在 1 16 范围内 因为有 16 个 bin 然而 返回数组中的一个
  • 如何在Matlab中的多维数组中应用corr2函数?

    假设我有两个矩阵 A 和 B A rand 4 5 3 B rand 4 5 6 我想应用函数 corr2 来计算相关系数 corr2 A 1 B 1 corr2 A 1 B 2 corr2 A 1 B 3 corr2 A 1 B 6 co
  • Android:java.io.IOException:主机名未验证

    运行我的应用程序时出现 java io IOException 主机名未验证 我该如何解决它 java io IOException Hostname 178 61 62 140 was not verified 01 03 16 34 3
  • 更改 multiprocessing.Queue 中的缓冲区大小

    所以我有一个生产者和消费者通过无限大小的队列连接的系统 但是如果消费者重复调用 get 直到抛出 Empty 异常 它不会清除队列 我相信这是因为一旦套接字缓冲区已满 消费者端队列中将对象序列化到套接字中的线程就会被阻塞 因此它会等待直到缓