Paho MQTT Python客户端常用API、安装与使用

2023-05-16

MQTT(Message Queuing Telemetry Transport)是一种轻量级的即时通信协议,相关介绍可见:MQTT简介。

Paho 是Eclipse的开源 MQTT 客户端项目,提供多种语言的 MQTT 客户端实现,包括 C、C++、C#、Java、Python、JavaScript 等。在Python环境下,Paho MQTT Python客户端由paho-mqtt模块支撑。

安装Paho MQTT Python 客户端

Paho MQTT Python 客户端依赖于Python 2.7.9以上版本和Python 3.5以上版本。本文测试环境为Python 3.7.1。
用pip安装paho-mqtt如下:

pip install paho-mqtt

常用API

paho-mqtt主要由三个模块组成:Client模块、Publish模块和Subscribe模块。Publish模块和Subscribe模块使用相对较少,参数含义也与Client模块的publish和subscribe方法的参数类似,本文限于篇幅原因就不介绍了。

Client的基本使用流程

Client的基本使用流程如下:

  • 创建客户端实例
  • 使用 connect*() 函数之一连接到代理
  • 调用 loop*() 函数之一来维护与代理的网络流量
  • 使用 subscribe() 订阅主题并接收消息
  • 使用 publish() 将消息发布到代理
  • 使用 disconnect() 断开与代理的连接

值得注意的是,Client使用过程中存在许多回调,这些回调可以帮助Client处理各种事件,后面我们将详细介绍。

Client类与方法

(1)Client的构建与重置

Client的构建与重置由以下两个方法承担:

Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
reinitialise(client_id="", clean_session=True, userdata=None)

具体参数说明如下:


client_id:
连接到代理时使用的唯一客户端 ID 字符串。如果 client_id 为零长度或 None ,则将随机生成一个。在这种情况下,clean_session 参数必须为 True。
clean_session:
确定客户端类型的布尔值。如果为 True,代理将在断开连接时删除有关此客户端的所有信息。如果为 False,则客户端是持久客户端,并且在客户端断开连接时将保留订阅信息和排队消息。

注意,客户端永远不会在断开连接时丢弃自己的传出消息。调用 connect() 或 reconnect() 将导致消息被重新发送。使用 reinitialise() 将客户端重置为其原始状态。

userdata:
作为 userdata 参数传递给回调的任何类型的用户定义数据。稍后可能会使用 user_data_set() 函数对其进行更新。

protocol:
用于此客户端的 MQTT 协议版本。可以是 MQTTv31 或 MQTTv311

transport:
设置为“websockets”以通过 WebSockets 发送 MQTT。保留默认值“tcp”以使用原始 TCP。


使用示例如下:

import paho.mqtt.client as mqtt
# 构建一个Client
mqttc = mqtt.Client()
# 重置一个Client
mqttc.reinitialise()
(2)连接至代理/重新连接/与代理断开连接

相应方法是:

connect(host, port=1883, keepalive=60, bind_address="")
reconnect()
disconnect()

注意,MQTT的本质是一个用以维护客户端与代理之间的长连接的、并且是低消耗的协议。所以,无论对于代理还是对于客户端,他们都需要清楚地知道二者之间的连接是否断开、而且是正常断开还是非正常断开(正常断开(客户端使用disconnect方法)则不需特别操作;非正常断开情况下,客户端需要尝试重新连接,而代理则需要发送遗嘱)。这一点贯穿与MQTT的协议设计与“连接”这一部分的内容。

具体参数说明如下:


host:
远程代理的主机名或 IP 地址
port:
要连接的服务器主机的网络端口。 默认为 1883。请注意,基于 SSL/TLS 的 MQTT 的默认端口为 8883,因此如果您使用 tls_set() 或 tls_set_context(),则可能需要手动提供端口
keepalive:
与代理通信之间允许的最长间隔(以秒为单位)。 如果没有其他消息正在交换,这将控制客户端向代理发送 ping 消息的速率。

