AIORedis 和 PUB/SUB 不是 asnyc

2024-04-29

I used aioredis http://aioredis.readthedocs.org/en/latest/examples.html用于编写异步服务,该服务将侦听某个通道并以异步方式运行一些命令。

基本上我从示例页面 http://aioredis.readthedocs.org/en/latest/examples.html编写一个小型测试应用程序并删除不必要的部分:

import asyncio
import aioredis

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print('Got Message:', msg)
        i = int(msg['sleep_for'])
        print('Sleep for {}'.format(i))
        await asyncio.sleep(i)
        print('End sleep')


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

还有另一个测试应用程序,它发布带有sleep_for然后在订阅者应用程序中使用该字段来模拟内部的某些工作reader协程使用sleep陈述。

我期望“睡眠”以“并行”方式运行,但实际上它们以同步方式出现在屏幕上,只是一个接一个。

我的猜测是,一旦击中await ch.get_json(..)(或者甚至可能await ch.wait_message())行我应该能够处理下一条消息。在实践中,它像同步代码一样运行。我哪里错了?这可以使用连接池来处理,但这意味着有些东西不是异步的,并且不知道到底是什么。


我的猜测是,一旦点击await ch.get_json(..)(或者甚至await ch.wait_message())行,我应该能够处理下一条消息。

事情不是这样的async/await语法有效。每次你击中一个await在协程中,该协程将被“暂停”,将控制权交给被调用的协程。如果它正在睡眠,它不会自动处理下一条消息。

你应该做的是使用ensure_future在单独的协程中处理每条消息:

import asyncio
import aioredis

async def handle_msg(msg):
    print('Got Message:', msg)
    i = int(msg['sleep_for'])
    print('Sleep for {}'.format(i))
    await asyncio.sleep(i)
    print('End sleep')

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        asyncio.ensure_future(handle_msg(msg))

async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

