如何使用 Django Channels 进行多线程 AsyncConsumer

2024-03-23

我已经使用 Django Channels 一周了,有些事情让我烦恼runworker并行性。

例如,我有一个 MQTT 客户端,它在收到消息时在通道中发布,基本。

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

这个送的很好啊我想发送多少就发送多少,它会发送到redis队列中。前往频道mqtt.

然后我运行工作程序,它将重定向队列中的消息mqtt with :

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']

这就是问题开始的地方。以下是 AsyncConsumer 读取数据的内容:

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

我睡觉是为了模拟任务的业务。这就是我要去的地方:异步消费者不是多线程的!当我向通道发送两条消息时,消费者需要 10 秒来处理第二条消息,而不是多线程时的 5 秒。如下所示。

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

有关该主题的任何信息都会有很大帮助,提前致谢!

EDIT:我发现管理它的唯一方法是创建一个执行器,其中包含执行异步操作的工作人员。但我不确定其部署效率

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)

目前这是设计使然

是的,这就是预期的设计,因为它是最安全的方式(如果您不知道的话,它可以防止竞争条件)。如果您愿意并行运行消息,只需在需要时分离您自己的协程(使用asyncio.create_task),确保清理它们并等待它们关闭。这是相当大的开销,因此希望我们将来能够为消费者提供选择加入模式,但目前我们提供的只是安全选项。