需要指出,MQTT协议规定,在 1.5倍的keepalive时间内,如果代理没有收到来自客户端的任何数据包,那么代理将认为它和这个客户端之间的连接已经断开;而如果客户端没有收到来自 代理的任何数据包,那么这个客户端会认为它和代理之间的连接已经断开。为维持正常的连接,如果代理与客户端之间没有其他数据传输,客户端会每隔keepalive时间向代理发送一次ping消息(由loop()来维护)。keepalive的缺省时间是60s。

bind_address:
假设存在多个接口,要将此客户端绑定到的本地网络接口的 IP 地址


使用示例如下:

# 已构建一个Client:mqttc
mqttc.connect("mqtt.eclipseprojects.io") 
# 使用reconnect与disconnect之前必须已经调用过connect
mqttc.reconnect()
mqttc.disconnect()

注意这三个方法是阻塞进程的。

(3)网络回路控制

网络回路控制是客户端对传入与发出数据进行控制的背后驱动力,同时也根据客户端的keepalive设置发送ping消息(心跳报文),以刷新连接。如果不调用网络回路方法,则客户端不会处理传入的网络数据,并且可能无法及时发送传出的网络数据。其作用可由下图表示:
Loop的作用(图转载自Steve's Internet Guide)

相关方法包括:

loop(timeout=1.0, max_packets=1)
loop_start()
loop_stop(force=False)
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)

包含三套逻辑,一是循环调用loop()阻塞进程:timeout参数定义了loop()阻塞进程的超时时间,而max_packets 参数已过时,应保持未设置。timeout 不得超过客户端的 keepalive 值,否则客户端将被代理定期断开连接。
用例如下:

mqttc.connect("mqtt.eclipseprojects.io")
while True:
    mqttc.loop(timeout=1.0)
    # do something else

二是使用loop_start()和loop_stop()创建和停止后台线程,以自动调用loop()。这种方式释放了主线程。 loop_start()可以在connect*() 之前或之后调用。 此调用还处理与代理的重新连接。 调用 loop_stop() 停止后台线程。 force 参数当前被忽略。
定期调用以处理网络事件。 此调用在 select() 中等待,直到网络套接字可用于读取或写入(如果合适),然后处理传入/传出数据。 此函数最多阻塞超时秒。
用例如下:

mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_start()
# do something else
while True:
    temperature = sensor.blocking_read()
    mqttc.publish("paho/temperature", temperature)

mqttc.loop_stop()

三是使用loop_forever()持续阻塞进程(不需外部循环),连接将维持到客户端调用disconnect()。timeout 和 max_packets 参数已过时,应保持未设置。retry_first_connection=True 使其重试第一次连接。警告:这可能会导致客户端不断连接到不存在的主机而不提示失败的情况。由于主线程被阻塞,其他操作无法在主线程执行,包括disconnect()在内的其他操作都需要通过回调函数来执行。

用例如下:

mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_forever(retry_first_connection=False)
(4)订阅/取消订阅

使客户端订阅到一个或多个主题,或从相应主题退订。

subscribe(topic, qos=0)
unsubscribe(topic)

具体参数说明:


topic:
一个字符串,指定要订阅的订阅主题。
qos:
订阅所需的服务质量(quality of service)级别。 默认为 0,可选0,1,2。


用例:

mqttc.subscribe(("my/topic", 1))
mqttc.subscribe([("my/topic", 0), ("another/topic", 2)])
mqttc.unsubscribe("my/topic")
mqttc.unsubscribe(["my/topic", "another/topic"])
(5)发布

发布会使得消息被发送到代理,然后再由代理发送到订阅匹配主题的任何客户端。
相关方法:

publish(topic, payload=None, qos=0, retain=False)

具体参数说明如下:


