mqtt --- 簡易 MQTT 用戶端¶
mqtt 模組提供一個精簡的 MQTT v3.1.1 用戶端實作,適用於記憶體受限的裝置。它支援連接至代理伺服器(可選用 TLS)、發布訊息、訂閱主題、遺言(last-will)設定以及保持連線(keep-alive)的 ping。
範例:
from mqtt import MQTTClient
def callback(topic, msg):
print(topic, msg)
client = MQTTClient("openmv", "broker.example.com", 1883)
client.set_callback(callback)
client.connect()
client.subscribe(b"openmv/in")
client.publish(b"openmv/out", b"hello")
while True:
client.check_msg()
例外¶
- exception mqtt.MQTTException¶
當代理伺服器拒絕 CONNECT 請求,或 SUBSCRIBE 請求被拒絕時引發。單一引數為代理伺服器提供的數值回傳碼。
類別¶
- class mqtt.MQTTClient(client_id: bytes | str, server: str, port: int, ssl_params: dict | None = None, user: bytes | str | None = None, password: bytes | str | None = None, keepalive: int = 0, callback: callable | None = None)¶
建構一個 MQTT 用戶端。
引數:
client_id -- 傳送給代理伺服器的唯一用戶端識別碼。
server -- 代理伺服器主機名稱或 IP 位址(以
socket.getaddrinfo()解析)。port -- 代理伺服器 TCP 連接埠(一般為純連線的
1883或 TLS 的8883)。ssl_params -- 若不為
None,socket 會以ssl.wrap_socket()包裝,並將 ssl_params 作為關鍵字引數轉送。傳入{}以使用預設值啟用 TLS。user -- 代理伺服器驗證用的選用使用者名稱。若提供,則 password 也必須提供。
password -- 與 user 一併使用的選用密碼。
keepalive -- 保持連線間隔(以秒為單位,
0表示停用)。必須小於65536。callback -- 針對代理伺服器送達的每個 PUBLISH,以
callback(topic, msg)形式呼叫的可呼叫物件。也可稍後以set_callback()設定。
方法¶
- set_callback(f: callable) None¶
設定當收到 PUBLISH 訊息時,由
wait_msg()與check_msg()呼叫的回呼函式。該回呼以f(topic, msg)形式呼叫,兩個引數皆為bytes。
- set_last_will(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None¶
設定 MQTT 遺言(Last Will and Testament)。若用戶端非正常斷線,代理伺服器會在 topic 上發布 msg。必須在
connect()之前呼叫。引數:
topic -- 遺言主題(不可為空)。
msg -- 遺言酬載。
retain -- 若為
True,代理伺服器會將遺言訊息儲存為保留訊息。qos -- 遺言 QoS 等級。必須為
0、1或2。
- connect(clean_session: bool = True, timeout: float = 5.0) int¶
開啟一個至代理伺服器的 TCP(並可選用 TLS)連線,並傳送 CONNECT 封包。
引數:
clean_session -- 若為
True,請求一個乾淨工作階段;否則代理伺服器會恢復任何先前的工作階段狀態。timeout -- 套用至底層 socket 的 socket 逾時(以秒為單位)。
傳回代理伺服器的 session present(工作階段存在)旗標(
0或1)。若代理伺服器傳回非零的 CONNACK 回傳碼,則引發MQTTException。
- publish(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None¶
將 msg 發布至 topic。
引數:
topic -- 要發布至的主題名稱。
msg -- 酬載位元組。
retain -- 若為
True,指示代理伺服器為新訂閱者保留該訊息。qos -- 服務品質等級。支援
0(發送後不理會)與1(需確認)。2未實作,將引發AssertionError。
對於 qos
1,此呼叫會封鎖直到收到對應的 PUBACK。封包總大小必須小於2097152位元組。
- subscribe(topic: bytes | str, qos: int = 0) None¶
以指定的 qos 等級訂閱 topic。必須已透過
set_callback()註冊或於建構子提供回呼函式;否則會引發AssertionError。封鎖直到收到對應的 SUBACK。若代理伺服器拒絕訂閱(回傳碼
0x80),則引發MQTTException。
- wait_msg() int | None¶
封鎖等待單一傳入的 MQTT 封包並加以處理。PUBLISH 封包會送達已註冊的回呼函式。PINGRESP 封包會被靜默消耗。對於其他控制封包,會傳回原始的第一個位元組。若無資料可用,或處理了 PINGRESP,則傳回
None。
- check_msg() int | None¶
wait_msg()的非封鎖變體。使用select.select()輪詢 socket 最多約 50 毫秒;若有待處理資料,則執行與wait_msg()相同的處理,否則傳回None。