https://github.com/django/channels/issues/1203 https://github.com/django/channels/issues/1203

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Django Channels 进行多线程 AsyncConsumer 的相关文章

  • 无法在 virtualenv 中安装 libxml2

    我有一个问题libxml2蟒蛇模块 我正在尝试将其安装在python3 虚拟环境使用以下命令 pip install libxml2 python3 但它显示以下错误 Collecting libxml2 python3 Using cac
  • 如何在多线程C++ 17程序中交换两个指针?

    我有两个指针 pA 和 pB 它们指向两个大的哈希映射对象 当pB指向的哈希图完全更新后 我想交换pB和pA 在C 17中 如何快速且线程安全地交换它们 原子 我是 c 17 的新手 2个指针的原子无等待交换可以通过以下方式实现 inclu
  • 在linux上安装python ssl模块,无需重新编译

    是否可以在已经安装了 OpenSSL 的 Linux 机器上安装 python 的 SSL 模块 而无需重新编译 python 我希望它就像复制几个文件并将它们包含在库路径中一样简单 Python版本是2 4 3 谢谢 是否可以在已经安装了
  • 为什么 re.findall 在查找字符串中的三元组项时不具体。 Python

    所以我有四行代码 seq ATGGAAGTTGGATGAAAGTGGAGGTAAAGAGAAGACGTTTGA OR 0 re findall r ATG 9 TAA TAG TGA seq 首先让我解释一下我正在尝试做什么 如果这令人困惑
  • 在 Linux 上使用多处理时,TKinter 窗口不会出现

    我想生成另一个进程来异步显示错误消息 同时应用程序的其余部分继续 我正在使用multiprocessingPython 2 6 中的模块来创建进程 我试图用以下命令显示窗口TKinter 这段代码在Windows上运行良好 但在Linux上
  • 类型错误:此 COM 对象无法自动执行 makepy 过程 - 请为此对象手动运行 makepy

    这是什么错误 回溯错误 C Users DELL PycharmProjects MyNew venv Scripts python exe C Users DELL PycharmProjects MyNew agaaaaain py T
  • Python-验证我的文档 xls 中是否存在工作表

    我正在尝试在空闲时间设计一个小程序 加载 xls 文件 然后在要扫描的文档中选择一张纸 步骤1 用户导入 xls文件 导入程序后检查文件是否存在 我能做到的 第 2 步 我要求用户提供要分析的文档表 xls 的名称 这就是它停止的地方 该程
  • 使用 if 语句的网格网格和用户定义函数的真值不明确

    假设我有一个函数f x y 足够光滑 然而 有些值仅在有限的意义上存在 以sin x x的价值x 0只存在于极限 x gt 0 中 在一般情况下 我用一个来处理这个问题if陈述 如果我在情节中使用它meshgrid我收到一条错误消息 Val
  • Python 属性和 Swig

    我正在尝试使用 swig 为一些 C 代码创建 python 绑定 我似乎遇到了一个问题 试图从我拥有的一些访问器函数创建 python 属性 方法如下 class Player public void entity Entity enti
  • Seaborn 中没有线性拟合的散点图

    我想知道是否有办法关闭seaborn中的线性拟合lmplot或者是否有一个等效函数可以生成散点图 当然 我也可以使用 matplotlib 但是 我发现 seaborn 中的语法和美学非常吸引人 例如 我想绘制以下情节 import sea
  • Django 独特的不工作

    我在从查询中过滤掉重复项时遇到问题 我正在使用 Django 1 4 和 Postgres 8 4 13 我在我的模型对象上使用这个查询 它是一个 jquery 自动完成 term request GET get term field re
  • 如何按 pandas 中的值对系列进行分组?

    我现在有一只熊猫Series与数据类型Timestamp 我想按日期对其进行分组 并且每组中有许多行具有不同的时间 看似显而易见的方法类似于 grouped s groupby lambda x x date 然而 熊猫的groupby按索
  • 导入 pandas 显示 ImportError: 无法导入名称哈希表

    我已经在 python 3 3 上安装了 pandas 代码如下 import csv import pandas from pandas import DataFrame csvdata pandas read csv datafile
  • 无法让gunicorn使用Python 3

    我有 Ubuntu NGINX Gunicorn 以及可与 Python 3 设置配合使用的虚拟环境 但我的 Flask 应用程序仍然以 2 7 6 运行 我已系统地按照说明进行操作 但找不到解决方案 Gunicorn 配置文件 progr
  • Python 读取未格式化的直接访问 Fortran 90 给出不正确的输出

    这是数据的写入方式 它是一个二维浮点矩阵 我不确定大小 open unit 51 file rmsd nn output form unformatted access direct status replace recl Npoints
  • 两种 ODE 求解器之间的差异

    我想知道 两者之间有什么区别ODEINT and solve ivp用于求解微分方程 它们之间有什么优点和缺点 f1 solve ivp f 0 1 y0 y0 is the initial point f2 odeint f y0 0 1
  • 为boost python编译的.so找不到模块

    我正在尝试将 C 代码包装到 python 中 只需一个类即可导出两个函数 我编译为map so 当我尝试时import map得到像噪音一样的错误 Traceback most recent call last File
  • 如何同时接受int和float类型的输入?

    我正在制作一个货币转换器 如何让 python 同时接受整数和浮点数 我就是这样做的 def aud brl amount From to ER 0 42108 if amount int if From strip aud and to
  • 基于值的 matplotlib 条形图颜色

    有没有一种方法可以根据条形图的值对条形图的条形进行着色 例如 values below 0 5 red values between 0 5 to 0 green values between 0 to 08 blue etc 我找到了一些
  • Biopython 可以执行 Seq.find() 来解释歧义代码吗

    我希望能够在 Seq 对象中搜索考虑歧义代码的子序列 Seq 对象 例如 以下内容应该是正确的 from Bio Seq import Seq from Bio Alphabet IUPAC import IUPACAmbiguousDNA

