mqtt --- 简易 MQTT 客户端¶
mqtt 模块提供了一个最小化的 MQTT v3.1.1 客户端实现,适用于内存受限的设备。它支持连接到代理(可选 TLS)、发布消息、订阅主题、遗嘱配置以及保活 ping。
示例:
from mqtt import MQTTClient
def callback(topic, msg):
print(topic, msg)
client = MQTTClient("openmv", "broker.example.com", 1883)
client.set_callback(callback)
client.connect()
client.subscribe(b"openmv/in")
client.publish(b"openmv/out", b"hello")
while True:
client.check_msg()
异常¶
- exception mqtt.MQTTException¶
当代理拒绝 CONNECT 请求或 SUBSCRIBE 请求被拒绝时引发。唯一的参数是代理提供的数字返回码。
类¶
- class mqtt.MQTTClient(client_id: bytes | str, server: str, port: int, ssl_params: dict | None = None, user: bytes | str | None = None, password: bytes | str | None = None, keepalive: int = 0, callback: callable | None = None)¶
构造一个 MQTT 客户端。
参数:
client_id —— 发送给代理的唯一客户端标识符。
server —— 代理的主机名或 IP 地址(用
socket.getaddrinfo()解析)。port —— 代理的 TCP 端口(通常明文为
1883,TLS 为8883)。ssl_params —— 如果不为
None,则套接字会用ssl.wrap_socket()包装,并将 ssl_params 作为关键字参数转发。传入{}以使用默认设置启用 TLS。user —— 用于代理身份验证的可选用户名。如果给出,则还必须提供 password。
password —— 与 user 一起使用的可选密码。
keepalive —— 保活间隔(以秒为单位)(
0表示禁用)。必须小于65536。callback —— 对从代理传递的每个 PUBLISH,以
callback(topic, msg)形式调用的可调用对象。也可以稍后通过set_callback()设置。
方法¶
- set_callback(f: callable) None¶
设置当收到 PUBLISH 消息时由
wait_msg()和check_msg()调用的回调。回调以f(topic, msg)形式调用,两个参数均为bytes。
- set_last_will(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None¶
配置 MQTT 遗嘱(Last Will and Testament)。如果客户端非正常断开连接,代理会在 topic 上发布 msg。必须在
connect()之前调用。参数:
topic —— 遗嘱主题(必须非空)。
msg —— 遗嘱负载。
retain —— 如果为
True,代理会将遗嘱消息存储为保留消息。qos —— 遗嘱 QoS 级别。必须为
0、1或2。
- connect(clean_session: bool = True, timeout: float = 5.0) int¶
打开到代理的 TCP(以及可选的 TLS)连接并发送 CONNECT 数据包。
参数:
clean_session —— 如果为
True,请求一个干净的会话;否则代理会恢复任何先前的会话状态。timeout —— 应用于底层套接字的套接字超时(以秒为单位)。
返回代理的 session present(会话存在)标志(
0或1)。如果代理返回非零的 CONNACK 返回码,则引发MQTTException。
- publish(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None¶
将 msg 发布到 topic。
参数:
topic —— 要发布到的主题名称。
msg —— 负载字节。
retain —— 如果为
True,则指示代理为新订阅者保留该消息。qos —— 服务质量级别。支持
0(即发即弃)和1(带确认)。2未实现,将引发AssertionError。
对于 qos 为
1的情况,调用会阻塞,直到收到匹配的 PUBACK。数据包总大小必须小于2097152字节。
- subscribe(topic: bytes | str, qos: int = 0) None¶
以给定的 qos 级别订阅 topic。必须已通过
set_callback()注册或向构造函数提供了回调;否则将引发AssertionError。阻塞直到收到匹配的 SUBACK。如果代理拒绝订阅(返回码
0x80),则引发MQTTException。
- wait_msg() int | None¶
阻塞等待单个传入的 MQTT 数据包并对其进行处理。PUBLISH 数据包会被传递给已注册的回调。PINGRESP 数据包会被静默消耗。对于其他控制数据包,返回原始的第一个字节。如果没有可用数据或处理了 PINGRESP,则返回
None。
- check_msg() int | None¶
wait_msg()的非阻塞变体。使用select.select()对套接字轮询最多约 50 ms;如果有数据待处理,则执行与wait_msg()相同的处理,否则返回None。