多线程检查队列中的成员资格并停止线程

2023-12-04

我想使用 2 个线程迭代列表。一个来自前导,另一个来自尾随,并将元素放在一个Queue在每次迭代中。但在将值放入之前Queue我需要检查其中的值是否存在Queue(当其中一个线程将该值放入Queue),所以当发生这种情况时,我需要停止线程并返回每个线程的遍历值列表。

这是我到目前为止所尝试过的:

from Queue import Queue
from threading import Thread, Event

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

main_path = Queue()

def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

def a(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'a'
    if is_in_queue(i,main_path):
      return l
    main_path.put(i)

def b(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'b'
    if is_in_queue(i,main_path):
      return l
    main_path.put(i)

g=['a','b','c','d','e','f','g','h','i','j','k','l']

t1 = ThreadWithReturnValue(target=a, args=(main_path,g))
t2 = ThreadWithReturnValue(target=b, args=(main_path,g[::-1]))
t2.start()
t1.start()
# Wait for all produced items to be consumed
print main_path.join()

I used ThreadWithReturnValue这将创建一个返回值的自定义线程。

为了进行成员资格检查,我使用了以下函数:

def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

现在如果我首先开始t1然后是t2我会得到12a然后一个b那么它不会做任何事情,我需要手动终止 python!

但如果我首先运行t2 then t1我会得到以下结果:

b
b
b
b
 ab

ab
b

b
b
 b
a
a

所以我的问题是为什么 python 在这种情况下表现不同?我怎样才能终止线程并使它们相互通信?


在我们遇到更大的问题之前,你没有使用Queue.join right.

该函数的全部要点是,将一堆项目添加到队列中的生产者可以等待,直到消费者完成对所有这些项目的处理。这是通过让消费者调用来实现的task_done当他们完成完成的每个项目后get。一旦有那么多task_done调用为put调用,队列完成。你不是在做一个get任何地方,更不用说task_done,所以队列不可能完成。所以,这就是为什么在两个线程完成后永远阻塞的原因。


这里的第一个问题是您的线程在实际同步之外几乎不做任何工作。如果他们唯一做的就是争夺队列,那么一次只有一个人能够运行。

当然,这在玩具问题中很常见,但你必须思考真正的问题:

  • 如果您正在执行大量 I/O 工作(侦听套接字、等待用户输入等),则线程可以很好地工作。
  • 如果您正在执行大量 CPU 工作(计算素数),则由于 GIL,线程在 Python 中无法工作,但进程可以。
  • 如果您实际上主要处理同步单独的任务,那么任何一个都不会很好地工作(并且流程会更糟)。可能仍然是simpler从线程的角度思考,但这将是最慢的做事方式。您可能想研究一下协程;格雷格·尤因有一个很棒的示范如何使用yield from使用协程来构建诸如调度程序或多参与者模拟之类的东西。

接下来,正如我在上一个问题中提到的,使线程(或进程)在共享状态下高效工作需要尽可能短地持有锁。

因此,如果您必须在锁下搜索整个队列,那么最好是恒定时间搜索,而不是线性时间搜索。这就是为什么我建议使用类似的东西OrderedSet食谱而不是list,就像 stdlib 中的那样Queue.Queue。那么这个函数:

def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

…仅阻塞队列一小部分时间 - 只需足够长的时间来查找表中的哈希值,而不是足够长的时间来比较队列中的每个元素x.


最后,我试图解释你的另一个问题的竞争条件,但让我再试一次。

您需要锁定代码中每个完整的“事务”,而不仅仅是单个操作。

例如,如果您这样做:

with queue locked:
    see if x is in the queue
if x was not in the queue:
    with queue locked:
        add x to the queue

…那么当您检查时 x 总是可能不在队列中,但在您解锁它和重新锁定它之间的时间里,有人添加了它。这正是两个线程可能提前停止的原因。

要解决这个问题,您需要对整个事情加锁:

with queue locked:
    if x is not in the queue:
        add x to the queue

当然这直接违背了我之前所说的尽可能短的锁定队列的时间。事实上,简而言之,这就是多线程变得困难的原因。编写安全代码很容易,只需在可能需要的时间内锁定所有内容,但随后您的代码最终仅使用单个核心,而所有其他线程都被阻止等待锁定。编写快速代码很容易,只需尽可能简短地锁定所有内容,但这样就不安全了,你会得到垃圾值,甚至到处崩溃。弄清楚什么需要成为事务,以及如何最大程度地减少这些事务内的工作,以及如何处理您可能需要的多个锁,以使其工作而不造成死锁……这并不那么容易。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多线程检查队列中的成员资格并停止线程 的相关文章

  • Virtualenv 在 OS X Yosemite 上失败并出现 OSError

    我最近更新到 OSX Yosemite 现在无法使用virtualenv pip 每当我执行 virtualenv env 它抛出一个 OSError Command Users administrator ux env bin pytho
  • 多处理中的动态池大小?

    有没有办法动态调整multiprocessing Pool尺寸 我正在编写一个简单的服务器进程 它会产生工作人员来处理新任务 使用multiprocessing Process对于这种情况可能更适合 因为工作人员的数量不应该是固定的 但我需
  • 如何返回 cost, grad 作为 scipy 的 fmin_cg 函数的元组

    我怎样才能使 scipy 的fmin cg使用一个返回的函数cost and gradient作为元组 问题是有f对于成本和fprime对于梯度 我可能必须执行两次操作 非常昂贵 grad and cost被计算 此外 在它们之间共享变量可
  • Series.sort() 和 Series.order() 有什么区别?

    s pd Series nr randint 0 10 5 index nr randint 0 10 5 s Output 1 3 7 6 2 0 9 7 1 6 order 按值排序并返回一个新系列 s order Output 2 0
  • 创建上下文后将 jar 文件添加到 pyspark

    我正在笔记本上使用 pyspark 并且不处理 SparkSession 的创建 我需要加载一个包含一些我想在处理 rdd 时使用的函数的 jar 您可以使用 jars 轻松完成此操作 但在我的特定情况下我无法做到这一点 有没有办法访问sp
  • 使用 Paramiko 进行 DSA 密钥转发?

    我正在使用 Paramiko 在远程服务器上执行 bash 脚本 在其中一些脚本中 存在与其他服务器的 ssh 连接 如果我只使用 bash 不使用 Python 我的 DSA 密钥将被第一个远程服务器上的 bash 脚本转发并使用 以连接
  • 使用python从gst管道抓取帧到opencv

    我在用着OpenCV http opencv org 和GStreamer0 10 我使用此管道通过自定义套接字通过 UDP 接收 MPEG ts 数据包sockfd由 python 提供并显示它xvimagesink 而且效果很好 以下命
  • 在Python上获取字典的前x个元素

    我是Python的新手 所以我尝试用Python获取字典的前50个元素 我有一本字典 它按值降序排列 k 0 l 0 for k in len dict d l 1 if l lt 51 print dict 举个小例子 dict d m
  • python 中的 <> 运算符有什么作用?

    我刚刚遇到这个here http www feedparser org feedparser py 总是这样使用 if string1 find string2 lt gt 1 pass 什么是 lt gt 运算符这样做 为什么不使用通常的
  • python中basestring和types.StringType之间的区别?

    有什么区别 isinstance foo types StringType and isinstance foo basestring 对于Python2 basestring是两者的基类str and unicode while type
  • 从 Flask 运行 NPM 构建

    我有一个 React 前端 我想在与我的 python 后端 API 相同的源上提供服务 我正在尝试使用 Flask 来实现此目的 但我遇到了 Flask 找不到我的静态文件的问题 我的前端构建是用生成的npm run build in s
  • pandas 相当于 np.where

    np where具有向量化 if else 的语义 类似于 Apache Spark 的when otherwise数据帧方法 我知道我可以使用np where on pandas Series but pandas通常定义自己的 API
  • 如何查找或安装适用于 Python 的主题 tkinter ttk

    过去 3 个月我一直在制作一个机器人 仅用代码就可以完美运行 现在我的下一个目标是为它制作一个 GUI 但是我发现了一些障碍 主要的一个是能够看起来不像一个 30 年前的程序 我使用的是 Windows 7 我仅使用 Python 3 3
  • Ubuntu systemd 自定义服务因 python 脚本而失败

    希望获得有关 Ubuntu 中的 systemd 守护进程服务的一些帮助 我写了一个 python 脚本来禁用 Dell XPS 上的触摸屏 这更像是一个问题 而不是一个有用的功能 该脚本可以工作 但我不想一直启动它 这就是为什么我想到编写
  • 检测是否从psycopg2游标获取?

    假设我执行以下命令 insert into hello username values me 我跑起来就像 cursor fetchall 我收到以下错误 psycopg2 ProgrammingError no results to fe
  • AWS Lambda 不读取环境变量

    我正在编写一个 python 脚本来查询 Qualys API 中的漏洞元数据 我在 AWS 中将其作为 lambda 函数执行 我已经在控制台中设置了环境变量 但是当我执行函数时 出现以下错误 module initialization
  • 每个客户端一个线程与线程服务器的排队线程模型之间的相对优点?

    假设我们正在构建一个线程服务器 旨在在具有四个核心的系统上运行 我能想到的两种线程管理方案是每个客户端连接一个线程和一个排队系统 正如第一个系统的名称所暗示的那样 我们将为每个连接到服务器的客户端生成一个线程 假设一个线程始终专用于程序的主
  • 将 Keras 集成到 SKLearn 管道?

    我有一个 sklearn 管道 对异构数据类型 布尔 分类 数字 文本 执行特征工程 并想尝试使用神经网络作为我的学习算法来拟合模型 我遇到了输入数据形状的一些问题 我想知道我想做的事情是否可能 或者我是否应该尝试不同的方法 我尝试了几种不
  • 如何使用 python 定位和读取 Data Matrix 代码

    我正在尝试读取微管底部的数据矩阵条形码 我试过libdmtx http libdmtx sourceforge net 它有 python 绑定 当矩阵的点是方形时工作得相当好 但当矩阵的点是圆形时工作得更糟 如下所示 另一个复杂问题是在某
  • 如何(安全)将 Python 对象发送到我的 Flask API?

    我目前正在尝试构建一个 Flask Web API 它能够在 POST 请求中接收 python 对象 我使用 Python 3 7 1 创建请求 使用 Python 2 7 运行 API 该 API 设置为在我的本地计算机上运行 我试图发

随机推荐

  • 如何使 std::istream_iterator 只读直到行尾?

    有以下代码 std vector
  • 如何保持打开的 xml 文档的样式

    我使用开放 XML Microsoft Word docx 作为文件模板来自动生成其他文档 在模板文档中 我定义了内容控件 并编写了代码来替换这些内容控件中的内容 内容被替换并生成文档 但我正在努力保持风格 在Word中 在检查内容控件的属
  • 将一个 div 放在两个垂直 div 旁边

    我试图让一个 div 在其容器中的高度为 100 其高度为 50 并在其旁边有两个 div 每个 div 的高度为 50 这是我的意思的一个例子 我还想在所有 div 之间留有边距 如上图所示 到目前为止 这是我的代码 div style
  • 使用正则表达式来匹配具有特定ID的div块[重复]

    这个问题在这里已经有答案了 我正在尝试匹配具有特定 id 的 div 块 这是我的正则表达式代码
  • 两个同一时间谷歌表格的价值差异[重复]

    这个问题在这里已经有答案了 为什么两个相同时间的值不同 因此无法使用vlookup 注意 时间取自两个不同的来源 一个是从网站生成的 CSV 文件 另一个是手动输入的 Example 时间值 上午 10 00 0 4166666666666
  • 将 PHPExcel 与 Composer 和 Symfony2.2 结合使用

    我在SO上找到了这个 如何在 Symfony 2 中正确使用 PHPExcel 这可行 但我想与作曲家一起使用它 我已经解决的第一部分 为特殊标签加载 PHPExcel 最后一个稳定版本 我不知道如何使用以下语法获取标签 repositor
  • 枚举中的数字常量 (c#)

    我正在 SharePoint Web 部件中创建此选择框 并且需要包含当前版本的下拉菜单 因此我需要使用枚举 public enum SelectVersionEnum 2010 2007 好吧 你可以看到它在哪里中断 有没有办法在枚举中使
  • 为什么 XmlDocument Validate 事件处理程序没有被命中?

    我有这个代码 Load the document XmlDocument xmlDocument new XmlDocument use the stream and have it close when it is finished us
  • C++ 可变参数模板和求值顺序

    我有以下代码 lib hxx template
  • Ado.net (2.0+) 连接池是在应用程序域之前还是每个进程

    我试图理解汇集理论 ADO NET 和 SQL Server 之间的交互要好得多 但尚未找到明确的答案 我一直假设每个进程 但我突然想到它可能是每个应用程序域 任何深入的参考文献也将不胜感激 连接池是一个复杂的野兽 因为它们是在几个不同的范
  • 如何使用 Node 中的 Promises 一次并行异步多个请求

    数组和循环 但我希望能够并行运行所有它们 因为我不想一个接一个地运行 我基本上想将所有端点调用状态代码 正文和时间存储为数组 并将它们作为结果返回 无论端点中是否存在错误 我正在使用 Bluebird 如何利用它的功能来解决这个问题 您可以
  • 每个子图的旋转轴文本

    我正在尝试绘制散点矩阵 我正在建立这个线程中给出的示例matplotlib中有制作散点图矩阵的函数吗 在这里 我只是稍微修改了代码 使轴对所有子图都可见 修改后的代码如下 import itertools import numpy as n
  • 如何将 c# 的 byte[] 转换为 java byte[]

    我有这段 C 代码 byte t 6 250 215 但在Java中是 byte t 6 6 41 如何解决这个问题呢 如何解决这个问题呢 第一个是关于java如何表示数据类型 byte 字节数据类型是 8 位有符号二进制补码整数 它的最小
  • 如何使用 redux 存储变量更新 FieldArray 元素

    我将 redux form 与 FieldArray 一起使用 默认情况下 数组中将有 1 个元素 并且它是从 JSON 填充的 我最多可以添加 3 个 FieldArray 组件中的元素 在下面的代码中 elementList 属性来自
  • Android模拟器UDP无法接收;在手机上运行良好

    您好 我正在尝试连接到网络上的盒子 它上面有一个正在运行的 UDP 服务器 使用下面的代码 我可以与盒子通信并从我的手机发送 接收 UDP 数据包 但是 我不知道如何使用 android 模拟器进行设置 我在 StackOverflow 以
  • Firebase 合并 Android 中的类似通知

    我们在 SIP 应用程序中使用 Firebase 以便在应用程序离线时向我们发送未接来电通知和聊天通知 虽然发送和接收工作正常 但我们对 Android 客户端产生了影响 5 个未接来电 obv 会生成 5 个未接来电通知 填满客户端设备上
  • SQL语句证明R(ABCD)中的A->B

    如何编写一个 SQL 语句来证明函数依赖 A B 在已知没有记录具有 NULL 值的情况下与属性 ABCD 存在关系 SELECT from R r1 R r2 where r1 A r2 A and r1 B lt gt r2 B 如果
  • 如何隐藏 Google Blogger 主页上具有特定标签的所有帖子?

    让我们考虑一下 编码 技术 日记 是博客上的一些标签 我不想在主页上显示所有 日记 标签的帖子 以及我其余的标签帖子 那么我想做什么 我希望 XML 代码能够隐藏主页中带有一些标签的帖子 我在 yahoo google yandex 等搜索
  • 使用 python 文件启动 ipython 笔记本

    我对 python ipython 不太熟悉 但有人问我是否可以使用特定的 python 文件启动 ipython 笔记本 然后它可以用于调试 然后 另一个软件会在临时文件夹中创建一个 py 文件 并使用该文件调用 ipython 笔记本
  • 多线程检查队列中的成员资格并停止线程

    我想使用 2 个线程迭代列表 一个来自前导 另一个来自尾随 并将元素放在一个Queue在每次迭代中 但在将值放入之前Queue我需要检查其中的值是否存在Queue 当其中一个线程将该值放入Queue 所以当发生这种情况时 我需要停止线程并返