9.19. Python 中的 MQTT

每台联网的 OpenMV cam 都内置了 mqtt 模块,它用一个类 mqtt.MQTTClient 封装了 MQTT 线路协议。该类负责打开 TCP 套接字、执行 CONNECT 握手、打包和解包字节级数据包、处理 PINGREQ 保活,并将传入的 PUBLISH 消息分派给一个回调。应用代码则调用 connect()publish()subscribe() 以及 wait_msg() / check_msg()

9.19.1. 十五行代码实现一个发布者

最小却实用的程序就是一次发布。连接、发布一条消息、断开连接:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org 是由 Eclipse Mosquitto 项目运行的公共测试代理服务器。它在 1883 端口上接受无凭据的纯 TCP 连接。请勿将其用于任何正经用途;它不提供任何隐私保证,并且其主题命名空间与互联网上所有其他测试者共享。

client_id 在每个代理服务器连接中必须唯一——代理服务器用它来跟踪会话。主题和消息负载都是字节;如果传入 str 更方便,客户端会将其编码为 UTF-8。

9.19.2. 通过 TLS 连接

对于超出快速实验范畴的任何用途,基于 TLS 的 MQTT 只需多一个参数。ssl_params 字典会被转发给 ssl.wrap_socket(),因此在那里能用的参数在这里也都能用:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

8883 端口是 IANA 为 TLS-MQTT 保留的端口。server_hostname 会启用 SNI,使得位于共享 IP 之后的代理服务器能够路由到正确的证书——这与 HTTPS 所使用的机制相同。user / password 映射到 CONNECT 数据包的用户名/密码字段;代理服务器据此决定这些凭据是否授予对特定主题的发布或订阅权限。

9.19.3. 订阅与接收

要接收消息,客户端需提供一个回调并调用 subscribe()。该回调会接收两个字节类型的参数:主题和负载:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() 会阻塞,直到有一个 MQTT 数据包到达,然后解析它,如果它是已订阅主题上的 PUBLISH,就调用回调,最后返回。已订阅的回调是从该次调用内部触发的——并不存在后台线程。

对于需要持续做其他工作的交互式 cam 循环,check_msg() 是同样的逻辑,只是采用非阻塞形式。它使用带 50 ms 超时的 select.select(),如果没有待处理的内容就立即返回:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. 干净地重连

任何长期运行的 MQTT 客户端都必须处理连接掉线的情况。Wi-Fi 断开、代理服务器重启、NAT 超时,或者仅仅是在没有流量的情况下超过了保活时间,都会终止套接字。内置客户端会从察觉到掉线的那次调用中抛出 OSError(或带有代理服务器返回码的裸异常),而标准做法是采用一个重试循环:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

除非客户端在连接时传入了 clean_session=False,否则订阅不会在重连之间持久保留,因此内层的 connect 也应当在进入发布循环之前重新发起所有的 subscribe() 调用。

9.19.5. 遗嘱钩子

上报状态的 cam 应当告诉代理服务器:万一连接意外中断,要代表 cam 发送什么消息。请在 connect() 之前设置遗嘱:

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

如此一来,任何订阅了 yard-cam/status 的仪表板,都会在 cam 连接的那一刻看到 online,并在代理服务器察觉 cam 掉线时看到 offline。被保留的 offline 消息会在代理服务器上持久存在,因此即便是十分钟后才连入的仪表板,仍能看到正确的当前状态。

9.19.6. 何时选择 MQTT 而非 HTTP

Web 服务器一章介绍了 cam 作为 HTTP 服务器,以及在云上传那一页中作为 HTTP 客户端向一个固定 URL 发布 JPEG 的情形。两者各有用武之地。改用 MQTT 的恰当时机是:

  • 同样的数据需要发送给多个监听方(一个仪表板、一个通知服务、一个录制器),而 cam 事先并不知道这份名单。

  • 监听方可能来来去去,而无需重启 cam。

  • cam 希望进行订阅——以便从控制器接收命令——而 HTTP 客户端如果不采用长轮询或让服务器向某个回调 URL 推送,就无法做到这一点。

  • 连接必须以低成本的方式熬过长时间的空闲期。

继续使用 HTTP 的恰当时机:单台 cam、单台服务器、固定的请求/响应模式,且消息体过大、无法塞进单个 MQTT 主题(通过 MQTT 传输 JPEG 帧虽然可行,但对代理服务器很不友好;HTTP POST 才是自然的选择)。

交叉引用:Web 服务器一章中的云上传那一页展示了"cam → 云端归档"的 HTTP 版本。同一问题的 MQTT 版本则让 cam 与归档的 URL 解耦,并允许第二个消费方(比如说一个手机告警应用)接入同一数据流。