topic:
消息应该发布到的主题
payload:
要发送的实际消息。 如果没有给出,或者设置为 None 将使用零长度消息。 传递 int 或 float 将导致有效负载转换为表示该数字的字符串。 如果您希望发送真正的 int/float,请使用 struct.pack() 创建您需要的有效负载。
qos:
要使用的服务质量(quality of service)水平,默认为 0,可选0,1,2。
retain:
如果设置为 True,则该消息将被设置为该主题的保留消息。保留消息的作用是使新订阅某个主题的客户端能够收到该主题中上一次发布的消息。


用例如下:

mqttc.publish(topic="my/topic", payload=None, qos=0, retain=True)

回调

回调是在指由某个事件触发相应的处理。七大类回调函数,分别为连接、断开连接、收到消息、发布消息、订阅主题、取消订阅主题、收到日志七类事件提供了处理方法。

(1)连接

on_connect():当代理响应客户端的连接请求时调用。

on_connect(client, userdata, flags, rc)

具体参数说明如下:


client:
此回调的客户端实例
userdata:
在 Client() 或 user_data_set() 中设置的私有用户数据
flags:
代理发送的响应标志
rc:
连接结果


用例:

def on_connect(client, userdata, flags, rc):
    print("Connection returned result: "+connack_string(rc))

mqttc.on_connect = on_connect
...
(2)断开连接

on_disconnect():当客户端与代理断开连接时调用。

on_disconnect(client, userdata, rc)

具体参数说明如下:


client:
此回调的客户端实例
userdata:
在 Client() 或 user_data_set() 中设置的私有用户数据
flags:
代理发送的响应标志
rc:
断开结果


用例:

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection.")

mqttc.on_disconnect = on_disconnect
...
(3)收到消息

on_message():当收到客户端订阅的主题的消息并且该消息与现有主题过滤器回调(由message_callback_add()定义)不匹配时调用。
message_callback_add():当收到客户端订阅的主题的消息并且该消息与现有主题过滤器回调匹配时调用。
message_callback_remove():删除之前使用message_callback_add()定义的主题过滤器回调

on_message(client, userdata, message)
message_callback_add(sub, callback)
message_callback_remove(sub)

具体参数说明如下:


client:
此回调的客户端实例
userdata:
在 Client() 或 user_data_set() 中设置的私有用户数据
message:
MQTTMessage信息实例,这是一个包含成员 topic、payload、qos、retain 的类。
sub:
特定主题
callback:
定义的callback函数


on_message()用例:

def on_message(client, userdata, message):
    print("Received message '" + str(message.payload) + "' on topic '"
        + message.topic + "' with QoS " + str(message.qos))

mqttc.on_message = on_message
...

另外在举个例子说明下何时使用message_callback_add(),如果客户端订阅了sensors/#系列主题(#为通配符),可能收到消息的主题有sensors/temperature和sensors/humidity,则可以使用message_callback_add()定义两个主题过滤器回调,分别处理这两个主题下收到的消息。

def temperature_callback(client, userdata, message):
    ...

def humidity_callback(client, userdata, message):
    ...

mqttclient.message_callback_add("sensors/temperature", temperature_callback)
mqttc.message_callback_add("sensors/humidity", humidity_callback)

mqttc.subscribe("sensors/#")
(4)发布消息

on_publish():

on_publish(client, userdata, mid)
(5)订阅主题

on_subscribe():当代理响应订阅请求时调用。

on_subscribe(client, userdata, mid, granted_qos)
(6)取消订阅

on_unsubscribe():当代理响应取消订阅请求时调用。

on_unsubscribe(client, userdata, mid)
(7)收到日志

on_log():当客户端有日志信息时调用。 定义该回调可用于调试。 级别变量给出消息的严重性,将是 MQTT_LOG_INFO、MQTT_LOG_NOTICE、MQTT_LOG_WARNING、MQTT_LOG_ERR 和 MQTT_LOG_DEBUG 之一。

on_log(client, userdata, level, buf)

参考资料

