如何在 FastAPI 中使用 WebSockets 发送和接收 XML 格式的数据?

2023-12-06

我正在尝试与指纹设备进行通信。实际上它通过一个发送数据websocket联系。所以,我想我可以使用以下方式与设备进行通信webscokets。这里我使用FastAPI,但它只接受JSON数据。问题是我需要处理XML数据,但是,我不知道如何发送和接受数据XML format.


FastAPI 不仅可以接受和验证其他类型的数据JSON正如你所说。看看文档。关于XML, as FastAPI实际上是底层的Starlette,你可以使用 Starlette 的Request直接反对阅读请求body作为字节(你可能会发现这个答案也有帮助),并返回custom ResponseXML数据(如果需要)。您可以检查传入的请求是否符合要求Content-Type,如果是,则让它通过;否则,你可以提出HTTPException。下面是一个在客户端使用 Python 请求和普通请求的工作示例HTTP服务器端的端点。

Using HTTP协议

app.py

from fastapi import FastAPI, Response, Request, HTTPException

app = FastAPI()

@app.post("/submit")
async def submit(request: Request):
    content_type = request.headers['Content-Type']
    if content_type == 'application/xml':
        body = await request.body()
        return Response(content=body, media_type="application/xml")
    else:
        raise HTTPException(status_code=400, detail=f'Content type {content_type} not supported')

test.py

import requests

body = """<?xml version='1.0' encoding='utf-8'?><a>б</a>"""
headers = {'Content-Type': 'application/xml'}
url = 'http://127.0.0.1:8000/submit'
r = requests.post(url, data=body.encode('utf-8'), headers=headers)
print(r.content)

在 websocket 中,您可以使用send_bytes() and receive_bytes()对于通信,如中所述Starlette 的文档,允许您发送和接收(字节编码)XML数据也是如此。如果您想对收到的信息进行验证XML数据,看一下this.

Using WebSocket协议

app.py

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
import uvicorn

app = FastAPI()

@app.websocket("/ws")
async def get_stream(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            contents = await websocket.receive_bytes()
            print(str(contents, 'utf-8'))
    except WebSocketDisconnect:
        print("Client disconnected")   
 
if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=8000)

test.py

import websockets
import asyncio

async def main():
    url = 'ws://127.0.0.1:8000/ws'
    
    async with websockets.connect(url) as ws:
         while True:
            b = bytes("<?xml version='1.0' encoding='utf-8'?><a>б</a>", 'utf-8')
            await ws.send(b)
            await asyncio.sleep(1)
            
asyncio.run(main())
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 FastAPI 中使用 WebSockets 发送和接收 XML 格式的数据? 的相关文章

随机推荐