9.19. Python 中的 MQTT¶
每台連網的 OpenMV cam 都隨附的 mqtt 模組,將 MQTT 線路協定包裝在單一類別 mqtt.MQTTClient 中。這個類別會開啟 TCP socket、執行 CONNECT 交握、封裝與解封裝位元組層級的封包、處理 PINGREQ 保活機制,並將傳入的 PUBLISH 訊息派送給一個回呼函式。應用程式碼則呼叫 connect()、publish()、subscribe() 以及 wait_msg() / check_msg()。
9.19.1. 十五行程式碼寫成的發布者¶
最小而實用的程式就是單次發布。連線、發布一則訊息、然後斷線::
from mqtt import MQTTClient
client = MQTTClient(
client_id='yard-cam',
server='test.mosquitto.org',
port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()
test.mosquitto.org 是由 Eclipse Mosquitto 專案營運的公開測試代理伺服器。它在連接埠 1883 上接受不需憑證的純 TCP 連線。請勿將它用於任何正式用途;它沒有任何隱私保證,而且主題命名空間是與網際網路上其他所有測試者共用的。
client_id 在每個代理伺服器連線中都必須是唯一的——代理伺服器會用它來追蹤工作階段。主題與訊息酬載都是位元組;若傳入 str 更方便,用戶端會將它編碼為 UTF-8。
9.19.2. 透過 TLS 連線¶
對於任何超出快速實驗範圍的用途,透過 TLS 進行的 MQTT 只需多一個引數。ssl_params 字典會被轉送給 ssl.wrap_socket(),因此凡是在那裡可用的設定,在這裡也都可用::
import ssl
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883, # TLS-MQTT default port
ssl_params={'server_hostname': 'broker.example.com'},
user='yard-cam',
password=load_token(),
)
連接埠 8883 是 IANA 保留給 TLS-MQTT 的連接埠。server_hostname 會開啟 SNI,讓位於共用 IP 之後的代理伺服器能夠路由到正確的憑證——這與 HTTPS 所用的機制相同。user / password 對應到 CONNECT 封包的使用者名稱/密碼欄位;代理伺服器會決定這些憑證是否授予對特定主題的發布或訂閱權限。
9.19.3. 訂閱與接收¶
若要接收訊息,用戶端需提供一個回呼函式並呼叫 subscribe()。該回呼函式會收到兩個位元組引數,即主題與酬載::
def on_message(topic, msg):
print('received on', topic.decode(), ':', msg.decode())
client = MQTTClient(
client_id='dashboard',
server='test.mosquitto.org',
port=1883,
callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
client.wait_msg()
wait_msg() 會阻塞直到有一個 MQTT 封包抵達,將它剖析,若它是已訂閱主題上的 PUBLISH 則呼叫回呼函式,然後返回。已訂閱的回呼函式會從該呼叫內部觸發——這裡沒有背景執行緒。
對於需要持續做其他工作的互動式相機迴圈,check_msg() 是同樣的邏輯但以非阻塞形式呈現。它使用逾時為 50 ms 的 select.select(),若沒有任何待處理的內容就立即返回::
while True:
client.check_msg()
run_frame() # capture + processing
check_motion_threshold()
9.19.4. 乾淨地重新連線¶
任何長時間執行的 MQTT 用戶端都必須處理連線中斷。Wi-Fi 斷線、代理伺服器重新啟動、NAT 逾時,或單純地在沒有流量的情況下超過保活時間,都會結束這個 socket。隨附的用戶端會從察覺到中斷的那次呼叫中引發 OSError(或一個帶有代理伺服器回傳碼的裸例外),而標準做法是使用一個重試迴圈::
import time
def keep_publishing(client, topic, get_message):
while True:
try:
client.connect()
while True:
client.publish(topic, get_message())
time.sleep(5)
except OSError:
print('connection lost, reconnecting in 5s')
time.sleep(5)
除非用戶端在連線時傳入了 clean_session=False,否則訂閱 不會 在重新連線後保留下來,因此內層的 connect 在進入發布迴圈之前,也應重新發出所有的 subscribe() 呼叫。
9.19.5. 遺願(last-will)掛鉤¶
回報狀態的相機,應告訴代理伺服器在連線意外中斷時要代表相機送出什麼訊息。請在 connect() 之前 設定遺願::
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883,
ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
b'yard-cam/status',
b'offline',
retain=True,
qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)
如此一來,任何訂閱了 yard-cam/status 的儀表板,會在相機連線的當下看到 online,並在代理伺服器察覺相機已斷線時看到 offline。被保留的 offline 訊息會留存在代理伺服器上,因此即使是十分鐘後才連線的儀表板,仍能看到正確的當前狀態。
9.19.6. 何時該選擇 MQTT 而非 HTTP¶
webservers 章節涵蓋了相機作為 HTTP 伺服器、以及在雲端上傳頁面中作為向固定 URL 張貼 JPEG 的 HTTP 用戶端這兩種角色。兩者各有其用武之地。改用 MQTT 的恰當時機是:
同一份資料需要送給好幾個接收方(一個儀表板、一個通知服務、一個錄製器),而相機事先並不知道這份名單。
接收方可能來來去去,而相機不必重新啟動。
相機想要 訂閱——也就是接收來自控制端的命令——這是 HTTP 用戶端在沒有長輪詢、或沒有伺服器向回呼 URL 推送的情況下做不到的。
連線必須以低廉的成本撐過長時間的閒置期。
繼續使用 HTTP 的恰當時機:單一相機、單一伺服器、固定的請求/回應模式,而且本文內容大到無法放進單一 MQTT 主題(透過 MQTT 傳送 JPEG 影格雖然可行,但對代理伺服器很不禮貌;HTTP POST 才是自然的選擇)。
交叉連結:webservers 章節中的雲端上傳頁面展示了「相機 → 雲端封存」的 HTTP 版本。同一問題的 MQTT 版本能讓相機與封存的 URL 解耦,並讓第二個消費者(比方說一個手機警示應用程式)也能接入同一個資料流。