如何从 django-celery 3 任务发送通道 2.x 组消息?

2024-01-01

我需要推迟发送频道消息。这是我的代码:

# consumers.py
class ChatConsumer(WebsocketConsumer):
    def chat_message(self, event):
        self.send(text_data=json.dumps(event['message']))

    def connect(self):
        self.channel_layer.group_add(self.room_name, self.channel_name)
        self.accept()

    def receive(self, text_data=None, bytes_data=None):
        send_message_task.apply_async(
            args=(
                self.room_name,
                {'type': 'chat_message',
                 'message': 'the message'}
            ),
            countdown=10
        )

# tasks.py
@shared_task
def send_message_task(room_name, message):
    layer = get_channel_layer()
    layer.group_send(room_name, message)

任务正在执行,我看不到任何错误,但未发送消息。仅当我从消费者类方法发送它时它才有效。

我还尝试使用 AsyncWebsocketConsumer 并使用 AsyncToSync(layer.group_send) 发送。它错误地显示“您不能在与异步事件循环相同的线程中使用 AsyncToSync - 只需直接等待异步函数。”

然后我尝试将 send_message_task 声明为异步并使用等待。没有再发生任何事情(没有错误),并且我不确定任务是否完全执行。

以下是版本:

Django==1.11.13
redis==2.10.5
django-celery==3.2.2
channels==2.1.2
channels_redis==2.2.1

设置:

REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],
        },
    },
}

有任何想法吗?

UPD:刚刚发现redis通道层被检索,但它的group_send方法没有被调用,只是被跳过。

UPD 2:发送使用AsyncToSync(layer.group_send)从控制台工作。调用任务不带apply_async也有效。但运行它apply_async导致错误You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly。将任务定义为异步并使用await当然也打破了一切。


也许这不是对起始问题的直接回答,但这可能会有所帮助。 如果您收到异常“您不能在与异步事件循环相同的线程中使用 AsyncToSync - 只需直接等待异步函数”,那么您可能会执行以下操作:

  1. 事件循环在某处创建
  2. 一些异步代码已启动
  3. 一些 SYNC 代码是从 ASYNC 代码调用的
  4. SYNC 代码尝试使用 AsyncToSync 调用 ASYNC 代码,以防止出现这种情况

似乎 AsyncToSync 检测到外部事件循环并决定不干扰它。

解决方案是直接将异步调用包含在外部事件循环中。 示例代码如下,但最好检查您的情况以及外循环正在运行...

loop = asyncio.get_event_loop()
loop.create_task(layer.group_send(room_name, {'type': 'chat_message', 'message': message}))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 django-celery 3 任务发送通道 2.x 组消息? 的相关文章

