bilibili-视频演示
注意,这里没有选择umqtt.simple
因为在实时使用过程中有以下几个问题
-
需要while循环,堵塞进程
-
总是在持续一段时间后,尤其是收到遗嘱(断联)信息后,会报错并终止MQTT(这个问题Github有被提到过,没解决)
所以改用了异步无阻塞的micropython-mqtt
这是官方驱动程序的替代品。它已经在以下平台上进行了测试。
-
ESP8266
-
ESP32、ESP32-S2 和 ESP32-S3
-
Pyboard D
-
Arduino Nano Connect
-
Raspberry Pi Pico W
该驱动程序的主要特点是:
-
使用 uasyncio 的应用程序的非阻塞操作。
-
从 WiFi 和代理中断中自动恢复。
-
真正的qos == 1重传操作。
-
改进了 WiFi 范围,因为它可以容忍连接不良。
旧图:在线人数 1

github地址
https://github.com/peterhinch/micropython-mqtt/tree/master
文档地址
https://github.com/peterhinch/micropython-mqtt/blob/master/mqtt_as/README.md
首先需要先部署MQTT服务,本文使用的是EMQX
参考文章
《docker-compose自部署:EMQX设置MQTT》
把mqtt_as.py文件传输到ESP32-S3中
https://github.com/peterhinch/micropython-mqtt/tree/master/mqtt_as

引入依赖
from mqtt_as import MQTTClient, configimport uasyncio as asyncio
设置config配置
需要设置
-
wifi名称
-
wifi密码
-
mqtt的ip地址
config['ssid'] = 'wifi_name'config['wifi_pw'] = 'wifi_password'config['server'] = 'mqtt_ip_addr'
设置收到订阅消息的回调函数
# 监听消息接收def msg_call(topic, msg, retained):print("接收到消息")topic_str=topic.decode()msg_str=msg.decode()print("topic_str",topic_str)print("msg_str",msg_str)
设置订阅主题的函数
async def conn_han(client):await client.subscribe('web', 1)await client.subscribe('weather', 1)
设置config回调函数和订阅主题
config['subs_cb'] = msg_callconfig['connect_coro'] = conn_han
设置mqtt定时刷新消息(间隔几秒)
async def main(client):await client.connect()n = 0while True:await asyncio.sleep(5)print('publish', n)# If WiFi is down the following will pause for the duration.await client.publish('result', '{}'.format(n), qos = 1)n += 1
最后实例化mqtt并开始连接
MQTTClient.DEBUG = True # Optional: print diagnostic messagesclient = MQTTClient(config)try:asyncio.run(main(client))finally:client.close() # Prevent LmacRxBlk:1 errors
控制台提示
Checking WiFi integrity.Got reliable connectionConnecting to broker.Connected to broker.publish 0publish 1publish 2RAM free 8139680 alloc 56800publish 3

修改要发送的订阅主题
client.publish是发送订阅消息
把result改为weather主题,测试是否接收消息
async def main(client):await client.connect()n = 0while True:await asyncio.sleep(5)print('publish', n)# If WiFi is down the following will pause for the duration.await client.publish('weather', '{}'.format(n), qos = 1)n += 1
可以看到接收消息的回调函数收到并执行方法打印到控制台了

网站在线人数
步骤拆解
-
优先检测当前浏览器本地永久存储localStorage中的指定key的数据
-
如果没有,则生成对应的uuid作为mqtt的clientId并存储到localStorage
-
如果有,则动态加载mqttjs的库,并监听加载完成设置回调函数
-
加载mqttjs完成后,根据当前的uuid作为mqtt的clientId建立连接(这样无论打开几个tab,都只会有一个唯一连接)
-
设置mqtt连接成功回调中订阅指定主题和推送消息(如主题为域名)
-
设置mqtt停止连接后回调中推送主题消息(如主题为域名)
-
设置遗嘱消息(Will Message),当用户关闭网页失去连接时,推送主题消息
通过EMQX的API:/api/v5/subscriptions,可以获取当前主题的订阅数
参考文档地址
https://www.emqx.io/docs/zh/v5.0/admin/api.html
https://www.emqx.io/docs/zh/v5.0/admin/api-docs.html