Paho Python Client Documentation
Paho MQTT Python GitHub Repository
Steve’s Internet Guide

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Paho MQTT Python客户端常用API、安装与使用 的相关文章

  • 连接到 Azure (Resource Manager) 上的 SQL Server 虚拟机

    概述 本主题介绍如何连接到运行于 Azure 虚拟机的 SQL Server 实例 它介绍了一些常规连接方案 xff0c 并提供了在 Azure VM 中配置 SQL Server 连接的详细步骤 Note Azure 具有用于创建和处理资
  • 网络安全组(NSG)简介

    韩源 xff0c 资深工程师 xff0c 存储和灾备专家 Azure 网络安全解析 作为公有云最重要环节之一 xff0c 网络安全一直是 Azure 的重中之重 在 Azure 中 xff0c 多种安全技术共同构成了立体的网络保护 xff1
  • gnome manjaro设置无法打开

    本文转载自 xff1a https joshtronic com 2018 04 02 unable to open gnome settings on arch linux after gnome upgrade 我经常会写关于主题的博客
  • 手动将经典 VM 从 VHD 迁移到新的 ARM 托管磁盘 VM

    本部分有助于将现有 Azure VM 从经典部署模型迁移到资源管理器部署模型中的托管磁盘 计划迁移到托管磁盘 本部分可帮助你针对 VM 和磁盘类型做出最佳决策 位置 选取 Azure 托管磁盘可用位置 如果要迁移到高级托管磁盘 xff0c
  • 适用于 Azure 虚拟网络的常见 PowerShell 命令

    如果想要创建虚拟机 xff0c 需要创建虚拟网络或了解可在其中添加 VM 的现有虚拟网络 通常情况下 xff0c 创建 VM 时 xff0c 还需考虑创建本文所述资源 有关安装最新版 Azure PowerShell 选择订阅和登录到帐户的
  • 创建包含多个子网的虚拟网络

    本教程介绍如何创建包含独立公共子网和专用子网的基本 Azure 虚拟网络 虚拟网络中的资源可以彼此通信 xff0c 并可以与连接到虚拟网络的其他网络中的资源通信 可在虚拟网络中相同或不同的子网中创建 Azure 资源 xff0c 如虚拟机
  • matplotlib笔记

    文章目录 matplotlib笔记cmap选择cmap创建cmap 子图断点轴 Broken axis 子图大小 坐标轴scale matplotlib笔记 有一个在线使用matplotlib的网址 cmap 选择cmap choose c
  • Fortran pgplot安装

    pgplot 首先确保已经安装了gfortran 以下为linux下安装流程 从这里下载安装包解压tar zxvf pgplot5 2 tar gz到某个目录比如 src pgplot创建一个文件夹xxx pgplot用于安装 xff0c
  • CUDA和Compute Capability

    CUDA Enabled GPUs Cuda支持的GPU 在这个参考包含了GPU的Compute Capacity列表 比如我的笔记本搭载了一块Geforce830m xff0c 查询列表就可以发现如下图 那么这块830M GPU的Comp
  • Javascript笔记

    数据类型 基本类型 primitive value 简单的数据段 xff0c 包括 Undefined Null Boolean Number String初始化只使用2原始字面量形式 xff0c 如果使用new则会创建Object无法加入
  • 前端面试题笔记

    前端面试八股 发现了一个地方包含了很多前端面试八股 1 用户喜好 为了不断优化推荐效果 xff0c 今日头条每天要存储和处理海量数据 假设有这样一种场景 xff1a 我们对用户按照它们的注册时间先后来标号 xff0c 对于一类文章 xff0
  • Matlab:数据写入Excel

    使用xlswrite 可以help xlswrite查看用法 xlswrite filename A xlswrite filename A sheet xlswrite filename A xlRange xlswrite filena
  • python处理FITS 1:astropy介绍与安装

    1 1介绍 astropy是一个开源的python库 xff0c 专门用于处理天文方面的数据 astropy包是Astropy 项目的内核 xff0c 这个项目致力于发展一个鲁棒性较好的伴随子包 xff08 能兼容优秀的astropy这个库
  • 使用sublime编译运行C程序

    1 打开sublime xff0c 找到顶部工具 xff08 Tool xff09 菜单 gt 编译系统 xff08 Build System xff09 gt 新编译系统 xff08 New Build System xff09 xff1
  • python处理FITS文件 2:astropy.io.fits介绍及打开FITS文件

    astropy这个库有很多功能 xff0c 因为本文主要涉及FITS文件 xff0c 因此仅仅使用astropy io fits 1介绍 astropy io fits包提供FITS文件操作的函数接口 xff0c 使得用户可以忽略FITS文
  • python处理FITS 3:处理头文件和数据单元

    1头文件处理 在获得hdul后 xff0c 可以使用两个属性 header data分别获得头文件和数据单元 gt gt gt hdul 61 fits span class hljs built in open span fits ima
  • Django使用pip安装

    1 pip安装 pip是python的包管理器 xff0c 使用这个工具可以很轻松安装各种python库 直接运行 pip install django 然后就可以安装了 1 1安装问题 输入 pip install django 报错 x
  • 内网穿透方式

    ssh 内网中的机器A 需要访问内网中的c 64 C 公网中的机器B xff0c 用户名b 内网中的机器A ssh CNR 7280 C 22 b 64 B 公网中的机器B ssh fCNL 7279 localhost 7280 loca
  • vue笔记

    rollup 专注于JavaScript打包不包含无关代码 对比webpack tree shaking 最开始由rollup实现 xff0c 之后被webpack借鉴配置output format xff0c 选择输出资源的模块形式 xf
  • geant4学习

    文章目录 配置vscode configuration materialgeant4的类及成员函数physicsList选择构建Physics List 粒子粒子类型能量损失重子和离子 杂项getEnergyoptical photon的速

