Python 中多个 websocket 客户端连接的最佳方法?

2024-03-31

我很高兴我要问的问题相当广泛,但是,作为 Python 的新手,我正在努力寻找 [最佳] 方法来做一些事情,这在 Node.js 中是微不足道的,而在 Node.js 中则非常微不足道。其他环境,例如 C#。

假设有一个装满东西的仓库。假设该仓库有一个具有两个特征的 Websocket 接口:在客户端连接上,它会输出仓库当前库存的完整列表,然后在库存发生变化时进一步进行流式更新。

网络上有很多关于如何在 Python 中连接到仓库并响应其状态变化的示例。但...

如果我想连接到怎么办two仓库并根据从每个仓库分别检索到的组合信息来执行某些操作?如果我想根据时间等因素来做事情,而不是仅仅由库存变化和传入的 websocket 消息驱动,该怎么办?

在我见过的所有例子中——而且开始感觉有数百个——在某个地方,以某种形式,有一个run() or a run_forever() or a run_until_complete()换句话说,I/O 可能是异步的,但代码中总是存在大量的阻塞操作,并且总是有两个不适合我的情况的基本假设:只有一个 websocket 连接,并且所有处理都将由[单个] websocket 服务器发出的事件驱动。

我非常不清楚我的问题的答案是否是使用多个事件循环、多线程或其他东西。

迄今为止,尝试 Python 感觉就像在顶层公寓的地板上,欣赏着古怪但无可否认的优雅装饰。但当你走进电梯,按下标有“并行”或“并发”的按钮,电梯就会自由落体,最终把你送到一个充满了一些相当丑陋且冒着热气的管道的地下室。

...从华丽的隐喻回到技术上,我正在努力解决的关键问题是相当于 Node.js 代码的 Python,它可能像下面的示例一样简单[为了简单起见,显得不优雅]:

var aggregateState = { ... some sort of representation of combined state ... };

var socket1 = new WebSocket("wss://warehouse1");
socket1.on("message", OnUpdateFromWarehouse);

var socket2 = new WebSocket("wss://warehouse2");
socket2.on("message", OnUpdateFromWarehouse);

function OnUpdateFromWarehouse(message)
{
  ... Take the information and use it to update aggregate state from both warehouses ...
}



回答我自己的问题,希望它可以帮助其他Python新手......asyncio似乎是可行的方法(尽管存在一些问题,例如可以轻松地使事件循环陷入僵局)。

假设使用异步友好的 websocket 模块,例如网络套接字 https://websockets.readthedocs.io/en/stable/,似乎有效的是一个遵循以下思路的框架 - 为了简单起见,删除了诸如重新连接之类的逻辑。 (前提仍然是一个仓库,它发送其完整库存的初始列表,然后发送对该初始状态的更新。)

class Warehouse:
    def __init__(self, warehouse_url):
        self.warehouse_url = warehouse_url
        self.inventory = {}  # Some description of the warehouse's inventory
    
    async def destroy():
        if (self.websocket.open):
            self.websocket.close()  # Terminates any recv() in wait_for_incoming() 
            await self.incoming_message_task  # keep asyncio happy by awaiting the "background" task

    async def start(self):
        try:
            # Connect to the warehouse
            self.websocket = await connect(self.warehouse_url)          
            # Get its initial message which describes its full state
            initial_inventory = await self.websocket.recv()
            # Store the initial inventory
            process_initial_inventory(initial_inventory)
            # Set up a "background" task for further streaming reads of the web socket
            self.incoming_message_task = asyncio.create_task(self.wait_for_incoming())
            # Done
            return True
        except:
            # Connection failed (or some unexpected error)
            return False

    async def wait_for_incoming(self):
        while self.websocket.open:
            try:
                update_message = await self.websocket.recv()
                asyncio.create_task(self.process_update_message(update_message))
            except:
                # Presumably, socket closure
                pass

    def process_initial_inventory(self, initial_inventory_message):
        ... Process initial_inventory_message into self.inventory ...
    
    async def process_update_message(self, update_message):
        ... Merge update_message into self.inventory ...
        ... And fire some sort of event so that the object's 
        ... creator can detect the change. There seems to be no ...
        ... consensus about what is a pythonic way of implementing events, ... 
        ... so I'll declare that - potentially trivial - element as out-of-scope ...

完成初始连接逻辑后,关键的一件事是设置一个“后台”任务,该任务重复读取通过 Websocket 传入的进一步更新消息。上面的代码不包含任何事件触发,但是有多种方式可以process_update_message()可以/可以做到这一点(其中许多都非常简单),允许对象的创建者在其认为合适的时间和方式处理通知。只要对象的创建者继续与 asyncio 良好地配合并参与协作多任务处理,就将继续接收流消息,并且将继续触发任何事件。

