FastAPI 中的 WebSockets - ConnectionClosedOK:收到 1000(正常)

2024-01-12

我有 3 个客户端,他们定期向我的服务器发送数据。我使用的例子来自FastAPI 的文档 https://fastapi.tiangolo.com/advanced/websockets/.

这是我的服务器代码:

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_response(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/chargeStationState/{client_id}")
async def websocket_endpoint(websocket: WebSocket,
                             client_id: int,
                             db: Session = Depends(deps.get_db)):

    await manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive_json()
            logging.info(message)
          
            ## read data from db
            response = {
                             "stations": "repsonse",
                             "timestamp": int(time.time())
            }
            await manager.send_response(response, websocket)
            #await manager.broadcast(f"Client #{client_id} says: {data}")

    except WebSocketDisconnect:
        manager.disconnect(websocket)

这是客户端代码:

async for websocket in websockets.connect("ws://127.0.0.1:8001/chargeStationState/1"):
            message = {'name':'station1'}
            await websocket.send(json.dumps(message))
            p = await asyncio.wait_for(websocket.recv(), timeout=10)
            print(p)
            await asyncio.sleep(2)

所以,我想要 5 个客户端,他们将与我的服务器通信并发送传感器数据,但 5 分钟后我收到以下错误

websockets.exceptions.ConnectionClosedOK: received 1000 (OK) 

然后接收1000 (OK)并且无法确定问题出在哪里。


测试您提供的代码,似乎无法重现您所指的问题。因此,问题可能出在代码的其他地方。下面根据您问题中提供的代码提供了一个工作示例。相关例子可以找到here https://stackoverflow.com/a/70996841/17865804, 也here https://stackoverflow.com/a/71415451/17865804 and here https://stackoverflow.com/a/71871482/17865804.

工作示例

app.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
import time

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_json(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_json(message)


app = FastAPI()
manager = ConnectionManager()

@app.websocket('/chargeStationState')
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            response = {'station': data['station'], 'timestamp': time.ctime()}
            await manager.send_personal_message(response, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)

test.py

import websockets
import asyncio
import json

async def main():
    url = 'ws://127.0.0.1:8000/chargeStationState'
    data = json.dumps({'station':'1'})
    
    async for websocket in websockets.connect(url):
        await websocket.send(data)
        print(await asyncio.wait_for(websocket.recv(), timeout=10))
        await asyncio.sleep(2)
        
asyncio.run(main())

test.py(修改)

在之前的test.py代码示例中,每次将数据发送到服务器时,websocket 连接都会关闭,并会建立一个新的连接(使用async for表达)。除非那是最适合您的,否则下面的代码示例将通过使用while里面的循环语句async for环形。这意味着首先建立的(相同的)连接将保持打开状态,除非循环体中发生异常,该异常的处理方式如下connect()达到,以便重新连接下一次迭代(否则,您可能会让异常冒泡并跳出循环)。如果建立连接时发生错误,connect()使用指数退避重试。退避延迟从三秒开始,最多增加一分钟(请参阅相关文件 https://websockets.readthedocs.io/en/stable/reference/client.html#opening-a-connection).

import websockets
import asyncio
import json

async def main():
    url = 'ws://127.0.0.1:8000/chargeStationState'
    data = json.dumps({'station':'1'})
    
    async for websocket in websockets.connect(url):
        try:
            while True:
                await websocket.send(data)
                print(await asyncio.wait_for(websocket.recv(), timeout=10))
                await asyncio.sleep(2)
        except websockets.ConnectionClosed:
            continue
        
asyncio.run(main())
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

FastAPI 中的 WebSockets - ConnectionClosedOK:收到 1000(正常) 的相关文章

  • pip 安装最新的依赖版本

    当我使用安装包时pip install e 它仅安装不满足的依赖项并忽略依赖项升级 如何在每次运行时安装最新的依赖版本pip install e 我尝试过使用pip install upgrade e 但是使用这个选项没有任何改变 我仍然得
  • setColumnStretch 和 setRowStretch 如何工作

    我有一个使用构建的应用程序PySide2它使用setColumnStretch用于柱拉伸和setRowStretch用于行拉伸 它工作得很好 但我无法理解它是如何工作的 我参考了 qt 文档 但它对我没有帮助 我被困在括号内的两个值上 例如
  • Django 营业时间

    我想添加诊所的营业时间 我已经对此进行了调查在 Django 中实现 开放时间 的任何现有解决方案 https stackoverflow com questions 8128143 any existing solution to imp
  • 使用 pyppeteer 与 asyncio 关联来抓取内容

    我用 python 结合编写了一个脚本pyppeteer随着asyncio从其登陆页面抓取不同帖子的链接 并最终通过跟踪通向其内页的 url 来获取每个帖子的标题 我这里解析的内容不是动态的 但是 我利用了pyppeteer and asy
  • 嵌套函数中的变量作用域

    有人可以解释为什么以下程序失败 def g f for in range 10 f def main x 10 def f print x x x 1 g f if name main main 带有消息 Traceback most re
  • 使用 keras 澄清 Yolo v3 模型输出

    我将 yolo v3 模型与 keras 一起使用 该网络为我提供了形状如下的输出容器 1 13 13 255 1 26 26 255 1 52 52 255 所以我找到了这个link https www cyberailab com ho
  • Python:由于 OSError 无法安装软件包:[Errno 2] 没有这样的文件或目录

    我尝试使用pip安装sklearn 并且我收到以下错误消息 错误 由于 OSError 无法安装软件包 Errno 2 没有这样的文件或目录 C Users 13434 AppData Local Packages PythonSoftwa
  • TemplateSyntaxError:“settings_tags”不是有效的标签库

    当我尝试运行此测试用例时 出现此错误 这是在我的 django 应用程序的tests py 中编写的 def test accounts register self self url http royalflag com pk accoun
  • 在 ubuntu 中卸载 python 模块

    我必须删除一个名为 django 的 python 模块 一种流行的模块 因为我安装了错误的版本 1 3 py 2 6 中的 beta 如何卸载这个模块 请解释一下 因为我只在 Windows 中使用过 python 而从未在 Ubuntu
  • 将 numpy 数组及其大小写入二进制文件

    我需要将 2D numpy 数组写入文件 包括其尺寸 以便我可以从 C 程序中读取它并创建相应的数组 我编写了一些简单的代码来保存数组 并且可以从 C 读取它 但是如果我尝试先写入数组的大小 它总是会给我一个错误 这是我的简单 python
  • 如何同时有效地运行多个 Pytorch 进程/模型? Traceback:分页文件太小,无法完成此操作

    背景 我有一个非常小的网络 我想用不同的随机种子进行测试 该网络几乎只使用了我的 GPU 计算能力的 1 因此理论上我可以同时运行 50 个进程来同时尝试许多不同的种子 Problem 不幸的是我什至无法在多个进程中导入 pytorch 当
  • pandas DataFrame 中行的高效成对比较

    我目前正在处理一个较小的数据集 大约 900 万行 不幸的是 大多数条目都是字符串 即使强制类别 框架在内存中也只有几 GB 我想做的是将每一行与其他行进行比较 并对内容进行直接比较 例如 给定 A B C D 0 cat blue old
  • 在 Python 中引发异常的正确方法是什么? [复制]

    这个问题在这里已经有答案了 这是简单的代码 import sys class EmptyArgs StandardError pass if name main The first way to raise an exception if
  • 类型错误:无法连接“str”和“int”对象有人可以帮助新手使用他们的代码吗?

    感谢任何帮助 还有任何重大缺陷或您在格式或基本方面看到的任何重大缺陷 请指出 谢谢 day raw input How many days locations raw input Where to days str day location
  • 从 SUDS 中的 SOAP 响应中提取 Cookie

    我必须使用具有多种服务的 API 所有这些都需要来自下面的身份验证的 JSESSION cookie 然而 当我调用下一个服务时 它不会保留 cookie 因此会拒绝它们 from suds client import Client url
  • 根据标签位置计算 Pandas DataFrame 的索引

    我正在尝试计算标签的索引Pandas https pandas pydata org DataFrame在每一列中 基本上我有以下内容DataFrame d col1 label1 label2 label3 col2 label2 lab
  • 从函数在 python 3 中创建全局变量

    我想知道为什么在函数结束后我无法访问变量 variable for raw data 代码是这样的 def htmlfrom Website URL import urllib request response urllib request
  • model.predict() 返回类而不是概率

    Hello 我是第一次使用 Keras 我训练并保存了一个模型 作为 json 文件及其权重 该模型旨在将图像分为 3 个类别 我的编译方法 model compile loss categorical crossentropy optim
  • 如何通过 API Gateway 使用事件调用类型调用 Lambda 函数?

    文件说 默认情况下 Invoke API 采用 RequestResponse 调用类型 您可以选择通过将 Event 指定为 InitationType 来请求异步执行 因此 我可以发送到我的函数 python 的就是到处都是 Inspi
  • nltk 标记化和缩写

    我用 nltk 对文本进行标记 只是将句子输入到 wordpunct tokenizer 中 这会拆分缩写 例如 don t 到 don t 但我想将它们保留为一个单词 我正在改进我的方法 以实现更精确的文本标记化 因此我需要更深入地研究

随机推荐