我需要推迟发送频道消息。这是我的代码:
# consumers.py
class ChatConsumer(WebsocketConsumer):
def chat_message(self, event):
self.send(text_data=json.dumps(event['message']))
def connect(self):
self.channel_layer.group_add(self.room_name, self.channel_name)
self.accept()
def receive(self, text_data=None, bytes_data=None):
send_message_task.apply_async(
args=(
self.room_name,
{'type': 'chat_message',
'message': 'the message'}
),
countdown=10
)
# tasks.py
@shared_task
def send_message_task(room_name, message):
layer = get_channel_layer()
layer.group_send(room_name, message)
任务正在执行,我看不到任何错误,但未发送消息。仅当我从消费者类方法发送它时它才有效。
我还尝试使用 AsyncWebsocketConsumer 并使用 AsyncToSync(layer.group_send) 发送。它错误地显示“您不能在与异步事件循环相同的线程中使用 AsyncToSync - 只需直接等待异步函数。”
然后我尝试将 send_message_task 声明为异步并使用等待。没有再发生任何事情(没有错误),并且我不确定任务是否完全执行。
以下是版本:
Django==1.11.13
redis==2.10.5
django-celery==3.2.2
channels==2.1.2
channels_redis==2.2.1
设置:
REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],
},
},
}
有任何想法吗?
UPD:刚刚发现redis通道层被检索,但它的group_send方法没有被调用,只是被跳过。
UPD 2:发送使用AsyncToSync(layer.group_send)
从控制台工作。调用任务不带apply_async
也有效。但运行它apply_async
导致错误You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly
。将任务定义为异步并使用await
当然也打破了一切。