回答我自己的问题,希望它可以帮助其他Python新手......asyncio
似乎是可行的方法(尽管存在一些问题,例如可以轻松地使事件循环陷入僵局)。
假设使用异步友好的 websocket 模块,例如网络套接字 https://websockets.readthedocs.io/en/stable/,似乎有效的是一个遵循以下思路的框架 - 为了简单起见,删除了诸如重新连接之类的逻辑。 (前提仍然是一个仓库,它发送其完整库存的初始列表,然后发送对该初始状态的更新。)
class Warehouse:
def __init__(self, warehouse_url):
self.warehouse_url = warehouse_url
self.inventory = {} # Some description of the warehouse's inventory
async def destroy():
if (self.websocket.open):
self.websocket.close() # Terminates any recv() in wait_for_incoming()
await self.incoming_message_task # keep asyncio happy by awaiting the "background" task
async def start(self):
try:
# Connect to the warehouse
self.websocket = await connect(self.warehouse_url)
# Get its initial message which describes its full state
initial_inventory = await self.websocket.recv()
# Store the initial inventory
process_initial_inventory(initial_inventory)
# Set up a "background" task for further streaming reads of the web socket
self.incoming_message_task = asyncio.create_task(self.wait_for_incoming())
# Done
return True
except:
# Connection failed (or some unexpected error)
return False
async def wait_for_incoming(self):
while self.websocket.open:
try:
update_message = await self.websocket.recv()
asyncio.create_task(self.process_update_message(update_message))
except:
# Presumably, socket closure
pass
def process_initial_inventory(self, initial_inventory_message):
... Process initial_inventory_message into self.inventory ...
async def process_update_message(self, update_message):
... Merge update_message into self.inventory ...
... And fire some sort of event so that the object's
... creator can detect the change. There seems to be no ...
... consensus about what is a pythonic way of implementing events, ...
... so I'll declare that - potentially trivial - element as out-of-scope ...
完成初始连接逻辑后,关键的一件事是设置一个“后台”任务,该任务重复读取通过 Websocket 传入的进一步更新消息。上面的代码不包含任何事件触发,但是有多种方式可以process_update_message()
可以/可以做到这一点(其中许多都非常简单),允许对象的创建者在其认为合适的时间和方式处理通知。只要对象的创建者继续与 asyncio 良好地配合并参与协作多任务处理,就将继续接收流消息,并且将继续触发任何事件。
完成后,可以按照以下方式建立连接:
async def main():
warehouse1 = Warehouse("wss://warehouse1")
if await warehouse1.start():
... Connection succeeded. Update messages will now be processed
in the "background" provided that other users of the event loop
yield in some way ...
else:
... Connection failed ...
asyncio.run(main())
可以通过多种方式启动多个仓库,包括执行create_task(warehouse.start())
对每一个然后做一个gather
执行任务以确保/检查它们是否都正常。
当需要退出时,为了让 asyncio 保持快乐,并停止它抱怨孤立任务,并让一切正常关闭,有必要调用destroy()
在每个仓库上。
但有一个共同点并未涵盖。扩展上面的原始前提,假设仓库还接受来自我们的 websocket 客户端的请求,例如“将 X 运送到 Y”。对这些请求的成功/失败响应将与一般更新消息一起出现;通常不可能保证请求的 send() 之后的第一个 receive() 将是对该请求的响应。这使情况变得复杂process_update_message()
.
我找到的最佳答案可能被认为是“pythonic”,也可能不被认为是“pythonic”,因为它使用了Future
以一种非常类似于TaskCompletionSource
在.NET中。
让我们发明一些实现细节;任何现实世界的场景都可能是这样的:
- 我们可以在向仓库提交指令时提供request_id
- 来自仓库的成功/失败响应将 request_id 重复返回给我们(因此也区分了命令响应消息与库存更新消息)
第一步是拥有一个字典,将待处理的、正在进行的请求的 ID 映射到Future
对象:
def __init__(self, warehouse_url):
...
self.pending_requests = {}
发送请求的协程的定义如下所示:
async def send_request(self, some_request_definition)
# Allocate a unique ID for the request
request_id = <some unique request id>
# Create a Future for the pending request
request_future = asyncio.Future()
# Store the map of the ID -> Future in the dictionary of pending requests
self.pending_requests[request_id] = request_future
# Build a request message to send to the server, somehow including the request_id
request_msg = <some request definition, including the request_id>
# Send the message
await self.websocket.send(request_msg)
# Wait for the future to complete - we're now asynchronously awaiting
# activity in a separate function
await asyncio.wait_for(command_future, timeout = None)
# Return the result of the Future as the return value of send_request()
return request_future.result()
调用者可以使用如下内容创建请求并等待其异步响应:
some_result = await warehouse.send_request(<some request def>)
使这一切顺利进行的关键是修改和扩展process_update_message()
执行以下操作:
- 区分请求响应与库存更新
- 对于前者,提取请求 ID(我们发明的场景称该 ID 会重复返回给我们)
- 查找待处理的
Future
对于请求
- Do a
set_result()
其值(其值可以是任何值,具体取决于服务器的响应内容)。这释放了send_request()
并导致它的等待得到解决。
例如:
async def process_update_message(self, update_message):
if <some test that update_message is a request response>:
request_id = <extract the request ID repeated back in update_message>
# Get the Future for this request ID
request_future = self.pending_requests[request_id]
# Create some sort of return value for send_request() based on the response
return_value = <some result of the request>
# Complete the Future, causing send_request() to return
request_future.set_result(return_value)
else:
... handle inventory updates as before ...