mqtt --- 简易 MQTT 客户端

mqtt 模块提供了一个最小化的 MQTT v3.1.1 客户端实现,适用于内存受限的设备。它支持连接到代理(可选 TLS)、发布消息、订阅主题、遗嘱配置以及保活 ping。

示例:

from mqtt import MQTTClient

def callback(topic, msg):
    print(topic, msg)

client = MQTTClient("openmv", "broker.example.com", 1883)
client.set_callback(callback)
client.connect()
client.subscribe(b"openmv/in")
client.publish(b"openmv/out", b"hello")
while True:
    client.check_msg()

异常

exception mqtt.MQTTException

当代理拒绝 CONNECT 请求或 SUBSCRIBE 请求被拒绝时引发。唯一的参数是代理提供的数字返回码。

class mqtt.MQTTClient(client_id: bytes | str, server: str, port: int, ssl_params: dict | None = None, user: bytes | str | None = None, password: bytes | str | None = None, keepalive: int = 0, callback: callable | None = None)

构造一个 MQTT 客户端。

参数:

  • client_id —— 发送给代理的唯一客户端标识符。

  • server —— 代理的主机名或 IP 地址(用 socket.getaddrinfo() 解析)。

  • port —— 代理的 TCP 端口(通常明文为 1883,TLS 为 8883)。

  • ssl_params —— 如果不为 None,则套接字会用 ssl.wrap_socket() 包装,并将 ssl_params 作为关键字参数转发。传入 {} 以使用默认设置启用 TLS。

  • user —— 用于代理身份验证的可选用户名。如果给出,则还必须提供 password

  • password —— 与 user 一起使用的可选密码。

  • keepalive —— 保活间隔(以秒为单位)(0 表示禁用)。必须小于 65536

  • callback —— 对从代理传递的每个 PUBLISH,以 callback(topic, msg) 形式调用的可调用对象。也可以稍后通过 set_callback() 设置。

方法

set_callback(f: callable) None

设置当收到 PUBLISH 消息时由 wait_msg()check_msg() 调用的回调。回调以 f(topic, msg) 形式调用,两个参数均为 bytes

set_last_will(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

配置 MQTT 遗嘱(Last Will and Testament)。如果客户端非正常断开连接,代理会在 topic 上发布 msg。必须在 connect() 之前调用。

参数:

  • topic —— 遗嘱主题(必须非空)。

  • msg —— 遗嘱负载。

  • retain —— 如果为 True,代理会将遗嘱消息存储为保留消息。

  • qos —— 遗嘱 QoS 级别。必须为 012

connect(clean_session: bool = True, timeout: float = 5.0) int

打开到代理的 TCP(以及可选的 TLS)连接并发送 CONNECT 数据包。

参数:

  • clean_session —— 如果为 True,请求一个干净的会话;否则代理会恢复任何先前的会话状态。

  • timeout —— 应用于底层套接字的套接字超时(以秒为单位)。

返回代理的 session present(会话存在)标志(01)。如果代理返回非零的 CONNACK 返回码,则引发 MQTTException

disconnect() None

发送一个 DISCONNECT 数据包并关闭底层套接字。

ping() None

向代理发送一个 PINGREQ 数据包。如果 keepalive 非零,则应定期调用此方法,以使代理不会断开连接。

publish(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

msg 发布到 topic

参数:

  • topic —— 要发布到的主题名称。

  • msg —— 负载字节。

  • retain —— 如果为 True,则指示代理为新订阅者保留该消息。

  • qos —— 服务质量级别。支持 0(即发即弃)和 1(带确认)。2 未实现,将引发 AssertionError

对于 qos1 的情况,调用会阻塞,直到收到匹配的 PUBACK。数据包总大小必须小于 2097152 字节。

subscribe(topic: bytes | str, qos: int = 0) None

以给定的 qos 级别订阅 topic。必须已通过 set_callback() 注册或向构造函数提供了回调;否则将引发 AssertionError

阻塞直到收到匹配的 SUBACK。如果代理拒绝订阅(返回码 0x80),则引发 MQTTException

wait_msg() int | None

阻塞等待单个传入的 MQTT 数据包并对其进行处理。PUBLISH 数据包会被传递给已注册的回调。PINGRESP 数据包会被静默消耗。对于其他控制数据包,返回原始的第一个字节。如果没有可用数据或处理了 PINGRESP,则返回 None

check_msg() int | None

wait_msg() 的非阻塞变体。使用 select.select() 对套接字轮询最多约 50 ms;如果有数据待处理,则执行与 wait_msg() 相同的处理,否则返回 None