MicroPython[ESP32-S3]:micropython-mqtt设置MQTT连接

bilibili-视频演示

点击本文字,打开哔哩哔哩APP

NO.1
本期功能

注意,这里没有选择umqtt.simple

因为在实时使用过程中有以下几个问题

  1. 需要while循环,堵塞进程

  2. 总是在持续一段时间后,尤其是收到遗嘱(断联)信息后,会报错并终止MQTT(这个问题Github有被提到过,没解决)

所以改用了异步无阻塞的micropython-mqtt

这是官方驱动程序的替代品。它已经在以下平台上进行了测试。

  1. ESP8266

  2. ESP32、ESP32-S2 和 ESP32-S3

  3. Pyboard D

  4. Arduino Nano Connect

  5. Raspberry Pi Pico W

该驱动程序的主要特点是:

  1. 使用 uasyncio 的应用程序的非阻塞操作。

  2. 从 WiFi 和代理中断中自动恢复。

  3. 真正的qos == 1重传操作。

  4. 改进了 WiFi 范围,因为它可以容忍连接不良。

旧图:在线人数 1

img

NO.2
micropython-mqtt

github地址

https://github.com/peterhinch/micropython-mqtt/tree/master

文档地址

https://github.com/peterhinch/micropython-mqtt/blob/master/mqtt_as/README.md

首先需要先部署MQTT服务,本文使用的是EMQX

参考文章

docker-compose自部署:EMQX设置MQTT

把mqtt_as.py文件传输到ESP32-S3中

https://github.com/peterhinch/micropython-mqtt/tree/master/mqtt_as

img

引入依赖

from mqtt_as import MQTTClient, configimport uasyncio as asyncio

设置config配置

需要设置

  1. wifi名称

  2. wifi密码

  3. mqtt的ip地址

config['ssid'] = 'wifi_name' config['wifi_pw'] = 'wifi_password'config['server'] = 'mqtt_ip_addr' 

设置收到订阅消息的回调函数

# 监听消息接收def msg_call(topic, msg, retained):    print("接收到消息")    topic_str=topic.decode()    msg_str=msg.decode()    print("topic_str",topic_str)    print("msg_str",msg_str)

设置订阅主题的函数

async def conn_han(client):    await client.subscribe('web', 1)    await client.subscribe('weather', 1)

设置config回调函数和订阅主题

config['subs_cb'] = msg_callconfig['connect_coro'] = conn_han

设置mqtt定时刷新消息(间隔几秒)

async def main(client):    await client.connect()    n = 0    while True:        await asyncio.sleep(5)        print('publish', n)        # If WiFi is down the following will pause for the duration.        await client.publish('result', '{}'.format(n), qos = 1)        n += 1

最后实例化mqtt并开始连接

MQTTClient.DEBUG = True  # Optional: print diagnostic messagesclient = MQTTClient(config)try:    asyncio.run(main(client))finally:    client.close()  # Prevent LmacRxBlk:1 errors

控制台提示

Checking WiFi integrity.Got reliable connectionConnecting to broker.Connected to broker.publish 0publish 1publish 2RAM free 8139680 alloc 56800publish 3

img

修改要发送的订阅主题

client.publish是发送订阅消息

把result改为weather主题,测试是否接收消息

async def main(client):    await client.connect()    n = 0    while True:        await asyncio.sleep(5)        print('publish', n)        # If WiFi is down the following will pause for the duration.        await client.publish('weather', '{}'.format(n), qos = 1)        n += 1


可以看到接收消息的回调函数收到并执行方法打印到控制台了

img

NO.3
设置网站在线人数

网站在线人数

步骤拆解

  1. 优先检测当前浏览器本地永久存储localStorage中的指定key的数据

  2. 如果没有,则生成对应的uuid作为mqtt的clientId并存储到localStorage

  3. 如果有,则动态加载mqttjs的库,并监听加载完成设置回调函数

  4. 加载mqttjs完成后,根据当前的uuid作为mqtt的clientId建立连接(这样无论打开几个tab,都只会有一个唯一连接)

  5. 设置mqtt连接成功回调中订阅指定主题和推送消息(如主题为域名)

  6. 设置mqtt停止连接后回调中推送主题消息(如主题为域名)

  7. 设置遗嘱消息(Will Message),当用户关闭网页失去连接时,推送主题消息

通过EMQX的API:/api/v5/subscriptions,可以获取当前主题的订阅数

参考文档地址

https://www.emqx.io/docs/zh/v5.0/admin/api.html

https://www.emqx.io/docs/zh/v5.0/admin/api-docs.html

img

代码如下 online.js

