mqtt --- 簡易 MQTT 用戶端

mqtt 模組提供一個精簡的 MQTT v3.1.1 用戶端實作,適用於記憶體受限的裝置。它支援連接至代理伺服器(可選用 TLS)、發布訊息、訂閱主題、遺言(last-will)設定以及保持連線(keep-alive)的 ping。

範例:

from mqtt import MQTTClient

def callback(topic, msg):
    print(topic, msg)

client = MQTTClient("openmv", "broker.example.com", 1883)
client.set_callback(callback)
client.connect()
client.subscribe(b"openmv/in")
client.publish(b"openmv/out", b"hello")
while True:
    client.check_msg()

例外

exception mqtt.MQTTException

當代理伺服器拒絕 CONNECT 請求,或 SUBSCRIBE 請求被拒絕時引發。單一引數為代理伺服器提供的數值回傳碼。

類別

class mqtt.MQTTClient(client_id: bytes | str, server: str, port: int, ssl_params: dict | None = None, user: bytes | str | None = None, password: bytes | str | None = None, keepalive: int = 0, callback: callable | None = None)

建構一個 MQTT 用戶端。

引數:

  • client_id -- 傳送給代理伺服器的唯一用戶端識別碼。

  • server -- 代理伺服器主機名稱或 IP 位址(以 socket.getaddrinfo() 解析)。

  • port -- 代理伺服器 TCP 連接埠(一般為純連線的 1883 或 TLS 的 8883)。

  • ssl_params -- 若不為 None,socket 會以 ssl.wrap_socket() 包裝,並將 ssl_params 作為關鍵字引數轉送。傳入 {} 以使用預設值啟用 TLS。

  • user -- 代理伺服器驗證用的選用使用者名稱。若提供,則 password 也必須提供。

  • password -- 與 user 一併使用的選用密碼。

  • keepalive -- 保持連線間隔(以秒為單位,0 表示停用)。必須小於 65536

  • callback -- 針對代理伺服器送達的每個 PUBLISH,以 callback(topic, msg) 形式呼叫的可呼叫物件。也可稍後以 set_callback() 設定。

方法

set_callback(f: callable) None

設定當收到 PUBLISH 訊息時,由 wait_msg()check_msg() 呼叫的回呼函式。該回呼以 f(topic, msg) 形式呼叫,兩個引數皆為 bytes

set_last_will(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

設定 MQTT 遺言(Last Will and Testament)。若用戶端非正常斷線,代理伺服器會在 topic 上發布 msg。必須在 connect() 之前呼叫。

引數:

  • topic -- 遺言主題(不可為空)。

  • msg -- 遺言酬載。

  • retain -- 若為 True,代理伺服器會將遺言訊息儲存為保留訊息。

  • qos -- 遺言 QoS 等級。必須為 012

connect(clean_session: bool = True, timeout: float = 5.0) int

開啟一個至代理伺服器的 TCP(並可選用 TLS)連線,並傳送 CONNECT 封包。

引數:

  • clean_session -- 若為 True,請求一個乾淨工作階段;否則代理伺服器會恢復任何先前的工作階段狀態。

  • timeout -- 套用至底層 socket 的 socket 逾時(以秒為單位)。

傳回代理伺服器的 session present(工作階段存在)旗標(01)。若代理伺服器傳回非零的 CONNACK 回傳碼,則引發 MQTTException

disconnect() None

傳送 DISCONNECT 封包並關閉底層 socket。

ping() None

傳送 PINGREQ 封包至代理伺服器。若 keepalive 為非零值,應定期呼叫此方法,使代理伺服器不會中斷連線。

publish(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

msg 發布至 topic

引數:

  • topic -- 要發布至的主題名稱。

  • msg -- 酬載位元組。

  • retain -- 若為 True,指示代理伺服器為新訂閱者保留該訊息。

  • qos -- 服務品質等級。支援 0(發送後不理會)與 1(需確認)。2 未實作,將引發 AssertionError

對於 qos 1,此呼叫會封鎖直到收到對應的 PUBACK。封包總大小必須小於 2097152 位元組。

subscribe(topic: bytes | str, qos: int = 0) None

以指定的 qos 等級訂閱 topic。必須已透過 set_callback() 註冊或於建構子提供回呼函式;否則會引發 AssertionError

封鎖直到收到對應的 SUBACK。若代理伺服器拒絕訂閱(回傳碼 0x80),則引發 MQTTException

wait_msg() int | None

封鎖等待單一傳入的 MQTT 封包並加以處理。PUBLISH 封包會送達已註冊的回呼函式。PINGRESP 封包會被靜默消耗。對於其他控制封包,會傳回原始的第一個位元組。若無資料可用,或處理了 PINGRESP,則傳回 None

check_msg() int | None

wait_msg() 的非封鎖變體。使用 select.select() 輪詢 socket 最多約 50 毫秒;若有待處理資料,則執行與 wait_msg() 相同的處理,否則傳回 None