完成后,可以按照以下方式建立连接:

async def main():
    warehouse1 = Warehouse("wss://warehouse1")
    if await warehouse1.start():
        ... Connection succeeded. Update messages will now be processed 
        in the "background" provided that other users of the event loop 
        yield in some way ...
    else:
        ... Connection failed ...

asyncio.run(main())

可以通过多种方式启动多个仓库,包括执行create_task(warehouse.start())对每一个然后做一个gather执行任务以确保/检查它们是否都正常。

当需要退出时,为了让 asyncio 保持快乐,并停止它抱怨孤立任务,并让一切正常关闭,有必要调用destroy()在每个仓库上。

但有一个共同点并未涵盖。扩展上面的原始前提,假设仓库还接受来自我们的 websocket 客户端的请求,例如“将 X 运送到 Y”。对这些请求的成功/失败响应将与一般更新消息一起出现;通常不可能保证请求的 send() 之后的第一个 receive() 将是对该请求的响应。这使情况变得复杂process_update_message().

我找到的最佳答案可能被认为是“pythonic”,也可能不被认为是“pythonic”,因为它使用了Future以一种非常类似于TaskCompletionSource在.NET中。

让我们发明一些实现细节;任何现实世界的场景都可能是这样的:

  • 我们可以在向仓库提交指令时提供request_id
  • 来自仓库的成功/失败响应将 request_id 重复返回给我们(因此也区分了命令响应消息与库存更新消息)

第一步是拥有一个字典,将待处理的、正在进行的请求的 ID 映射到Future对象:

    def __init__(self, warehouse_url):
        ...
        self.pending_requests = {}

发送请求的协程的定义如下所示:

    async def send_request(self, some_request_definition)
        # Allocate a unique ID for the  request
        request_id = <some unique request id>
        # Create a Future for the pending request
        request_future = asyncio.Future()
        # Store the map of the ID -> Future in the dictionary of pending requests
        self.pending_requests[request_id] = request_future
        # Build a request message to send to the server, somehow including the request_id
        request_msg = <some request definition, including the request_id>
        # Send the message 
        await self.websocket.send(request_msg) 
        # Wait for the future to complete - we're now asynchronously awaiting
        # activity in a separate function
        await asyncio.wait_for(command_future, timeout = None)
        # Return the result of the Future as the return value of send_request()
        return request_future.result()

调用者可以使用如下内容创建请求并等待其异步响应:

     some_result = await warehouse.send_request(<some request def>)

使这一切顺利进行的关键是修改和扩展process_update_message()执行以下操作:

  • 区分请求响应与库存更新
  • 对于前者,提取请求 ID(我们发明的场景称该 ID 会重复返回给我们)
  • 查找待处理的Future对于请求
  • Do a set_result()其值(其值可以是任何值,具体取决于服务器的响应内容)。这释放了send_request()并导致它的等待得到解决。

例如:

    async def process_update_message(self, update_message):
        if <some test that update_message is a request response>:
            request_id = <extract the request ID repeated back in update_message>
            # Get the Future for this request ID
            request_future = self.pending_requests[request_id]
            # Create some sort of return value for send_request() based on the response
            return_value = <some result of the request>
            # Complete the Future, causing send_request() to return
            request_future.set_result(return_value)
        else:
            ... handle inventory updates as before ...
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 中多个 websocket 客户端连接的最佳方法? 的相关文章

