Python 中的线程队列挂起

2024-02-21

我正在尝试通过队列使解析器成为多线程。它似乎有效,但我的队列挂起。如果有人能告诉我如何解决这个问题,我将不胜感激,因为我很少编写多线程代码。

此代码从 Q 中读取:

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread

l = []
q = Queue.Queue()

def parse_record():
    d = {}
    while not q.empty():
        rec = q.get()
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        # ... many ops like this
        d['dport'] = rec.dport
        l.append(d) # l is global

这填补了问题:

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    numthreads = 2

    # fill queue
    for rec in flows:
        q.put(rec)
    # work on Queue    
    for i in range(numthreads):
        t = Thread(target = parse_record)
        t.daemon = True
        t.start()

    # blocking
    q.join()

    # never reached    
    data_df = pandas.DataFrame.from_records(l)
    return data_df

我只打电话parse_records()在我的主要。它永远不会终止。


The 队列.空文档 https://docs.python.org/3.6/library/queue.html#queue.Queue.empty says:

...如果empty()返回False,它不能保证对get()的后续调用不会阻塞。

至少你应该使用get_nowait或面临数据丢失的风险。但更重要的是,只有当所有排队项目都被标记为完成时,连接才会释放队列.task_done https://docs.python.org/3.6/library/queue.html#queue.Queue.task_done call:

如果 join() 当前处于阻塞状态,它将在处理完所有项目后恢复(这意味着对于已 put() 到队列中的每个项目都会收到一个 task_done() 调用)。

作为旁注,l.append(d)不是原子的,应该用锁来保护。

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread, Lock

l = []
l_lock = Lock()
q = Queue.Queue()

def parse_record():
    d = {}
    while 1:
        try:
            rec = q.getnowait()
            d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
            # ... many ops like this
            d['dport'] = rec.dport
            with l_lock():
                l.append(d) # l is global
            q.task_done()
        except Queue.Empty:
            return

通过使用标准库中的线程池,您可以大大缩短代码。

from silk import *
import json
import datetime
import pandas
import multiprocessing.pool

def parse_record(rec):
    d = {}
    d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
    # ... many ops like this
    d['dport'] = rec.dport
    return d

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    pool = multiprocessing.pool.Pool(2)
    data_df = pandas.DataFrame.from_records(pool.map(parse_record), flows)
    pool.close()
    return data_df
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 中的线程队列挂起 的相关文章

