MicroPython的MQTT实现(asyncio协程版)

2023-11-05

一、改自:umqtt.simple代码

https://github.com/micropython/micropython-lib/tree/master/umqtt.simple

二、免费MQTT测试服务器

https://www.emqx.cn/mqtt/public-mqtt5-broker
在这里插入图片描述
https://www.emqx.cn/mqtt/mqtt-websocket-toolkit
在这里插入图片描述

三、源代码(lib\umqtt\cw_simple.py)

import ustruct as struct
import uasyncio as asyncio
from ubinascii import hexlify
import ussl as ssl

class MQTTException(Exception):
    pass

class MQTTClient:

    def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
                 ssl=False, ssl_params={}):
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False
        self.r = None
        self.w = None

    async def _recv_bytes(self, bytes_len):
        bs =  await self.r.read(bytes_len)
        return bs
    
    async def _send_bytes(self, bytes_data, bytes_len=None):
        if bytes_len == None:
            self.w.write(bytes_data)
        else:
            self.w.write(bytes_data[0:bytes_len])
        await self.w.drain()
            
    async def _send_str(self, s):
        await self._send_bytes(struct.pack("!H", len(s)))
        await self._send_bytes(s)

    async def _recv_len(self):
        n = 0
        sh = 0
        while 1:
            bn = await self._recv_bytes(1)
            b  = bn[0]
            n |= (b & 0x7f) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    async def connect(self, clean_session=True):
        self.r, self.w = await asyncio.open_connection(host=self.server, port=self.port)
 
        if self.ssl:
            ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_context.check_hostname = False
            ssl_context.load_default_certs()      
            self.r, self.w = await asyncio.open_connection(host=self.server, port=self.port, ssl=ssl_context)
        
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\x04MQTT\x04\x02\0\0")

        sz = 10 + 2 + len(self.client_id)
        msg[6] = clean_session << 1
        if self.user is not None:
            sz += 2 + len(self.user) + 2 + len(self.pswd)
            msg[6] |= 0xC0
        if self.keepalive:
            assert self.keepalive < 65536
            msg[7] |= self.keepalive >> 8
            msg[8] |= self.keepalive & 0x00FF
        if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[6] |= self.lw_retain << 5

        i = 1
        while sz > 0x7f:
            premsg[i] = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
        premsg[i] = sz
        
        await self._send_bytes(premsg, i + 2)
        await self._send_bytes(msg)
        #print(hex(len(msg)), hexlify(msg, ":"))
        await self._send_str(self.client_id)
        if self.lw_topic:
            await self._send_str(self.lw_topic)
            await self._send_str(self.lw_msg)
        if self.user is not None:
            await self._send_str(self.user)
            await self._send_str(self.pswd)

        resp = await self._recv_bytes(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        
        return resp[2] & 1

    async def disconnect(self):
        await self._send_bytes(b"\xe0\0")
        self.w.close()
        await self.w.wait_closed()

    async def ping(self):
        await self._send_bytes(b"\xc0\0")

    async def publish(self, topic, msg, retain=False, qos=0):
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7f:
            pkt[i] = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        await self._send_bytes(pkt, i + 1)
        await self._send_str(topic)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            await self._send_bytes(pkt, 2)
        await self._send_bytes(msg)
        if qos == 1:
            while 1:
                op = await self.wait_msg()
                if op == 0x40:
                    sz = await self._recv_bytes(1)
                    assert sz == b"\x02"
                    rcv_pid = await self._recv_bytes(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        await self._send_bytes(pkt)
        await self._send_str(topic)
        await self._send_bytes(qos.to_bytes(1, "little"))
        while 1:
            op = await self.wait_msg()
            if op == 0x90:
                resp = await self._recv_bytes(4)
                #print(resp)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        res = await self._recv_bytes(1)
        if res is None:
            return None
        if res == b"":
            print("res is empty!")
            #raise OSError(-1)
            return None            
        if res == b"\xd0":  # PINGRESP
            sz_n = await self._recv_bytes(1)
            sz = sz_n[0]
            print('sz = {0}'.format(sz))
            assert sz == 0
            return None
        op = res[0]
        if op & 0xf0 != 0x30:
            return op
        sz = await self._recv_len()
        topic_len = await self._recv_bytes(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = await self._recv_bytes(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = await self._recv_bytes(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = await self._recv_bytes(sz)
        await self.cb(topic, msg)
        if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            await self._send_bytes(pkt)
        elif op & 6 == 4:
            assert 0
        
        return op

四、源代码(test_mqtt.py)

import ujson
import time
import dht
from machine import Pin, ADC, Timer
import uasyncio as asyncio
from umqtt.cw_simple import MQTTClient

class MyIotPrj:
    def __init__(self):
        client_id = "slim_id"
        self.mserver   = 'broker.emqx.io'
        port           = 1883
        self.client = MQTTClient(client_id, self.mserver, 0)
        self.isconn = False

        self.topic_ctl = b'demo_ctl'
        self.topic_sta = b'demo_sta'

    async def sub_callback(self, topic, msg):
        print((topic, msg))

    async def mqtt_main_thread(self):

        try:
            self.client.set_callback(self.sub_callback)

            conn_ret_code = await self.client.connect()
            if conn_ret_code != 0:
                return
                            
            print('conn_ret_code = {0}'.format(conn_ret_code))
            
            await self.client.subscribe(self.topic_ctl)
            print("Connected to %s, subscribed to %s topic" % (self.mserver, self.topic_ctl))
            
            self.isconn = True

            while True:
                await self.client.wait_msg()
                await asyncio.sleep(1)
                print('wait_msg')
        finally:
            if self.client is not None:
                print('off line')
                await self.client.disconnect()
        
        self.isconn = False

    async def mqtt_upload_thread(self):
#        my_dht = dht.DHT11(Pin(14, Pin.IN))
        dht_data = {
            'temperature':0,
            'humidity':0
            }
        
        while True:
            if self.isconn == True:
#                my_dht.measure()
#                dht_data['temperature'] = my_dht.temperature()
#                dht_data['humidity']    = my_dht.humidity()
                print(dht_data)
                await self.client.publish(self.topic_sta, ujson.dumps(dht_data), retain=True)
            
            await asyncio.sleep(1)

        while True:
            if self.isconn == True:
                await self.client.ping()
            await asyncio.sleep(5)
            
def main():
    mip = MyIotPrj()
    
    loop = asyncio.get_event_loop()
    loop.create_task(mip.mqtt_main_thread())
    loop.create_task(mip.mqtt_upload_thread())
    loop.run_forever()
 
if __name__ == '__main__':
    main()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MicroPython的MQTT实现(asyncio协程版) 的相关文章

随机推荐

  • 《python数据分析与挖掘实战》笔记第4章

    文章目录 第4章 数据预处理 4 1 数据清洗 4 1 1 缺失值处理 4 1 1 异常值处理 4 2 数据集成 4 2 1 实体识别 4 2 2 冗余属性识别 4 3 数据变换 4 3 1 简单函数变换 4 3 2 规范化 4 3 3 连
  • HDS存储链路的一个问题

    新上线的HDS AMS1000 由于光纤交换机还未到位 于是先采用两块光纤卡与存储直连的方式连接 安装配置时一切正常 第二天回到公司 进行性能测试时 发现dd copy的速度只能到100M s的速度 真是晕死 more 于是开始查原因 在d
  • 使用公式与格式控制Excel快速实现计划甘特图

    项目中都会遇到做任务计划的需求 有的客户要求需要有甘特图的形式 本文介绍如何使用excel 单元格实现甘特图显示 调整任务时间自动填充单元格填色实现甘特图效果 废话不多说 先看效果 准备工作先创建两列开始时间与完成时间 这样有一个时间区间了
  • 共享库的兼容性

    1 共享库的更新 1 由于Bug的修正 新功能的增加 性能的提升 共享库需要不断的更新版本 2 共享库的更新可以被分为两类 兼容性更新和不兼容更新 3 不兼容更新 共享库改变了原有的接口 使用该共享库原有接口的程序可能不能运行或者运行不正常
  • java连接sqlserver2005 tcp 有时超时,sqlserver 2005连接超时采用bat命令解决

    将以下内容保存为 openSql bat 双击运行即可 echo SQL Server Ports echo Enabling SQLServer default instance port 1433 netsh firewall set
  • MoveIt编程实现笛卡尔空间机械臂运动

    前两篇文章是关于在关节空间中进行机械臂的运动控制 MoveIt简单编程实现关节空间机械臂运动 逆运动学 MoveIt简单编程实现关节空间机械臂运动 正运动学 通过对关节空间下的机器人6个轴进行控制 每个轴的变化都是通过插补进行完成运动 六个
  • vue从node服务器获取文件,前端全栈入门(1)Vue+nodejs(express) 实现文件上传

    Vue Nodejs 实现图片上传 图片上传在平时的业务开发中应该算是个标配的需求 这里分享下使用Vue和Nodejs简单的实现下该需求 流程图 简单画一个 使用 vue cli 创建一个最简单的 vue 项目 确保 vue cli 是最新
  • 2019,那些属于飞桨的重要时刻

    2019已经悄然落幕 在过去一年中 飞桨加速崛起 在产品性能上高效迭代 并屡次斩获多项大奖 其取得的成绩有目共睹 这也意味着飞桨正领衔中国深度学习框架迎来高光时刻 现在 让我们一起来回顾一下2019关于飞桨的那些重要记忆 01 核心发布 一
  • CPU的原理

    其实CPU主要就是做运算 那么运算的本质其实就是加减乘除 也就是说你在电脑上做得任何操作 对于计算机来说 都是加减乘除 晶体管的组成 首先CPU有个很重要的电子元件 晶体管 晶体管由半导体组成 大概长下面这样 我们把1称为输入端 3称为输出
  • dorabox靶场writeup

    靶场搭建 这里我直接采用Windows下的wamp集成环境 直接将靶场源码下载到本地 解压到网站根目录即可 网站源码下载地址 https github com Acmesec DoraBox 数据库的配置 修改conn php的数据库连接账
  • c语言字 字符串转换成数组_C语言基础--数组

    数组的定义 为什么要使用数组 单一的变量很难满足需求 这个时候多变量的集合数组出现了 数组的定义 类型 数组名 数组大小 int 类型可以是int double float char 这些基本数据类型中的任意一种 数组的初始化 int 数组
  • Ubuntu LLVM-CLang安装以及Helloworld pass

    1 LLVM project安装 参考https github com llvm llvm project以及https github com whjthu llvm examples 1 Ubuntu版本为 Linux version 5
  • 【Java线程】线程池的创建

    线程池装配类 线程池优雅停机控制 Configuration public class FeedBackExecutorConfig implements ApplicationListener
  • UE4-(蓝图)第二十一课射线

    一 射线节点 LineTraceByChannel 沿给定的线执行碰撞追踪 并返回首个阻挡命中 只返回对特定追踪通道响应的对象 start 射线起始点 end 射线结束点 Trace Channel 检测通道 Trace Complex 针
  • LINUX OS ANT一键安装制作 JAVA WEB + TOMCAT7.0 RPM 安装包

    LINUX OS ANT一键安装制作 JAVA WEB TOMCAT7 0 RPM 安装包 待续
  • c++程序分析

    题目 输入n个学生的五门课成绩 输出学生的成绩 成绩总分和各科成绩的平均分 程序代码 include
  • 不到 20 人的 IT 公司该去吗?

    今天早上在知乎看到一个挺有意思的话题 不到 20 人的 IT 公司该去吗 图片 回答区有一位老哥分享了自己在一个20 来人的小公司的奇葩工作经历 分享一下 下面是正文 刚到西安有幸加入了一个 20 来人的 it 公司 本来是不想去那种小公司
  • Log4j2入门(1)-控制台日志输出

    Log4j2入门 1 控制台日志输出 Log4j2入门1 控制台日志输出 1引入依赖 2构思需求 3Log4j简单的配置文件 4编写demo 刚学习log4j2日志 对于里面的root和logger的继承关系比较迷惑 遂有此文 使用控制台输
  • 1432. 改变一个整数能得到的最大差值

    题目 给你一个整数 num 你可以对它进行如下步骤恰好 两次 选择一个数字 x 0 lt x lt 9 选择另一个数字 y 0 lt y lt 9 数字 y 可以等于 x 将 num 中所有出现 x 的数位都用 y 替换 得到的新的整数 不
  • MicroPython的MQTT实现(asyncio协程版)

    一 改自 umqtt simple代码 https github com micropython micropython lib tree master umqtt simple 二 免费MQTT测试服务器 https www emqx c