随机推荐

  • 查找背包中的物品

    我想用C 递归地解决背包问题 这是我的代码 public int f int n int remain if n lt 0 return 0 if w n gt remain Thread VolatileWrite ref check n
  • as3crypto 的 sha1 哈希值与 PHP 生成的哈希值不同

    使用 as3 中的 as3crypto 从字符串 12345 生成 SHA1 哈希 与示例中的操作方式相同 var sha1 SHA1 new SHA1 var src ByteArray Hex toArray 12345 var dig
  • SCons 库和子库

    我有一个基于 SCons 的分层构建系统 我有一个根 SConstruct 它调用一个构建共享库的 SConscript 然后调用另一个构建依赖于共享库的可执行文件的 SConscript 所以这是我的问题 我对linux上共享库的理解是
  • 与目标虚拟机断开连接,地址:“127.0.0.1:62535”,传输:intellij idea CE 上的“socket”。我无法调试我的程序。有什么建议么?

    连接到目标VM 地址 127 0 0 1 63073 传输 socket 与目标虚拟机断开连接 地址 127 0 0 1 63073 传输 socket 我有同样的问题 我注意到应用程序上没有设置下拉菜单 看一下这个
  • C指针指针和段错误

    下面是我用 C 编写的简单链表 我的问题是在 headRef newNode 中这会导致分段错误 然后我尝试 headRef newNode 这解决了seg错误问题 尽管这两行代码在我看来似乎以相同的方式工作 但为什么一行代码会导致段错误
  • Android 以编程方式接听电话

    是否可以以编程方式在android中接听电话 我发现有些地方不可能 但随后安装了应用程序https play google com store apps details id com a0softus autoanswer https pl
  • JqueryUI可排序滚动问题

    我已经初始化了 items作为jquery可排序 使项目列表可排序 items是父 div 的子 div content 父级div content最大高度设置为 40 并且允许在溢出时滚动 这是CSS content parent div
  • Clojure ^floats 与 #^floats?

    有什么区别 floats and floats在 Clojure 类型注释中 在拉取请求中 有人建议我使用此注释 put floats init fft array 这确实有效 但我不知道为什么会有 我发现这也有效 put floats i
  • ScrollView 根本不滚动

    我无法使 ScrollView 正确滚动 它总是切断底部的内容 就好像它是一个普通的 LinearLayout 一样 My code
  • 我会被这个代码攻击吗?

    我购买了一个脚本 其中有一些奇怪的代码 我是一名 PHP 初学者 但对清理输入数据之类的事情略知一二 这是代码
  • 如何判断代码是否将在 Blazor 的客户端或服务器上运行?

    我是 Blazor 开发的新手 我可能在这里遗漏了一些明显的东西 但是搜索 google 阅读文档和搜索此网站并没有为我找到答案 出于安全原因 我需要确保一些代码在服务器端运行 例如散列密码 我知道 Blazor 通常会自动确定代码的运行位
  • Asp.net Viewstate不保存控件的样式

    我读过 asp net 中的视图状态存储以下值控制属性跨回发 假设我有一个带有文本框的页面
  • JQgrid 从列中保存和恢复对象

    可以将复杂的对象保存到列中并在之后恢复它 这是一个例子 杰森 datamain mydata address data1 15 data2 0 0 data3 1000 Jqgrid jQuery rowed5 jqGrid datatyp
  • 如何在检索 ListView 项目时显示“正在加载...”文本

    还有一些其他应用程序可以执行此操作 例如 Twitter Facebook 甚至是 Android Market 等本机应用程序 当您想要显示从互联网检索到的项目列表时 这看起来像是向用户显示有关正在进行的操作的一些通知的标准方法 这是一个
  • Apache 上不带尾部斜杠的虚荣 URL

    下面的代码重写了我们网站上 profiles 目录中的所有 URLexample com profiles name to example com name 但我们还想删除结尾的斜杠 以进一步简化生成的 URL 使其更漂亮example c
  • 1 个 django 应用程序中约有 20 个模型

    我已经开始为自己开发一个通过浏览器运行的本地应用程序 最近读完 django 教程后 我认为使用 django 而不是简单的 python 可能会更好 有一个问题 我至少有 20 个模型 每个模型都有很多功能 很简单 它将创建一个巨大的模型
  • Android 应用程序中带有 KeyStore.getInstance 的 NoSuchAlgorithmException

    我在android中编写程序用于与服务器通信 我使用SSL协议 当我编写这段代码时 KeyStore ks KeyStore getInstance JKS 我收到这个错误 java security NoSuchAlgorithmExce
  • Mockito:如何在 Spy 中模拟对象

    该应用程序运行在JEE环境中 我希望将 Spy 注入到被测试的 bean 中 Spy 对象内部还有一些需要注入的 bean 如何将这些 bean 的模拟注入到 Spy 中 这是用例 package testinject2 import ja
  • 如何使用 MATLAB 创建 k 阶矩阵?

    我希望创建一个排名矩阵k 矩阵的维数是m x n 输入k满足这个条件k lt min m n 目前还不太清楚您的目标是什么 但为了创建一个矩阵B具有特定等级k 从矩阵A with rank至少k 您可能想利用svd并继续如下 gt gt g
  • 如何使用 Django Channels 进行多线程 AsyncConsumer

    我已经使用 Django Channels 一周了 有些事情让我烦恼runworker并行性 例如 我有一个 MQTT 客户端 它在收到消息时在通道中发布 基本 async def treat message msg channel lay