跨多个进程使用双端队列对象

2024-02-18

我试图减少读取大约 100,000 个条目的数据库的处理时间,但我需要它们以特定的方式格式化,为了尝试做到这一点,我尝试使用 pythonmultiprocessing.map函数工作完美,只是我似乎无法获得任何形式的队列引用来跨它们工作。

我一直在使用来自在 python 中填充队列并管理多处理 https://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python指导我跨多个进程使用队列,以及在线程中使用全局变量 https://stackoverflow.com/questions/19790570/using-a-global-variable-with-a-thread指导我跨线程使用全局变量。我已经让软件可以工作了,但是当我在运行该过程后检查列表/队列/字典/映射长度时,它总是返回零

我写了一个简单的例子来说明我的意思: 您必须将脚本作为文件运行,map's initialize函数在解释器中不起作用。

from multiprocessing import Pool
from collections import deque

global_q = deque()

def my_init(q):
    global global_q
    global_q = q
    q.append("Hello world")


def map_fn(i):
    global global_q
    global_q.append(i)


if __name__ == "__main__":
    with Pool(3, my_init, (global_q,)) as pool:
        pool.map(map_fn, range(3))
    for p in range(len(global_q)):
        print(global_q.pop())

理论上,当我使用以下方法将队列对象引用从主线程传递到工作线程时pool函数,然后使用给定函数初始化该线程的全局变量,然后当我从map函数之后,该对象引用应该仍然指向原始队列对象引用(长话短说,所有东西都应该在同一个队列中结束,因为它们都指向内存中的相同位置)。

所以,我期望:

Hello World
Hello World
Hello World
1
2
3

当然,1, 2, 3的顺序是任意的,但是您将在输出中看到的是''.

为什么当我将对象引用传递给pool函数,什么也没发生?


这是一个如何通过扩展在进程之间共享某些内容的示例multiprocessing.managers.BaseManager支持类deques.

有一个定制经理人 https://docs.python.org/3/library/multiprocessing.html#customized-managers文档中有关创建它们的部分。

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


process_shared_deque = None  # Global only within each process.

def my_init(q):
    """ Initialize module-level global. """
    global process_shared_deque
    process_shared_deque = q
    q.append("Hello world")


def map_fn(i):
    process_shared_deque.append(i)  # deque's don't have a "put()" method.


if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    shared_deque = manager.DequeProxy()

    with Pool(3, my_init, (shared_deque,)) as pool:
        pool.map(map_fn, range(3))

    for p in range(len(shared_deque)):  # Show left-to-right contents.
        print(shared_deque.popleft())

Output:

Hello world
0
1
2
Hello world
Hello world
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