AIORedis 和 PUB/SUB 不是 asnyc 的相关文章

  • 如何在后台运行python程序以保持活动窗口相同

    我编写了一个程序 可以将前景窗口更改为显示器尺寸的 85 并且要成功运行 前景窗口需要保持不变 我已将 python 脚本 pyw 放入批处理文件 运行 pythonw 并在桌面上创建了批处理文件的快捷方式 并提供了快速运行它的快捷方式 我
  • 无法在 python 中导入名称 GoogleMaps

    我使用下面的代码来获取地址的纬度和经度 from googlemaps import GoogleMaps gmaps GoogleMaps api key address Constitution Ave NW 10th St NW Wa
  • Python3 http.server:将日志保存到文件中

    我使用Python3 6编写了一个简单的HTTP服务器来重定向所有请求 我写的文件可以找到here https github com kmahyyg learn py3 blob master antiscanhttp py 我可以在 Ub
  • 如何为 apscheduler 指定“记录器”

    我正在尝试学习如何使用 Python 的 apscheduler 包 但它会定期抛出以下错误 No handlers could be found for logger apscheduler scheduler 该消息似乎与计划作业中的错
  • Twython - 如何使用媒体 url 更新状态

    在我的应用程序中 我允许用户在 Twitter 上发帖 现在我想让他们通过媒体更新他们的状态 In twython py我看到一个方法update status with media从文件系统读取图像并上传到 Twitter 我的图像不在文
  • 根据两个预先计算的直方图报告两个样本的 K-S 统计量

    Problem 在这里 我绘制了存储在文本文件中的 2 个数据集 在列表中 dataset 每个包含 218 亿个数据点 这使得数据太大而无法作为数组保存在内存中 我仍然能够将它们绘制为直方图 但我不确定如何通过2 样本KS测试 http
  • 如何使用 tweepy 仅提取主题标签中的文本?

    我想为我的情感分析项目提取主题标签 但是我得到了一个字典列表 其中包含所有主题标签及其在推文中的索引 我只想要文字 我的代码 data tweepy Cursor api search q since a i until b i items
  • 带有 UnboundLocalError 的本地和全局引用

    我不太明白为什么代码 def f print s s foo f 运行得很好但是 def f print s s bar s foo f 给我 UnboundLocalError 我知道我可以通过声明来解决这个问题s作为函数内的全局变量或简
  • 根据Python中两行之间的匹配创建一个带有[0,1]的新列

    我正在尝试将多个列表或数据帧与一个大型基础数据帧进行比较 然后对于任何匹配 我想附加一个存储 1 匹配或 0 不匹配的列 df pd DataFrame Name A B C D ID 5 6 6 7 8 9 7 list1 5 6 8 9
  • Keras ImageDataGenerator 相当于 csv 文件

    我在文件夹中排序了一堆数据 如下图所示 我需要构建一个 DataIterator 以便将数据放入神经网络模型中 当数据是图像时 我找到了很多例子来解决这个问题 使用 Keras 类图像数据生成器及其方法流自目录 但当数据是 csv 结构时则
  • 用于 OAuth 身份验证的 WSGI 中间件

    我使用构建了一个非常小的网络应用程序Flask http flask pocoo org 现在我想向网站添加非常基本的身份验证 我不需要授权 由于 Flask 不支持开箱即用的 auth auth 我想插入 WSGI 中间件来完成这项工作
  • 加载 IPython 笔记本时出错

    一旦我用 Jupyter 打开笔记本文件 它要求我转换文件 我就再也无法在标准 IPython 笔记本中打开它了 我收到以下错误 Error loading notebook Bad Request 2014 12 21 04 13 03
  • Xamarin 无法从异步获取实例

    我编写了一个通过蓝牙连接到 ESP32 的 Xamarin Forms 应用程序 现在我想从 MainPage xaml 页面的 CustomControl JoystickControl 获取值 我已经这样尝试过了 MainPage xa
  • Python Pandas DateOffset 使用另一列中的值

    我以为这会很容易 但下面的内容并不适合我想要的 只是尝试通过使用另一列中的值将天数添加到预先存在的日期时间列来计算新的日期列 我下面的 偏移 列只有 1 位数字 df new date df orig date apply lambda x
  • 如何检索 SQLAlchemy 结果集的 python 列表? [复制]

    这个问题在这里已经有答案了 我有以下查询来检索单列数据 routes query select schema stop times c route number schema stop times c stop id stop id dis
  • 将文本文件转换为 plink PED 和 MAP 格式

    我有以下数据 其中的一小部分 名为 short2 pre snp tumor txt rs987435 C G 1 1 1 0 2 rs345783 C G 0 0 1 0 0 rs955894 G T 1 1 2 2 1 rs608879
  • 从多个大型 NetCDF 文件中提取数据的快速/高效方法

    我只需要从全局网格中提取特定节点集的数据 由纬度 经度坐标 按 5000 10000 的顺序 给出 这些数据是水力参数的时间序列 例如波高 全局数据集很大 因此分为许多 NetCDF 文件 每个 NetCDF 文件大小约为 5GB 包含整个
  • 为什么 Python ggplot 返回名称“aes”未定义?

    当我使用以下命令时 p ggplot aes x DTM y TMP1 data data 我收到以下错误 NameError name aes is not defined 你可以帮帮我吗 你需要导入aes from ggplot imp
  • 从受密码保护的 Excel 文件到 pandas DataFrame

    我可以使用以下命令打开受密码保护的 Excel 文件 import sys import win32com client xlApp win32com client Dispatch Excel Application print Exce
  • 如何将 c_uint 的 ctypes 数组转换为 numpy 数组

    我有以下 ctypes 数组 data ctypes c uint 100 我想创建一个 numpy 数组np data包含来自 ctypes 数组数据的整数值 ctypes 数组显然稍后会填充值 我看到numpy中有一个ctypes接口

随机推荐