随机推荐

  • C++枚举与字符串转换工具类

    C 43 43 枚举与字符串转换工具类 最近需要一个能够在字符串和枚举值之间互相转换的功能 xff0c 因为C 43 43 没有对枚举值进行遍历 反射之类的操作 xff0c 不像Java那样可以轻松搞定 网上找到一些代码感觉用起来有点不爽
  • iOS 使用xmpp做聊天客户端

    可以号称史上最详细的xmpp做iOS客户端聊天介绍 简介 xff1a XMPP协议是一种基于Socket长连接 以XML格式进行基本信息交换 C S S S多种架构的聊天协议 XMPPServer 基于XMPP协议的服务端 例如eJabbe
  • 基于树莓派的蓝牙出勤追踪系统

    本文介绍一个基于树莓派的蓝牙出勤追踪系统 xff0c 用于记录和监督自己的工作时长情况 代码与安装指引已更新在GitHub上 xff1a 树莓派蓝牙出勤追踪系统 该系统使用树莓派扫描附近的蓝牙或蓝牙低功耗设备 xff0c 以无感方式收集出勤
  • Python的开发环境与实用工具

    Python的各种实用工具 xff0c 大致可以分为包管理 环境管理 编辑相关 xff08 代码补全 snippet等 xff09 调试工具 xff08 集成开发环境 xff09 笔记本构建工具Jupyter 接下来就介绍下我常用的工具吧
  • 更新系统grub

    1 查看分区 grub rescue gt ls 列出磁盘分区 hd0 hd0 msdos9 hd0 msdos8 hd0 msdos7 hd0 msdos6 hd0 msdos5 hd0 msdos2 hd0 msdos1 2 寻找ubu
  • 预训练语言模型综述(一)—— 预训练语言模型及其历史

    本系列文章是笔者以邱锡鹏老师 Pre trained Models for Natural Language Processing A Survey 为主要参考材料所做的关于 预训练语言模型综述 的记录 xff0c 所涉及之素材也包括其他相
  • 在远程服务器上部署JupyterLab 3.0

    近期 xff0c JupyterLab刚刚升级到3 0版本 xff0c 在安装与使用方面都有不小改进 xff0c 加之之前部署在树莓派上时遇到偶尔需要跟服务器之间做些文件交换的情况 xff0c 处理起来还是稍微麻烦了点 xff0c 所以趁着
  • 基于TensorFlow 2.x的一些CNN模块/网络的实现

    开源一些基于TensorFlow 2 x的CNN模块 网络的实现 xff0c 可能不定时更新 仓库链接 xff1a TensorFlow 2 Implementations of CNN Based Networks 目前的实现包括 xff
  • 预训练语言模型综述(二)—— 预训练任务及训练策略

    本系列文章是笔者以邱锡鹏老师 Pre trained Models for Natural Language Processing A Survey 为主要参考材料所做的关于 预训练语言模型综述 的记录 xff0c 所涉及之素材也包括其他相
  • 预训练语言模型综述(三)—— 预训练语言模型的实际使用

    本系列文章是笔者以邱锡鹏老师 Pre trained Models for Natural Language Processing A Survey 为主要参考材料所做的关于 预训练语言模型综述 的记录 xff0c 所涉及之素材也包括其他相
  • scikit-learn算法与API速查表

    出处 xff1a scikit learn官方教程 算法速查表 xff1a scikit learn algorithm cheat sheet 进链接可以点击图上不同算法深入了解 API速查表 xff1a API Reference
  • 人工智能学习清单

    人工智能学习清单 一份人工智能学习清单 xff0c 帮助初学者了解本领域知识框架 xff0c 以及查找优秀学习资源 部分资源分享在GitHub xff0c 欢迎star与贡献 基础知识 1 人工智能 xff1a 了解人工智能的概念 xff0
  • 图神经网络(GNN)简介

    深度学习与图神经网络 近年来 xff0c 人工智能与深度学习在各个领域得到了长足的发展 从最先掀起这轮深度学习浪潮的计算机视觉 xff08 Computer Vision xff09 领域 xff0c 到亦备受关注的自然语言处理 xff08
  • 自变量/解释变量/因变量/响应变量/协变量等变量相关概念探析

    概念探析 一般科学实验主要涉及以下三种变量 xff1a 自变量 独立变量 xff08 independent variable xff09 xff1a 自变量是指在实验中由实验者操作的变量 xff0c 它被认为不会受其他变量的影响 xff0
  • 算法时间复杂度及P、NP、NP-Complete、NP-Hard问题

    算法的时间复杂度 如果某个算法的复杂度可以表示为 O n k O n k O n k
  • geoserver-jms虚拟机集群-外置Broker方式

    该方式使用外置Broker xff0c 网上查询到的大部分都是该种方式 xff0c 但是都存在问题 也有使用内置Broker的方式 xff0c 此时Broker也将是一个集群 一 测试规划 IP 节点名称 部署 修改配置文件 192 168
  • scikit-learn issues - classification metrics can‘t handle a mix of continuous-multioutput ...

    classification metrics can t handle a mix of continuous multioutput and multi label indicator targets 示例场景 xff1a cm 61 c
  • 使用VS Code的代码片(snippets)以及使用Settings Sync插件同步VS Code的配置

    创建Snippets文件 在VS Code可以为每种语言创建Snippets文件 xff1a 打开File gt Preferences gt User Snippets xff0c Existing Snippets区域显示了已经创建的S
  • 实用生产力工具整理

    Access and Control xff1a 用于连接与控制的软件 集成远程控制工具 MobaXterm 10 4 专业版 xff08 Portable xff09 MobaXterm 11 1 专业版 gt 进一步了解MobaXter
  • Paho MQTT Python客户端常用API、安装与使用

    MQTT Message Queuing Telemetry Transport 是一种轻量级的即时通信协议 xff0c 相关介绍可见 xff1a MQTT简介 Paho 是Eclipse的开源 MQTT 客户端项目 xff0c 提供多种语