跨多个进程使用双端队列对象 的相关文章

  • Firefox 中的文件下载对话框

    我正在使用firefox进行selenium python编程 自动开始下载并保存文件 我已经完成了所有操作 但无法下载csv文件 我的python版本是2 6 6 我的selenium版本是最新版本 我也尝试使用以下链接 即 fp web
  • c++11 正则表达式比 python 慢

    嗨我想了解为什么以下代码使用正则表达式进行分割字符串分割 include
  • 如何在anaconda python 3.6上安装tensorflow

    我使用 anaconda 包安装了新版本的 python 3 6 但是我无法安装张量流 总是收到这样的错误 tensorflow gpu 1 0 0rc2 cp35 cp35m win amd64 whl 在此平台上不受支持 如何在 ana
  • Native TF 与 Keras TF 性能比较

    我使用本机和后端张量流创建了完全相同的网络 但在使用多个不同参数进行了多个小时的测试后 仍然无法弄清楚为什么 keras 优于本机张量流并产生更好 稍微但更好 的结果 Keras 是否实现了不同的权重初始化方法 或者执行除 tf train
  • 熊猫加入具有不同索引级别/日期时间的数据帧?

    嗨 我有两个 DataFrame 如下所示 dineType menuName unique columns date y m d
  • Python 正则表达式从文本中提取域

    我有以下正则表达式 r a zA Z0 9 a zA Z0 9 61 a zA Z0 9 a zA Z 2 6 当我将其应用于文本字符串时 比方说 这是 www website1 com 这是 website2 com 我得到 www we
  • 绘制对数轴

    我想使用 matplotlib 绘制一张带有一个对数轴的图 我一直在阅读文档 但无法弄清楚语法 我知道这可能很简单 scale linear 在情节争论中 但我似乎无法正确理解 示例程序 import pylab import matplo
  • 为什么 pandas.DataFrame.update 会更改更新后的数据帧的数据类型?

    出于显而易见的原因 我想在更新后将列的数据类型保留为 int 有什么想法为什么这不能按预期工作吗 import pandas as pd df1 pd DataFrame a 1 b 2 c foo a 3 b 4 c baz df2 pd
  • 如何使用格式保存 Tkinter 文本小部件的内容

    我在 python 中使用 Tkinter 在文本窗口中显示输出 我发现使用 get 功能我可以从此窗口检索文本内容 但我有用不同背景颜色标记的文本部分 是否可以将内容与这些颜色一起复制到文件 例如 html 或 doc 中 没有对你想要的
  • 在Python中整齐地绘制PMF

    有没有一个库可以帮助我在 python 中整齐地绘制样本的概率质量函数 如下所示 通过matplotlib pyplot的stem模块 matplotlib pyplot stem args kwargs from matplotlib p
  • R.scale() 和 sklearn.preprocessing.scale() 之间的区别

    我目前正在将数据分析从 R 转移到 Python 当在 R 中缩放数据集时 我将使用 R scale 根据我的理解 它将执行以下操作 x mean x sd x 为了替换该函数 我尝试使用 sklearn preprocessing sca
  • 使用 python boto3 管理 Route53 中具有多个 IP 的 A 记录

    我的route53中有一条A记录 后面有多个IP 例子 A record dummy xyz com 点IPs 1 1 1 1 2 2 2 2 和 3 3 3 3路由策略 Simple 我使用下面的代码来更新单个 IP 的记录 Change
  • LogRecord 没有预期的字段

    在使用 logging 模块的Python中 文档承诺LogRecord实例将具有许多属性 这些属性在文档中明确列出 然而 情况似乎并不总是如此 当我不使用日志记录模块的 basicConfig 方法时 下面的程序显示属性 asctime
  • __author__ 的起源是什么?

    使用私有元数据变量的约定在哪里 author 一个模块内部从何而来 This http mail python org pipermail python dev 2001 March 013328 htmlPython 邮件列表线程似乎暗示
  • 在python中检测按下了哪些键

    我需要知道现在按下的是哪个键 我不想捕获一些特定的按键来触发事件或类似的事情 我想知道现在按下了哪些键并显示它们的列表 我还需要捕获特殊键 如 F1 F12 shift alt home windows 等 基本上是键盘上的所有键 我如何在
  • 如果任何单元测试失败,如何使 Python 的覆盖率工具失败?

    我想使用 shell 脚本来确保我的单元测试通过and我的代码有足够的测试覆盖率 我只想运行我的测试代码once 我希望我可以通过coverage https coverage readthedocs io 工具和单次运行的工具 如果一项或
  • Python:Factory Boy 生成对象创建时指定长度的列表

    我正在尝试使用 Factoryboy 在创建时指定长度的对象中创建一个列表 我可以创建列表 但由于提供的长度 大小的惰性性质 每次尝试创建具有指定长度的列表都会导致问题 这是我到目前为止所拥有的 class FooFactory facto
  • Django - 渲染到字符串无法加载 CSS

    我正在尝试使用 Django 1 8 render to string 通过管理命令将 html 转换为 pdf 而不是使用 View request 以下代码可以将模板转换为 pdf 但它无法将 CSS 加载到模板中 def html t
  • Python struct.pack() 'struct.error: bad char in struct format' 尝试保存字节顺序时

    我正在尝试打包一个字符串和字符串的长度 fmt
  • 如何通过解析导入来组合并获取单个 Python 文件

    我正在尝试获取单个 Python 文件作为输出 我有一个 Python 脚本 其中有多个此类导入 from that import sub 导入来自所有本地模块 而不是来自系统或 Python 库 有什么方法可以解决这些问题并获得一个完整的

随机推荐