Python websockets,订阅多个频道

2023-12-04

我正在尝试同时连接到多个通道,并通过 python websocket 库从推送 API 接收消息。

考虑下面的代码,您将如何连接到多个通道?该代码是从此处获得并稍加修改的:https://pypi.python.org/pypi/websocket-client

让我困惑的是倒数第二行:ws.on_open = on_open。 on_open 被定义为上面的函数,接受 1 个参数,但在调用该函数时没有传递任何参数,我不记得以前在 python 代码中遇到过这个,所以我不确定这一行到底发生了什么。

如何修改此代码,以便将包含字符串的变量传递给函数 on_open,以便可以指定我想要订阅的 Chanel 的名称?我的主要目标是能够使用多处理库传递多个通道以同时订阅。

我是否可以通过创建多个 ws 对象或一个 ws 对象并使用不同通道作为参数多次调用 on_open 来实现这一目标?

import websocket
import thread
import time
import json

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    def run(*args):   
        ws.send(json.dumps({'channel':'channel1'}))  
        while True:
            time.sleep(1)
        ws.close()
        print("thread terminating...")

    thread.start_new_thread(run, ())

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://random.example.com",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

Use partial传递附加参数

from functools import partial

def on_open(ws, channel_name):
    """
    Notice the channel_name parameter
    """

# create a new function with the predefined variable
chan1 = partial(on_open, channel_name='channel 1')
# set the new function as the on_open callback
ws1.on_open = chan1

# do the same for the rest
chan2 = partial(on_open, channel_name='channel 2')
ws2.on_open = chan2

作为旁注,考虑使用Tornado or Crossbar.io (aka autobahn)。这些是正确的异步框架,与线程和多处理相比,它们使 Websocket 开发更简单。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python websockets,订阅多个频道 的相关文章

  • 行未从树视图复制

    该行未在树视图中复制 我在按行并复制并粘贴到未粘贴的任何地方后制作了弹出复制 The code popup tk Menu tree opportunity tearoff 0 def row copy item tree opportun
  • 在Python3.6中调用C#代码

    由于完全不了解 C 编码 我希望在我的 python 代码中调用 C 函数 我知道有很多关于同一问题的问答 但由于一些奇怪的原因 我无法从示例 python 模块导入简单的 c 类库 以下是我所做的事情 C 类库设置 我使用的是 VS 20
  • 如何确定非阻塞套接字是否真正连接?

    这个问题不仅限于Python 这是一个一般的套接字问题 我有一个非阻塞套接字 想要连接到一台可访问的机器 在另一端 该端口不存在 为什么 select 仍然成功 我预计会超时 sock send 因管道损坏而失败 select 之后如何确定
  • 即使使用 .loc[row_indexer,col_indexer] = value 时也会设置 WithCopyWarning

    这是我的代码中得到的行之一SettingWithCopyWarning value1 Total Population value1 Total Population replace to replace value 4 然后我将其更改为
  • Python - 用逗号分割,跳过括号内的内容

    我需要用逗号分隔字符串 但我对这种情况有一个问题 TEXT EXAMPLE THIS IS A EXAMPLE BUT NOT WORKS FOR ME SECOND THIRD 我想拆分并得到 var 0 TEXT EXAMPLE THI
  • Pandas重置索引未生效[重复]

    这个问题在这里已经有答案了 我不确定我在哪里误入歧途 但我似乎无法重置数据帧上的索引 当我跑步时test head 我得到以下输出 正如您所看到的 数据帧是一个切片 因此索引超出范围 我想做的是重置该数据帧的索引 所以我跑test rese
  • 定义函数后对其进行修饰?

    I think答案是否定的 但我似乎找不到明确的说法 我有以下情况 def decorated function function functools wraps function def my function print Hello s
  • “char”/“character”类型的类型提示

    char 或 character 没有内置的原始类型 因此显然必须使用长度为 1 的字符串 但是为了暗示这一点并暗示它应该被视为一个字符 如何通过类型提示来实现这一点 grade chr A 一种方法可能是使用内置的 chr 函数来表示这一
  • 创建一个类似于 Tkinter 的表

    我希望创建类似于 Tkinter 中的表格的东西 但它不一定是这样的 例如 我想创建标题 Name1 Name2 Value 并在每个标题下面有几个空白行 然后 我希望稍后用我计算的值或名称的字符串值填充这些行 因此是标签 对于 Name2
  • 在径向(树)网络x图中查找末端节点(叶节点)

    给定下图 是否有一种方便的方法来仅获取末端节点 我所说的端节点是指那些具有一个连接边的到节点 我认为这些有时被称为叶节点 G nx DiGraph fromnodes 0 1 1 1 1 1 2 3 4 5 5 5 7 8 9 10 ton
  • 如何在 Python 中将彩色输出打印到终端?

    是否有与 Perl 等效的 Python 语言 print color red print
  • 如何列出 python PDB 中的当前行?

    在 perl 调试器中 如果重复列出离开当前行的代码段 可以通过输入命令返回到当前行 点 我无法使用 python PDB 模块找到任何类似的东西 如果我list如果我自己离开当前行并想再次查看它 似乎我必须记住当前正在执行的行号 对我来说
  • Django 在选择列表更改时创建毫无意义的迁移

    我正在尝试使用可调用创建一个带有选择字段的模型 以便 Django 在选择列表更改时不会创建迁移 如中所述this https stackoverflow com questions 31788450 stop django from cr
  • conda-env list / conda info --envs 如何查找环境?

    我一直在尝试 anaconda miniconda 因为我的用户使用随 miniconda 安装的结构生物学程序 并且作者都没有 A 考虑到可能存在其他 miniconda 应用程序 B 他们的程序将在多用户环境中使用 因此 使用 Arch
  • Airflow Python 单元测试?

    我想为我们的 DAG 添加一些单元测试 但找不到任何单元测试 有 DAG 单元测试框架吗 有一个端到端的测试框架存在 但我猜它已经死了 https issues apache org jira browse AIRFLOW 79 https
  • 导入错误:没有名为 google.auth 的模块

    当我尝试导入时firebase admin in python 2 7我收到错误 导入错误 没有名为 google auth 的模块 这是Docker文件 https github com ammaratef45 Attendance bl
  • Python组合目录中的所有csv文件并按日期时间排序

    我有 2 年的每日数据分成每月文件 我想将所有这些数据合并到一个按日期和时间排序的文件中 我正在使用的代码组合了所有文件 但不按顺序 我正在使用的代码 import pandas as pd import glob os import cs
  • 如何在sphinx中启用数学?

    我在用sphinx http sphinx pocoo org index html与pngmath http sphinx pocoo org ext math html module sphinx ext pngmath扩展来记录我的代
  • 在 Python 模块中使用 InstaLoader

    我正在尝试使用 Instaloader 下载与主题标签相关的照片以进行图像分析 我在GitHub存储库中找到了一个全面的方法 如何在终端中执行它 但是 我需要将脚本集成到Python笔记本中 这是脚本 instaloader no vide
  • 来自 django 教程 was_published_recently.admin_order_field = 'pub_date'

    From Django 教程 https www jetbrains com help pycharm 2017 1 creating and running your first django project html d28041e21

随机推荐