同步异步队列

2023-12-02

我计划有一个基于异步队列的生产者-消费者实现来处理实时数据,其中以正确的时间顺序发送数据至关重要。这是它的代码片段:

async def produce(Q, n_jobs):
    for i in range(n_jobs):
        
        print(f"Producing :{i}")
        await Q.put(i)


async def consume(Q):
    while True:
        n = await Q.get()
        
        print(f"Consumed :{n}")
       
       x = do_sometask_and_return_the_result(n)
       print(f"Finished :{n} and Result: {x}")


async def main(loop):
    Q = asyncio.Queue(loop=loop, maxsize=3)
    await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
    print("Done")

这里生产者生产数据并将其放入异步队列中。我有多个消费者来消费和处理数据。在查看输出时,在打印“Consumed :{n}”时保持顺序(如 1,2,3,4... 等),这完全没问题。但是,由于函数 do_sometask_and_return_the_result(n) 需要可变的时间来返回结果,因此在 n "Finished :{n}" 的下一个打印中不会保持顺序(如 2,1,4,3,5,.. .)。

由于我需要维护打印结果的顺序,有什么方法可以同步这些数据吗?即使在 do_sometask_and_return_the_result(n) 之后,我也想看到 'n' 的 1,2,3,4,.. 顺序打印。


您可以使用优先级队列系统(使用 pythonheapq库)以在作业完成后重新排序。像这样的东西。

# add these variables at class/global scope
priority_queue = []
current_job_id = 1
job_id_dict = {}

async def produce(Q, n_jobs):
    # same as above

async def consume(Q):
    while True:
        n = await Q.get()
        
        print(f"Consumed :{n}")
       
       x = do_sometask_and_return_the_result(n)
       await process_result(n, x)


async def process_result(n, x):
    heappush(priority_queue, n)
    job_id_dict[n] = x
    while current_job_id == priority_queue[0]:
        job_id = heappop(priority_queue)
        print(f"Finished :{job_id} and Result: {job_id_dict[job_id]}")
        current_job_id += 1
     


async def main(loop):
    Q = asyncio.Queue(loop=loop, maxsize=3)
    await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
    print("Done")

欲了解更多信息heapq模块:https://docs.python.org/3/library/heapq.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

同步异步队列 的相关文章