随机推荐

  • 尝试使用 jQuery 隐藏 html 表的列

    function func id document ready function toggle click function td nth child id gt div toggle return false 我试图隐藏与单击的按钮对应的
  • 使用 select、group by 和 count 时如何获得非空结果集?

    这是当前的查询 SELECT status ct reconcile status IFNULL COUNT status ct reconcile status 0 AS sch change status num FROM db cre
  • 我可以使用WebRTC接收标准RTP视频流吗?

    我有两台计算机在同一网络上 其中一个使用 RTP 协议传输电影 H264 是否可以创建一个简单的 JavaScript 应用程序来在第二台计算机上接收此流并显示在视频标签中 到目前为止 我对WebRTC的印象是它被设计为在浏览器之间使用 都
  • 一直运行的 iOS GPS 跟踪应用程序

    我正在尝试制作一个应用程序来始终跟踪用户的 GPS 该应用程序是一种汽车 GPS 跟踪器 用于始终获取驾驶员的位置并将其发送到服务器 我尝试将 位置更新 添加到 后台模式 但进入后台时应用程序将在 10 分钟后自动暂停 有没有办法让这个应用
  • 在另一个单元格中输入数据并按 Enter 选项后转到特定单元格

    我有一个 Excel 工作表 我正在 A2 B2 和 C2 中输入数据 按 Enter 键后 我希望光标选择下一行 以便我可以输入 A3 B3 和 C3 等信息 我找到了这个信息 Private Sub Worksheet Change B
  • 确定点是否在 3D 三角形内部

    我正在寻求对我对确定点是否位于 3D 三角形内部的方法的看法的认可 给定一条 R t e td 形式的射线和一组三个点 T V0 V1 V2 它们在三维空间中形成一个三角形 我知道如何找到该平面的参数方程三点形成以及如何确定射线是否与该平面
  • Android - InstrumentationTestRunner

    我是 Android 新手 并且来自 NET 背景 对 Java 有点生疏 我正在为 Android 上运行的移动应用程序开发一些测试自动化 目前 我只是使用 Android 调试桥通过命令行启动测试 但当我尝试运行任何测试时遇到错误 首先
  • 从 SQL 查询中获取参数名称

    后端是PostgreSQL服务器9 1 我正在尝试构建 AdHoc XML 报告 报告文件将包含 SQL 查询 所有查询都必须以 SELECT 语句开头 SQL 查询将带有参数 根据关联列的数据类型 这些参数将相应地呈现给用户以提供值 一个
  • Traceview最大记录时间?

    我正在使用 Debug startMethodTracing 和 Debug stopMethodTracing 来优化一段需要大约 30 秒执行的代码 但是当我使用跟踪视图打开跟踪文件时 它只显示大约 6 5 秒的跟踪数据 有什么线索吗
  • Java 中的 方法是什么?它可以被覆盖吗? [复制]

    这个问题在这里已经有答案了
  • 陷入 Matlab 的用于匹配 vlfeat 图像点的子图机制

    我正在 Matlab 中做 vlfeat 我正在关注这个问题here https stackoverflow com questions 1500498 how to use sift algorithm to compute how si
  • 获取EAX寄存器的上半部分

    在x86汇编语言中 有没有办法获得上半部分EAX登记 我知道AX寄存器已经包含了下半部分EAX注册 但我还不知道有什么方法可以获得上半部分 我知道mov bx ax会移动下半部分eax into bx 但我想知道如何移动上半部分eax in
  • 参数中的前向声明与“正常”前向声明之间的区别

    模板 参数中的前向声明 使用详细类型说明符 https en cppreference com w cpp language elaborated type specifier 和 正常 的前向声明 void foo struct bar
  • 音视频同步、TS MPEG2;H264/AVC、了解Handbrake中的PTS

    同步一直让我着迷 或者准确地说 为什么媒体播放器可以同步观看 ts 而重新组装的解复用音频 视频却不同步 所以我试图了解这一点 以及可以采取哪些措施来防止这种情况发生 我已阅读以下内容 https trac handbrake fr wik
  • 重用 Azure 服务总线中的连接

    我们在 Windows Azure 上以 Web 角色托管了一个 API 该 API 具有 2 个实例 用于接收请求 验证它们 然后将它们添加到 Azure 服务总线队列 最近我们开始对此进行负载测试 发现我们当前的代码抛出以下异常 无法将
  • Silverlight 工具包;饼图颜色

    我有一个巨大的问题无法解决 假设我有五种不同的水果 我希望每种水果都与某种颜色相关联 假设我有三个 篮子 其中包含零个或多个上述水果 当我为我的三个篮子制作饼图时 每个楔形只是一些随机颜色 大概是由控件选择的 我该怎么说 将图表中的蓝莓 香
  • Craco 无法与 [email protected] 正常工作

    将react scripts升级到v5后 craco start不能正常工作 应用程序启动时没有错误 但在浏览器中 有一个空白页面 如果我打开检查器 我只能看到 index html 代码而不是反应代码 它运作良好 电子邮件受保护 cdn
  • 比较 python 中的旋转列表

    我试图比较两个列表以确定一个列表是否是另一个列表的旋转 循环排列 例如 a 1 2 3 b 1 2 3 or 2 3 1 or 3 1 2 都是匹配的 而 b 3 2 1 is not 为此 我有以下代码 def matching list
  • java中ThreadPool的编号

    您好 我在一个函数中使用带有两个线程的线程池 该函数由多个客户端以同步方式调用 在我正在使用的那个函数中 ExecutorService executor Executors newFixedThreadPool 2 完成任务后我将关闭执行
  • 如何从 django-celery 3 任务发送通道 2.x 组消息?

    我需要推迟发送频道消息 这是我的代码 consumers py class ChatConsumer WebsocketConsumer def chat message self event self send text data jso