代码如下 online.js
// 生成uuidfunction getUuid() {return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {var r = (Math.random() * 16) | 0,v = c == 'x' ? r : (r & 0x3) | 0x8;return v.toString(16);});}// 动态加载jsfunction loadScript(src, callback) {var script = document.createElement('script'),head = document.getElementsByTagName('head')[0];script.type = 'text/javascript';script.charset = 'UTF-8';script.src = src;if (script.addEventListener) {script.addEventListener('load', function () {callback();}, false);} else if (script.attachEvent) {script.attachEvent('onreadystatechange', function () {var target = window.event.srcElement;if (target.readyState == 'loaded') {callback();}});}head.appendChild(script);}// 先检测本地存储是否有对应idlet mqtt_uuid = localStorage.getItem('mqtt_uuid');console.log(`mqtt_uuid`, mqtt_uuid)if (!mqtt_uuid) {// 如果没有mqtt_uuid则创建一个新的mqtt_uuid = getUuid()console.log(`mqtt_uuid_res`, mqtt_uuid)// 保存uuid到本地永久存储localStorage.setItem('mqtt_uuid', mqtt_uuid);}// 动态加载mqttjs到页面// 参考地址// https://www.npmjs.com/package/mqttlet mqttJsUrl = `https://xxx.com/mqtt.min.js`loadScript(mqttJsUrl, () => {console.log(`已完成mqtt.js的加载`)// 订阅主题 设置为当前网站let domainStr = ``// 使用本地存储的id建立mqtt连接const client = mqtt.connect('wss://mqtt.com/wss', {clientId: mqtt_uuid,port: `443`,// host:`ip addr`// 设置遗嘱消息will: {// 要发布的主题topic: domainStr,// 要发布的消息payload: `_disconnect`,// 服务质量qos: 2,// 保留标志retain: false}})// 监听连接成功后回调client.on('connect', function () {client.subscribe(domainStr, function (err) {if (!err) {// 订阅成功则给该主题推送一个消息来测试client.publish(domainStr, '_online')}})})// 接收订阅消息后的回调client.on('message', function (topic, message) {// message is Bufferconsole.log(`接收到订阅通知`, message.toString())})// 监听断开连接client.on('disconnect', function () {console.log(`监听断开连接`)client.publish(domainStr, '_disconnect')})})
然后随意在任何一个网页放入上述的online.js文件即可
<script src="online.js"></script>
部署好后,再使用LVGL在ESP32-S3写文案,收到消息后,更新当前在线人数
新增对网页MQTT消息响应,修改实时在线人数
在msg_call添加
其中get_online_num方法是通过emqx http api获取当前订阅主题的连接数
if msg_str=="web_online":print("获取当前在线人数-新创建连接")get_online_num()elif msg_str=="web_disconnect":print("获取当前在线人数——已断开连接")get_online_num()
设置emqx http api方法获取连接数
# 获取网站当前在线人数def get_online_num():print("获取网站当前在线人数")username = 'de0ce9f4ba6bd2xx'password = 'qluJUCfxSYN8c19BSBLihxxfXjZgVQFgwx6DJ'# auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()# 注意这里get请求只支持http,使用https会报错response = urequests.get('http://domain.com/api/v5/subscriptions',headers ={'Content-Type':'application/json',"Authorization":"Basic ZGUwY2U5ZjiZDRkMHVKVUNmeFNZTjhqTXc5QXBLeWMxOUJTQkxpaHh4ZlhqWmdWUUZnd3g2REo="})parsed = response.json()list_data=parsed["data"]list_online=[]# 遍历当前在线订阅连接数for item in list_data:if item["topic"]=="web":list_online.append(item)# 打印当前在线人数print("len",len(list_online))len_str=str(len(list_online))# label_online_num.set_style_text_font(font_cn, 0) # set the fontlabel_online_num.set_text(""+len_str)
上述的username 和password 需要到emqx后台创建

创建成功
在代码get请求时,auth_header 需要转为base64 (方法见注释)

以下注意事项
-
在浏览器中mqttjs只能使用websocket方法连接,无法使用mqtt
-
如果网站为https,则同样的要使用wss(ssl websocket),需要配置nginx和证书等
-
ESP32-S3尚未测试ssl和非ssl区别,如果ssl有问题,就直接使用ip即可
总结如下
-
使用EMQX自建MQTT服务
-
使用micropython-mqtt设置ESP32-S3 MQ
-
使用MQTTJS库封装一个js文件,可以到任何网站建立MQTT(websocker连接),并使用EMQX的API获取在线人数
-
使用LVGL显示对应的在线人数
往期内容
《MicroPython[ESP32-S3]:LVGL显示GIF+http request显示哔哩哔哩粉丝计数》
《MicroPython[ESP32-S3]:使用lv_font_conv制作中文字体文件然后加载和显示》
《MicroPython[ESP32-S3]:lvgl使用fontTools提取字体子集设置中文手写字体》
END