随机推荐

  • Angular JIT 编译失败:“@angular/compiler”未加载

    Angular JIT 编译失败 angular compiler 未加载 对于生产用例 不鼓励使用 JIT 编译 请考虑使用 AOT 模式 您是否使用 angular platform b rowser dynamic 或 angular
  • Scrapy 与 Selenium 爬行但不刮取

    我已阅读有关使用 scrapy 进行 AJAX 页面的所有线程 并安装了 selenium webdrive 来简化任务 我的蜘蛛可以部分爬行 但无法将任何数据获取到我的项目中 我的目标是 爬取自这一页 to 这一页 抓取每个项目 帖子 的
  • Python:导入同名的不同模块

    我正在开发一个项目 需要有多个同名的模块 这是该架构的代表性摘录 其中 init py文件来显示哪些文件夹是模块 path1 ProjectA init py src init py ctrl init py somectrl py pat
  • 无法从其他 Docker 容器连接到 MongoDB 容器

    我有两个通过 Docker compose 连接在一起的容器 我可以使用主机名从包含我的 node js 应用程序的容器成功 ping 通包含 MongoDB 的容器 然而 当实际连接到数据库时 node js 告诉我连接被拒绝 如果有帮助
  • 如何动态构建 Redux 表单?

    我正在学习如何动态构建 redux form 这个想法是 名为 componentDidMount 的组件从服务器获取项目列表并将它们插入商店 store my items 中 item1 itemB itemZZZ etc 现在这些商品已
  • ldconfig 只链接以 lib* 开头的文件?

    我正在努力让 MVTec Halcon 11 在 Ubuntu 上运行 一切都在正确的位置 但程序看不到图像采集所需的动态库 相机单独工作正常 驱动程序已安装 我将库的路径添加到 etc ld so conf然后跑了ldconfig v但目
  • 用于动态创建视频缩略图的 ffmpeg 替代方案 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 目前不接受答案 托管我的网站的服务器没有 ffmpeg 并且不允许我安装任何其他扩展 还有其他方法可以动态制作视频缩略图吗 也许是某种网络服务 我在其中传递视频文件
  • 在 bash 中无需按 ctrl+r 即可触发反向搜索的替代方法

    The reverse i search facility in bash is useful but it is unlike most other bash commands in that it seems to be bound t
  • 在 WinRT 和 C# 中使用 HttpRequestHeaders

    我正在使用 HttpWebRequests 联系 API 需要添加标头 但编译器告诉我该方法不存在 然而 当我查看 MSDN 时 它告诉我该方法已经存在 设置我的 UserAgent 属性也失败 有人可以帮我吗 try HttpWebReq
  • 如何使用 SSIS 将大型平面文件加载到数据库表中?

    我不确定它是如何工作的 所以我正在寻找正确的解决方案 我认为 SSIS 是正确的方法 但我以前从未使用过它 设想 每天早上 我都会收到一个包含 800K 记录的制表符分隔文件 我需要将其加载到我的数据库中 从 ftp 或本地获取文件 Fir
  • 在浏览器中 Flash 至全屏

    如何使我的 Flash 应用程序在浏览器中处于全屏模式 我知道舞台可以置于该模式 但是当我在任何浏览器中运行应用程序时 这不起作用 那么 这是可以做到的 但是如何做到呢 在包含 Flash SWF 的 HTML 中 将以下参数添加到 标记中
  • 我应该如何使用 Django 中的电子签名 Web 应用程序的邀请和收件人来构建数据库实体?

    我有兴趣为我的电子签名 Web 应用程序实现以下要求 用户可以创建新的签约合同 该合同可以包括多个用户来签署 合约创建者需要提供emails的收件人 每个收件人都会分配额外的数据 例如签名详细信息 说明等 但是 受邀请的用户仍然可以系统中不
  • 为 XNA 游戏实现简单的基于 XML 的脚本语言

    我正在与一个团队合作使用 C 和 XNA 开发 RPG 引擎 我们计划针对 Windows 和 Windows Phone 7 但在过场动画期间遇到 AI 交互和控制玩家操作的问题 大多数情况下 所有内容都是使用 MVC 设计模式提取的 但
  • 在公共块中拥有全局变量是未定义的行为吗?

    0 c int i 5 int main return i 1 c int i 上面编译得很好gcc 0 c 1 c没有任何链接错误multiple definitions 原因是i生成为common blocks fcommon whic
  • ASP.NET MVC 中使用 SSL 操作过滤器重定向循环

    我正在使用 ActionFilter 见下文 来检测 1 当前控制器 操作是否需要 SSL 以及 2 当前正在使用 SSL 并相应地重定向 这在本地工作正常 在 IIS 7 中使用虚拟证书 但是一旦我在服务器上启动它 我就会收到一条错误 指
  • 在 Android 中发送 HTML 格式的电子邮件

    我已经成功创建了一个 Android 应用程序 它可以计算价格 然后能够以预先格式化的方式将该数据传输到用户选择的电子邮件程序 根据用户在应用程序中创建的数据 包含 HTML 的字符串将被读入意图 我为此的代码是 final Intent
  • 模拟使用外部类的方法,mockito

    我对mockito很陌生 只是想了解它是如何工作的 我有一个方法想要测试 该方法实例化多个类以使用其方法 e g methodToTest class1 c1 new class1 class2 c2 new class2 class3 c
  • TypeScript 条件返回值类型?

    function f x boolean string return x f true boolean string 为什么打字稿不能理解返回值是布尔值 function f x boolean string return typeof x
  • 如何将多维数组传递到 CodeIgniter 中的视图

    这真是让我抓狂了 我将多维数组传递给这样的视图 res this gt deliciouslib gt getRecentPosts 正如你所看到的 这是我正在使用的美味 API result 是一个数组 print r result 给出
  • 同步异步队列

    我计划有一个基于异步队列的生产者 消费者实现来处理实时数据 其中以正确的时间顺序发送数据至关重要 这是它的代码片段 async def produce Q n jobs for i in range n jobs print f Produ