随机推荐

  • 缓存是个好主意吗?如果是的话,在哪里?

    我有一个 ASP NET 网站 每天有 10 25 000 名访问者 节假日前的峰值超过 60 000 人 由于它是一个内容网站 因此页面 访问量也很高 我有一些特定的页面 它们产生了大约 60 的流量 这些页面有点复杂 并且数据库较多 s
  • Xcode 4.2:如何从子项目导入 .h 文件

    我是 Xcode 子项目的新手 在我的 iPhone 应用程序项目中MyProject 我正在尝试将一些常见代码重构为一个名为的静态库项目MyLibrary 我创建后MyLibrary并移动代码 MyProject不再编译 错误在于MyPr
  • 在 openCV 中连接足够近的轮廓

    我有一组从图像中检测到的轮廓 斑点 问题在于 一些斑点在斑点检测和平滑过程中被分割 我尝试使用以下代码 Mat outlines Mat zeros m3 size CV 8UC3 findContours m3 contours CV R
  • 多匹配布尔提升

    我正在尝试执行以下操作 curl X POST localhost 9200 magento customer search pretty 1 d query bool must multi match query john fields
  • cygwin git下的KDiff3不会调用

    为了与互联网上发布的许多解决方案保持一致 我安装了 KDiff3 并进行了修改 gitconfig如下所示来使用它 尽管如此 当我跑步时git diff HEAD HEAD在存储库中进行测试 我得到了默认工具执行的差异 我在 Windows
  • NestJS 在非模块文件中注入模块服务

    我有一个用于视图渲染的函数文件 我想在这里使用 Nestjs 模块服务 我的渲染文件是这样的 export default parse render 因此 为了在这里使用模块服务 我尝试像这样注入 import Inject from ne
  • Django URL 模板匹配(除了模式之外的所有内容)

    我需要一个 django 正则表达式 它实际上适用于 url 路由器来执行以下操作 匹配路由中不包含 api 的所有内容 以下不起作用 因为 django 无法反转 r api 通常的方法是对路由声明进行排序 以便包罗万象的路由被 api路
  • 下载文件时强制显示“另存为”对话框

    下面的代码将文件保存到用户的磁盘上 function handleSaveImg event const image canvas toDataURL const saveImg document createElement a saveI
  • C# 中的内部与公共

    我想知道两者之间的区别public and internal可见性修饰符 我们什么时候应该使用internal在课堂上以及何时public 我对什么时候应该使用方法感到困惑public or internal 我读到了internal可以通
  • 将 SceneKit 对象放置在 SCNCamera 当前方向的前面

    我想在用户点击屏幕时创建一个新的 SceneKit 节点 并让它以设定的距离直接出现在相机前面 为了测试 这将是一个 SCNText 读取 您点击此处 它还应该与视线成直角 即 面向 相机 所以 鉴于self camera orientat
  • 循环遍历所有Mongo集合并执行查询

    首先 我对 mongodb 还很陌生 这是我的问题 我一直无法找到解决方案 假设我有 3 个不同的集合 mongos gt show collections collectionA collectionB collectionC 我想创建一
  • 是否可以在 C# 程序中使用 C++ .lib 文件?

    是否可以在 C 程序中使用 C lib 文件 有很多方法 阅读 MSDN 中有关 互操作 的内容 一种方法是将 lib 作为 DLL 公开 然后使用 pinvoke 从 C 项目调用这些函数 不过 这限制了您只能使用 C 风格的界面 如果您
  • 为什么 float 类型会将 0.5 舍入为 0?如何避免? [复制]

    这个问题在这里已经有答案了 我有这样的声明 SQL Server 2012 SELECT ROUND CAST 50 9685 as float 3 AS Col1 INTO Test 我想看到结果 50 969 但事实上我看到 50 96
  • 如何在 Android Studio 1.3 中设置 Gradle JVM 设置

    从版本 1 3 开始 Android Studio 将不再支持 IDE 特定的 Gradle JVM 参数设置 Gradle JVM 设置需要在 gradle properties 文件中设置 无论构建在何处 IDE 命令行或 CI 服务器
  • 在WPF中使用多重绑定时是否必须使用转换器?

    我想知道是否存在无需转换器即可使用多重绑定的场景 以及迫使我们使用转换器的限制 特别是 我试图以 string format 样式将一个字符串绑定到另外两个字符串 您最常使用的区域MultiBinding没有转换器是指您有一个字符串格式连接
  • libssh 支持 sftp 服务器功能吗?

    在我的项目中 我需要在服务器和多个客户端之间传输文件 客户端和服务器之间的通信应该加密 应该通过用户名和密码进行身份验证 而且协议应该是通用的 所以就想到了sftp ssh 有两个 c c 库 libssh 和 libssh2 其中只有第一
  • 查看 iframe url 是否已更改

    有没有办法让我知道 iframe 中的 URL 是否已更改 即用户已导航到某个地方 谢谢 类似的问题在这里 https stackoverflow com questions 44359 how do i get the current l
  • Jenkins下的Maven toolchains.xml位置

    我发现很难将 Maven 工具链与 Jenkins 一起使用 我需要指定测试应该使用 32 位 JVM 运行 并通过在中放置合适的 JDK 定义来执行此操作 HOME m2 toolchains xml当我在机器上本地运行时可以工作 但是如
  • 模拟没有可用磁盘空间情况的最简单方法?

    我需要在没有剩余磁盘空间的情况下测试我的网络应用程序 即我无法写入更多文件 但我不想只是为了确保确实没有剩余空间而在硬盘中塞满垃圾 我想要的是用特定的进程 实际上是一个 PHP 应用程序 来模拟这种情况 事实上 暂时禁止进程的磁盘写入就足够
  • Python 中多个 websocket 客户端连接的最佳方法?

    我很高兴我要问的问题相当广泛 但是 作为 Python 的新手 我正在努力寻找 最佳 方法来做一些事情 这在 Node js 中是微不足道的 而在 Node js 中则非常微不足道 其他环境 例如 C 假设有一个装满东西的仓库 假设该仓库有