9.19. MQTT in Python¶
Das mitgelieferte mqtt-Modul auf jeder netzwerkfähigen OpenMV Cam kapselt das MQTT-Wire-Protokoll in einer einzigen Klasse, mqtt.MQTTClient. Die Klasse öffnet den TCP-Socket, führt den CONNECT-Handshake durch, packt und entpackt die Pakete auf Byte-Ebene, verwaltet das PINGREQ-Keepalive und leitet eingehende PUBLISH-Nachrichten an einen Callback weiter. Anwendungscode ruft connect(), publish(), subscribe() und wait_msg() / check_msg() auf.
9.19.1. Ein Publisher in fünfzehn Zeilen¶
Das kleinste nützliche Programm ist ein einzelnes Publish. Verbinden, eine Nachricht veröffentlichen, trennen:
from mqtt import MQTTClient
client = MQTTClient(
client_id='yard-cam',
server='test.mosquitto.org',
port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()
test.mosquitto.org ist der öffentliche Test-Broker, der vom Eclipse-Mosquitto-Projekt betrieben wird. Er akzeptiert reine TCP-Verbindungen auf Port 1883 ohne Anmeldedaten. Verwenden Sie ihn nicht für ernsthafte Zwecke; er bietet keinerlei Datenschutzgarantien, und der Topic-Namensraum wird mit jedem anderen Tester im Internet geteilt.
client_id muss pro Broker-Verbindung eindeutig sein – der Broker verwendet sie, um Sitzungen zu verfolgen. Topics und Nachrichten-Nutzlasten sind Bytes; übergeben Sie str, wenn das praktischer ist, und der Client kodiert ihn als UTF-8.
9.19.2. Verbindung über TLS¶
Für alles, was über schnelle Experimente hinausgeht, ist MQTT über TLS nur ein zusätzliches Argument. Das ssl_params-Dict wird an ssl.wrap_socket() weitergereicht, sodass alles, was dort funktioniert, auch hier funktioniert:
import ssl
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883, # TLS-MQTT default port
ssl_params={'server_hostname': 'broker.example.com'},
user='yard-cam',
password=load_token(),
)
Port 8883 ist der von der IANA reservierte TLS-MQTT-Port. server_hostname aktiviert SNI, sodass Broker hinter einer gemeinsam genutzten IP zum richtigen Zertifikat weiterleiten können – derselbe Mechanismus, den HTTPS verwendet. user / password werden auf die Felder Benutzername/Passwort des CONNECT-Pakets abgebildet; der Broker entscheidet, ob diese Anmeldedaten Publish- oder Subscribe-Rechte für bestimmte Topics gewähren.
9.19.3. Abonnieren und Empfangen¶
Um Nachrichten zu empfangen, stellt ein Client einen Callback bereit und ruft subscribe() auf. Der Callback erhält zwei Byte-Argumente, das Topic und die Nutzlast:
def on_message(topic, msg):
print('received on', topic.decode(), ':', msg.decode())
client = MQTTClient(
client_id='dashboard',
server='test.mosquitto.org',
port=1883,
callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
client.wait_msg()
wait_msg() blockiert, bis ein MQTT-Paket eintrifft, parst es, ruft den Callback auf, falls es ein PUBLISH auf einem abonnierten Topic war, und kehrt zurück. Abonnierte Callbacks werden aus diesem Aufruf heraus ausgelöst – es gibt keinen Hintergrund-Thread.
Für eine interaktive Kameraschleife, die weiterhin andere Arbeiten erledigen muss, ist check_msg() dieselbe Logik in nicht-blockierender Form. Es verwendet select.select() mit einem Timeout von 50 ms und kehrt sofort zurück, wenn nichts ansteht:
while True:
client.check_msg()
run_frame() # capture + processing
check_motion_threshold()
9.19.4. Sauber wiederverbinden¶
Jeder langlaufende MQTT-Client muss abgebrochene Verbindungen behandeln. Wi-Fi-Trennungen, Broker-Neustarts, NAT-Timeouts oder einfach das Überschreiten des Keepalive-Intervalls ohne Datenverkehr beenden alle den Socket. Der mitgelieferte Client löst OSError (oder eine schlichte Exception mit dem Rückgabecode des Brokers) aus dem Aufruf aus, der den Abbruch bemerkt hat, und das Standardmuster ist eine Retry-Schleife:
import time
def keep_publishing(client, topic, get_message):
while True:
try:
client.connect()
while True:
client.publish(topic, get_message())
time.sleep(5)
except OSError:
print('connection lost, reconnecting in 5s')
time.sleep(5)
Abonnements werden über Wiederverbindungen hinweg nicht beibehalten, es sei denn, der Client hat beim Verbinden clean_session=False übergeben. Daher sollte das innere connect auch alle subscribe()-Aufrufe erneut absetzen, bevor es in die Publish-Schleife übergeht.
9.19.5. Der Last-Will-Hook¶
Eine Kamera, die ihren Status meldet, sollte dem Broker mitteilen, welche Nachricht er im Namen der Kamera senden soll, falls die Verbindung unerwartet abbricht. Setzen Sie das Will vor connect():
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883,
ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
b'yard-cam/status',
b'offline',
retain=True,
qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)
Nun sieht jedes Dashboard, das yard-cam/status abonniert hat, online in dem Moment, in dem sich die Kamera verbindet, und offline, sobald der Broker bemerkt, dass die Kamera weggefallen ist. Die beibehaltene offline-Nachricht bleibt auf dem Broker erhalten, sodass ein Dashboard, das sich zehn Minuten später verbindet, immer noch den korrekten aktuellen Zustand sieht.
9.19.6. Wann MQTT statt HTTP zu wählen ist¶
Das Kapitel über Webserver behandelt die Kamera als HTTP-Server und, auf der Cloud-Upload-Seite, als HTTP-Client, der JPEGs an eine feste URL postet. Beides hat seine Berechtigung. Der richtige Zeitpunkt, stattdessen zu MQTT zu greifen:
Dieselben Daten müssen an mehrere Empfänger gehen (ein Dashboard, ein Benachrichtigungsdienst, ein Rekorder), ohne dass die Kamera die Liste im Voraus kennt.
Empfänger können kommen und gehen, ohne dass die Kamera neu startet.
Die Kamera möchte selbst abonnieren – um Befehle von einem Controller zu empfangen –, was ein HTTP-Client ohne Long Polling oder einen Server, der über eine Callback-URL pusht, nicht kann.
Die Verbindung muss lange Leerlaufzeiten kostengünstig überstehen.
Der richtige Zeitpunkt, bei HTTP zu bleiben: eine Kamera, ein Server, ein festes Anfrage/Antwort-Muster mit einem Body, der für ein einzelnes MQTT-Topic zu groß ist (JPEG-Frames über MQTT funktioniert, ist aber unhöflich gegenüber dem Broker; HTTP POST ist die natürliche Wahl).
Querverweis: Die Cloud-Upload-Seite im Kapitel über Webserver zeigt die HTTP-Variante von „Kamera → Cloud-Archiv“. Die MQTT-Variante desselben Problems hält die Kamera von der URL des Archivs entkoppelt und ermöglicht es einem zweiten Konsumenten (etwa einer Handy-Benachrichtigungs-App), denselben Stream anzuzapfen.