我有个主意。编写一个基于 WebSocket 的 RPC,该 RPC 将根据以下场景处理消息。
- 客户端连接到 WS(Web 套接字)服务器
- 客户端向WS服务器发送消息
- WS服务器将消息放入传入队列(可以是multiprocessing.Queue或RabbitMQ队列)
- 进程池中的一个worker拿起消息进行处理
- 消息正在处理(可能非常快或非常慢 - 这与 WS 服务器无关)
- 消息处理完成后,将处理结果推送到输出队列
- WS服务器从队列中弹出结果并发送给客户端
NOTE:关键点是 WS 服务器应该是非阻塞的并且只负责:
- 连接接受
- 从客户端获取消息并将其放入传入队列
- 从输出队列中弹出消息并将其发送回客户端
NOTE2:以某种方式存储客户端标识符并将其与来自客户端的消息一起传递可能是个好主意
NOTE3:由于来回排队消息,简单消息处理的速度(例如,获取消息作为输入并将其推回结果)会变慢,这完全没问题。目标是能够使用与处理快速消息相同的代码风格在池中运行处理器昂贵的操作(粗略的非实际示例:几个嵌套的“for”循环)。 IE。从输入队列中弹出消息以及某种客户端标识符,对其进行处理(可能需要一段时间)并将处理结果与客户端 ID 一起推送到输出队列。
问题:
- 在 TornadoWeb 中,如果我有一个队列(多处理或 Rabit),如何才能
我让 Tornado 的 IOLoop 在有新数据时触发一些回调
该队列中的项目?你能导航我到一些现有的吗
有没有实施?
- 这样的设计有现成的实现吗? (不一定是龙卷风)
- 也许我应该使用另一种语言(不是python)来实现这样的设计?
致谢:
- 不欢迎使用 REST 和 WSGI 来实现我想要实现的任何目标的建议
- 诸如“这是我通过谷歌搜索 2 秒找到的代码的链接。它有一些来自龙卷风和多重处理的导入。我不确定它的作用,但我 99% 确定它正是您所需要的”也不受欢迎
- 建议使用异步库而不是普通的阻塞库......:)
龙卷风的IOLoop
允许您通过文件描述符处理来自任何文件对象的事件,因此您可以尝试以下操作:
- 通过以下方式与您的每个工作流程建立联系
multiprocessing.Pipe
- call add_handler http://www.tornadoweb.org/documentation/ioloop.html#tornado.ioloop.IOLoop.add_handler对于每个管道的父端(使用连接的
fileno()
)
- 让工作人员每次将某些内容放入输出队列时都会写入一些随机垃圾,无论这是否是
multiprocessing.Queue
任何 MQ 的。
- 处理事件处理程序中工作人员的答案
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)