如何在 Flask 项目中使用 MQTT

2023-11-02

Flask 是一个使用 Python 编写的轻量级 Web 应用框架,其被称为 “微框架”,因为它使用简单的核心,用扩展增加其他功能,例如:ORM、窗体验证工具、文件上传、各种开放式身份验证技术等。

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

本文主要介绍如何在 Flask 项目中实现 MQTT 客户端MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。

我们将使用到 Flask-MQTT 客户端库 ,它是一个 Flask 扩展,可以看作一个 paho-mqtt 的装饰器,用于简化 Flask 应用程序中的 MQTT 集成。

项目初始化

本项目使用 Python 3.8 进行开发测试,读者可用如下命令确认 Python 的版本。

$ python3 --version
Python 3.8.2

使用 Pip 安装 Flask-MQTT 库

pip3 install flask-mqtt

Flask-MQTT 使用

本文将使用 EMQ 提供的免费公共 MQTT 服务器,该服务基于 MQTT 云服务 - EMQX Cloud 创建。服务器接入信息如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

导入 Flask-MQTT

导入 Flask 库以及 Flask-MQTT 扩展,并创建 Flask 应用

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt

app = Flask(__name__)

配置 Flask-MQTT 扩展

app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = ''  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_KEEPALIVE'] = 5  # 设置心跳时间,单位为秒
app.config['MQTT_TLS_ENABLED'] = False  # 如果你的服务器支持 TLS,请设置为 True
topic = '/flask/mqtt'

mqtt_client = Mqtt(app)

完整的配置项可以参考 Flask-MQTT 配置文档

编写连接回调函数

可以在该回调函数中对 MQTT 连接成功或失败的情况进行处理,本示例将在连接成功后订阅 /flask/mqtt 主题。

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
   if rc == 0:
       print('Connected successfully')
       mqtt_client.subscribe(topic) # 订阅主题
   else:
       print('Bad connection. Code:', rc)

编写消息回调函数

该函数将打印 /flask/mqtt 主题接收到的消息。

@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
   data = dict(
       topic=message.topic,
       payload=message.payload.decode()
  )
   print('Received message on topic: {topic} with payload: {payload}'.format(**data))

创建发布消息接口

我们创建一个简单的 POST 接口实现 MQTT 消息发布。

在实际应用中该接口可能需要进行一些更复杂的业务逻辑处理。

@app.route('/publish', methods=['POST'])
def publish_message():
   request_data = request.get_json()
   publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
   return jsonify({'code': publish_result[0]})

运行 Flask 应用

if __name__ == '__main__':
   app.run(host='127.0.0.1', port=5000)

当 Flask 应用启动后,MQTT 客户端将会连接到服务器,并且订阅主题 /flask/mqtt

测试

接下来我们使用 MQTT 客户端 - MQTT X 进行连接、订阅、发布测试。

测试消息接收

  1. 在 MQTT X 中创建链接并连接到服务器。

    MQTT X 中创建链接

  2. 在 MQTT X 中向 /flask/mqtt 主题发布消息 Hello from MQTT X

    MQTT X 消息发布

  3. 在 Flask 运行窗口中将能看到 MQTT X 发送的消息。

    Flask 接收 MQTT 消息

测试消息发布接口

  1. 在 MQTT X 中订阅 /flask/mqtt 主题。

    MQTT X 订阅主题

  2. 使用 Postman 调用 /publish 接口:发送消息 Hello from Flask/flask/mqtt 主题。

    Postman 调用发布接口

  3. 在 MQTT X 中将能看到 Flask 发送过来的消息。

    Flask 发布 MQTT 消息

完整代码

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt

app = Flask(__name__)

app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = ''  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_KEEPALIVE'] = 5  # 设置心跳时间,单位为秒
app.config['MQTT_TLS_ENABLED'] = False  # 如果你的服务器支持 TLS,请设置为 True
topic = '/flask/mqtt'

mqtt_client = Mqtt(app)


@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
   if rc == 0:
       print('Connected successfully')
       mqtt_client.subscribe(topic)
   else:
       print('Bad connection. Code:', rc)


@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
   data = dict(
       topic=message.topic,
       payload=message.payload.decode()
  )
   print('Received message on topic: {topic} with payload: {payload}'.format(**data))


@app.route('/publish', methods=['POST'])
def publish_message():
   request_data = request.get_json()
   publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
   return jsonify({'code': publish_result[0]})

if __name__ == '__main__':
   app.run(host='127.0.0.1', port=5000)

注意事项

Flask-MQTT 目前不适合使用多个工作实例,如果您需要使用 geventgunicorn 这样的 WSGI 服务器,请确保只有一个工作实例。

总结

至此,我们使用 Flask-MQTT 完成了简单的 MQTT 客户端,并且可以在 Flask 应用中订阅、发布消息。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/how-to-use-mqtt-in-flask

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Flask 项目中使用 MQTT 的相关文章