// 生成uuidfunction getUuid() {    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {        var r = (Math.random() * 16) | 0,            v = c == 'x' ? r : (r & 0x3) | 0x8;        return v.toString(16);    });}// 动态加载jsfunction loadScript(src, callback) {    var script = document.createElement('script'),        head = document.getElementsByTagName('head')[0];    script.type = 'text/javascript';    script.charset = 'UTF-8';    script.src = src;    if (script.addEventListener) {        script.addEventListener('load', function () {            callback();        }, false);    } else if (script.attachEvent) {        script.attachEvent('onreadystatechange', function () {            var target = window.event.srcElement;            if (target.readyState == 'loaded') {                callback();            }        });    }    head.appendChild(script);}// 先检测本地存储是否有对应idlet mqtt_uuid = localStorage.getItem('mqtt_uuid');console.log(`mqtt_uuid`, mqtt_uuid)if (!mqtt_uuid) {    // 如果没有mqtt_uuid则创建一个新的    mqtt_uuid = getUuid()    console.log(`mqtt_uuid_res`, mqtt_uuid)    // 保存uuid到本地永久存储    localStorage.setItem('mqtt_uuid', mqtt_uuid);}// 动态加载mqttjs到页面// 参考地址// https://www.npmjs.com/package/mqttlet mqttJsUrl = `https://xxx.com/mqtt.min.js`loadScript(mqttJsUrl, () => {    console.log(`已完成mqtt.js的加载`)    // 订阅主题 设置为当前网站    let domainStr = ``    // 使用本地存储的id建立mqtt连接    const client = mqtt.connect('wss://mqtt.com/wss', {        clientId: mqtt_uuid,        port: `443`,        // host:`ip addr`        // 设置遗嘱消息        will: {            // 要发布的主题            topic: domainStr,            // 要发布的消息            payload: `_disconnect`,            // 服务质量            qos: 2,            // 保留标志            retain: false        }    })    // 监听连接成功后回调    client.on('connect', function () {
client.subscribe(domainStr, function (err) { if (!err) { // 订阅成功则给该主题推送一个消息来测试 client.publish(domainStr, '_online') } }) }) // 接收订阅消息后的回调 client.on('message', function (topic, message) { // message is Buffer console.log(`接收到订阅通知`, message.toString()) }) // 监听断开连接 client.on('disconnect', function () { console.log(`监听断开连接`) client.publish(domainStr, '_disconnect') })})

然后随意在任何一个网页放入上述的online.js文件即可

<script src="online.js"></script>

部署好后,再使用LVGL在ESP32-S3写文案,收到消息后,更新当前在线人数

NO.4
ESP32-S3接收消息

新增对网页MQTT消息响应,修改实时在线人数

在msg_call添加

其中get_online_num方法是通过emqx http api获取当前订阅主题的连接数

if msg_str=="web_online":        print("获取当前在线人数-新创建连接")        get_online_num()    elif msg_str=="web_disconnect":        print("获取当前在线人数——已断开连接")        get_online_num()

设置emqx http api方法获取连接数

# 获取网站当前在线人数def get_online_num():    print("获取网站当前在线人数")    username = 'de0ce9f4ba6bd2xx'    password = 'qluJUCfxSYN8c19BSBLihxxfXjZgVQFgwx6DJ'    # auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()    # 注意这里get请求只支持http,使用https会报错    response = urequests.get('http://domain.com/api/v5/subscriptions',headers ={        'Content-Type':'application/json',        "Authorization":"Basic ZGUwY2U5ZjiZDRkMHVKVUNmeFNZTjhqTXc5QXBLeWMxOUJTQkxpaHh4ZlhqWmdWUUZnd3g2REo="        })    parsed = response.json()    list_data=parsed["data"]    list_online=[]    # 遍历当前在线订阅连接数    for item in list_data:        if item["topic"]=="web":            list_online.append(item)    # 打印当前在线人数    print("len",len(list_online))    len_str=str(len(list_online))    # label_online_num.set_style_text_font(font_cn, 0)  # set the font    label_online_num.set_text(""+len_str)

上述的username 和password 需要到emqx后台创建

img

创建成功

在代码get请求时,auth_header 需要转为base64 (方法见注释)

img

NO.5
Tips

以下注意事项

  1. 在浏览器中mqttjs只能使用websocket方法连接,无法使用mqtt

  2. 如果网站为https,则同样的要使用wss(ssl websocket),需要配置nginx和证书等

  3. ESP32-S3尚未测试ssl和非ssl区别,如果ssl有问题,就直接使用ip即可

总结如下

  1. 使用EMQX自建MQTT服务

  2. 使用micropython-mqtt设置ESP32-S3 MQ

  3. 使用MQTTJS库封装一个js文件,可以到任何网站建立MQTT(websocker连接),并使用EMQX的API获取在线人数

  4. 使用LVGL显示对应的在线人数

往期内容

MicroPython[ESP32-S3]:LVGL显示GIF+http request显示哔哩哔哩粉丝计数

MicroPython[ESP32-S3]:使用lv_font_conv制作中文字体文件然后加载和显示

MicroPython[ESP32-S3]:lvgl使用fontTools提取字体子集设置中文手写字体

END