随机推荐

  • 如何从外部函数更改局部静态变量值

    include
  • JQuery .ressized,如何为alsoResize属性选择子级

    我有弹出的窗口 可以拖动和调整大小 一切都很好 除了我需要调整大小窗口来调整其中的 div 大小 这可以通过设置轻松完成alsoResize selector 但是 此窗口的每个实例都具有相同的 div 和相同的类名 如果调整一个窗口的大小
  • openOptionsMenu 不适用于全屏

    我有一个全屏模式的活动 android theme android style Theme NoTitleBar Fullscreen 我用按钮打开选项菜单 dmenu setOnClickListener new OnClickListe
  • ggsurvplot - 轴交叉于 0,0

    Survminer产生不错的情节 但有没有办法进一步改变常规的结果ggplot 命令 我尝试做的是使 y 轴从原点开始 如上所述here https stackoverflow com questions 13701347 force th
  • 使用 ObjectDB 搜索空用户数据库时出现问题

    我正在创建一个 java 应用程序 它使用 ObjectDB 来创建和维护一组数据库 我目前正在尝试实现一个数据库来存储由用户名和密码字符串组成的用户对象 在 JFrame swing 类上 我有一个用于创建新用户的按钮 单击此按钮时 我希
  • 通过周围像素的平均值去除图像中的孔

    任何人都可以帮助我用从相邻非零像素获取的值来填充这些黑洞 谢谢 做到这一点的一个好方法是解决线性热方程 http en wikipedia org wiki Heat equation 你要做的就是修复好区域像素的 温度 强度 让热量流入坏
  • 基本 Node.js 项目的“属性‘程序’不存在”

    我创建了简单的node js应用程序 源代码来自这里https azure microsoft com en us blog visual studio code and azure app service a perfect fit ht
  • 明显的 BufferBlock.Post/Receive/ReceiveAsync 竞赛/bug

    交叉发布到http social msdn microsoft com Forums en US tpldataflow thread 89b3f71d 3777 4fad 9c11 50d8dc81a4a9 http social msd
  • 如何使 Onboarding 与 iOS13 中的 Scene Delegate 配合使用?

    我正在尝试在 SceneDelegate 中设置我的入门屏幕 当我运行下面的代码时 它可以编译 但只是进入黑屏 其中有很多针对 AppDelegate 的精彩入门教程 但针对 iOS13 的新 SceneDelegate 的入门教程却很少
  • 如何查看 /bin/sh 指向的内容

    我正在阅读 bin sh 和 bin bash 之间的差异 并遇到了这个有趣的问题 答案 here https stackoverflow com questions 5725296 difference between sh and ba
  • PHP:如果!空&空

    所以我有这个表格 有2个字段 Youtube 和 链接 我想做的如果已经填写了YouTube 应该这样做 if empty youtube if pos false echo Du skal indtaste youtube et URL
  • 目前的iphone版本是否支持彩信? [复制]

    这个问题在这里已经有答案了 可能的重复 是否可以使用 iPhone SDK 发送图片消息 https stackoverflow com questions 5150271 is it possible to send a picture
  • 实体框架 ObjectContext 分享 - 优缺点

    在我的项目中 我使用实体框架 4 0 作为 ORM 将数据保存在 SQL Server 中 我的项目是应用程序的功能区 主窗体中有网格视图和导航树 其顶部有功能区面板 我的应用程序基本上是一个 CRUD UI 几乎没有业务逻辑 第一次使用
  • 位域元素的默认值

    在 C 11 中可以做 struct S int i 42 如果忘记初始化成员i它 默认初始化为 42 我刚刚尝试过 位域为 struct S int i 42 5 我正在得到 错误 预期为 在 标记之前 位域成员是否存在此功能 如果存在
  • JNI 的用处[关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 获取文本字段中最常用的 10 个单词

    我有一个包含数千个文档的索引 每个文档都有一个全文字段 我想搜索所有这些字段并获取最常出现的 10 个最常见的单词 如果可能的话 我还想要一种在 Kibana 上可视化它的方法 实现此目的的最常见方法是使用keyword datatype
  • Drupal CCK 的复选框

    我是 Drupal 的新人 到目前为止很喜欢 我正在创建 CCK 自定义内容类型 我需要以复选框格式制作便利设施列表 所以我做了 文件类型 Text 小部件类型 复选框 单选按钮 和允许值列表 onsite dining 现场用餐 Meet
  • 如何在启动时运行命令?

    我试图弄清楚如何在启动时运行命令 就像我将其输入控制台一样 我在 Raspberry Pi 上使用 Rasbian 但我认为这个问题对于 Debian 来说通常是相同的 我尝试运行的命令是 sudo screen mono server e
  • Rails 6 无法连接到 AWS Elastic Beanstalk 预置的 RDS。 Unix 域套接字“/var/run/postgresql/.s.PGSQL.5432”

    我在尝试向 Elastic Beanstalk 启动示例 Rails 6 应用程序时遇到了非常困难的情况 对于上下文 我遵循这些说明 将 RDS 添加到 Ruby 应用程序 https docs aws amazon com elastic
  • Python 中的线程队列挂起

    我正在尝试通过队列使解析器成为多线程 它似乎有效 但我的队列挂起 如果有人能告诉我如何解决这个问题 我将不胜感激 因为我很少编写多线程代码 此代码从 Q 中读取 from silk import import json import dat