随机推荐

  • SVM如何避免过拟合

    过拟合 Overfitting 表现为在训练数据上模型的预测很准 在未知数据上预测很差 过拟合主要是因为训练数据中的异常点 这些点严重偏离正常位置 我们知道 决定SVM最优分类超平面的恰恰是那些占少数的支持向量 如果支持向量中碰巧存在异常点
  • pyside6中QcommandLinkButton 控件qss有哪些用法和案例

    QCommandLinkButton 是 Qt GUI 框架中的一个功能按钮类 而 Qt Style Sheets QSS 提供了一种强大的方式来自定义 Qt 控件的外观 以下是 QCommandLinkButton 常用的 QSS 用法
  • 比亚迪半导体IPO再生波折:又被中止审核 红杉小米是股东

    雷递网 雷建平 4月1日报道 2022年1月底刚刚过会的比亚迪半导体上市再生波折 于2022年3月31日的审核再度被中止 这不是比亚迪半导体IPO审核第一次被深交所中止 2021年8月8日 因律师北京市天元律师事务所被中国证监会立案调查 比
  • [从零开始学DeepFaceLab-16]: 使用-命令行八大操作步骤-第6步:模型的选择与训练 - 进阶 - SAEHD模型训练参数详解与优化

    目录 前言 第1章 SAEHD模型训练参数详解 1 1 SAEHD参数汇总 默认 1 2 参数详解
  • 手把手教你:解决python UnicodeDecodeError: 'gb2312' codec can't decode问题

    问题 UnicodeDecodeError gb2312 codec can t decode bytes in position 2 3 illegal multibyte sequence 原因 python在做将普通字符串转换为uni
  • 【Github Action】使用ssh-deploy上传文件的小坑

    可以使用这个Github Action上传文件到服务器 https github com easingthemes ssh deploy README中描述了使用方法 name Deploy to Staging server uses e
  • FastAPI从入门到实战(13)——常见配置项

    这一部分的内容主要是一些常见的配置 包括路由 静态文件等 还包括一些路径和文档的修饰器 包括简介 标签参数等内容 配置静态文件 from fastapi import FastAPI from fastapi staticfiles imp
  • jvisualvm ssl远程连接JVM

    jvisualvm 远程ssl连接 文章目录 一 没认证的 JMX连接 不安全 二 SSL证书认证的JMX连接 安全 1 进入生成证书的目录 并执行脚本 2 一键生成密钥脚本 3 服务器端运行jar包时 开启ssl连接 4 客户端远程SSL
  • AR小项目的制作过程(一)

    前段时间一直想着初一个教程 怎么用unity去做一个AR小demo 在做之前先科普一下什么是AR AR技术也被称作是 增强现实 主要是一种将虚拟信息与真实世界巧妙融合的技术 现在光返的运用在很多方面 例如多媒体 3D建模 实时跟踪及注册 智
  • 如何在CentOS 8上使用firewalld设置防火墙

    介绍 Introduction firewalld is firewall management software available for many Linux distributions which acts as a fronten
  • JVM内存结构与内存模型

    JVM内存结构 前言 java开发人员不像C C 开发人员那样需要自己来管理内存 每一个对象从出生到死亡都需要由开发人员来管理 对于初级开发人员来说很容易出现内存问题 而java开发人员就很 幸运 了 内存的管理几乎全部交给JVM虚拟机来管
  • php 操作系统之间的一些黑魔法(绕过文件上传a.php/.)

    http wonderkun cc index html p 626 0x00 前言 做了一个CTF题目 遇到了一些有趣的东西 所以写了这篇文章记录了一下 但是我却不明白造成这个问题的原因在哪里 所以不知道给文章起什么标题 就姑且叫这个非常
  • 汇总

    点击上方 小白学视觉 选择加 星标 或 置顶 重磅干货 第一时间送达 汇总图像语义分割那些质量最好的数据集与常用benchmark数据集 前言 图像语义分割是计算机视觉最经典的任务之一 早期的图像分割主要有以下几种实现方法 基于像素分布的分
  • 4.3、Flink任务怎样读取Kafka中的数据

    目录 1 添加pom依赖 2 API使用说明 3 这是一个完整的入门案例 4 Kafka消息应该如何解析 4 1 只获取Kafka消息的value部分 4 2 获取完整Kafka消息 key value Metadata 4 3 自定义Ka
  • 我读zookeeper源码系列1

    一 准备工作 1 zookeeper版本 01 zookeeper 3 4 x 企业最常用 大数据技术组件最常用 基本维持在 3 4 5 3 4 6 3 4 7 这几个版本 02 zookeeper 3 5 x 03 zookeeper 3
  • MYSQL的系统数据表空间,用户数据表空间,系统临时表空间,用户临时表空间详解

    系统数据表空间 系统表空间可以有一个或多个数据文件 默认情况下 会在数据目录中创建一个名为ibdata1的系统表空间数据文件 系统表空间数据文件的大小和数量由innodb data file path启动选项定义 mysql gt show
  • vue-element-admin教程

    vue element admin 是一个后台前端解决方案 它基于 vue 和 element ui实现 它使用了最新的前端技术栈 内置了 i18 国际化解决方案 动态路由 权限验证 提炼了典型的业务模型 提供了丰富的功能组件 它可以帮助你
  • 个人信贷违约预测代码实战

    本次分享一个数据挖掘实战项目 个人信贷违约预测 项目背景 当今社会 个人信贷业务发展迅速 但同时也会暴露较高的信用风险 信息不对称在金融贷款领域突出 在过去时期借款一方对自身的财务状况 还款能力及还款意愿有着较为全面的掌握 而金融机构不能全
  • c/c++,char型数组转化为int类型

    char型数组转int类型 这几天遇到需要将int等类型转换并保存在char数组中 同时还需要将char数组转换为int等类型进行显示 1 int等类型转换并保存在char数组中 int为4字节 char为1字节 由长变短 容易发出截断 数
  • 如何在 Flask 项目中使用 MQTT

    Flask 是一个使用 Python 编写的轻量级 Web 应用框架 其被称为 微框架 因为它使用简单的核心 用扩展增加其他功能 例如 ORM 窗体验证工具 文件上传 各种开放式身份验证技术等 MQTT 是一种基于发布 订阅模式的 轻量级物