9.19. MQTT in Python¶
De meegeleverde mqtt-module op elke OpenMV-cam met netwerk verpakt het MQTT-wire-protocol in één klasse, mqtt.MQTTClient. De klasse opent de TCP-socket, voert de CONNECT-handshake uit, pakt de byte-niveau-pakketten in en uit, handelt PINGREQ-keepalive af en verdeelt binnenkomende PUBLISH-berichten naar een callback. Applicatiecode roept connect(), publish(), subscribe() en wait_msg() / check_msg() aan.
9.19.1. Een publisher in vijftien regels¶
Het kleinste nuttige programma is een enkele publish. Verbinden, één bericht publiceren, verbreken:
from mqtt import MQTTClient
client = MQTTClient(
client_id='yard-cam',
server='test.mosquitto.org',
port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()
test.mosquitto.org is de openbare testbroker die door het Eclipse Mosquitto-project wordt beheerd. Hij accepteert platte-TCP-verbindingen op poort 1883 zonder inloggegevens. Gebruik hem niet voor iets serieus; hij biedt geen privacygaranties en de topic-namespace wordt gedeeld met elke andere tester op het internet.
client_id moet uniek zijn per brokerverbinding – de broker gebruikt het om sessies bij te houden. Topics en berichtpayloads zijn bytes; geef een str door als dat handiger is, dan codeert de client die als UTF-8.
9.19.2. Verbinden via TLS¶
Voor alles voorbij snelle experimenten is MQTT over TLS één extra argument. De ssl_params-dict wordt doorgegeven aan ssl.wrap_socket(), dus alles wat daar werkt, werkt hier ook:
import ssl
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883, # TLS-MQTT default port
ssl_params={'server_hostname': 'broker.example.com'},
user='yard-cam',
password=load_token(),
)
Poort 8883 is de door IANA gereserveerde TLS-MQTT-poort. server_hostname schakelt SNI in zodat brokers achter een gedeeld IP-adres naar het juiste certificaat kunnen routeren – hetzelfde mechanisme dat HTTPS gebruikt. user / password worden afgebeeld op de gebruikersnaam-/wachtwoordvelden van het CONNECT-pakket; de broker beslist of die inloggegevens publish- of subscribe-rechten verlenen voor specifieke topics.
9.19.3. Abonneren en ontvangen¶
Om berichten te ontvangen levert een client een callback en roept subscribe() aan. De callback ontvangt twee bytes-argumenten, het topic en de payload:
def on_message(topic, msg):
print('received on', topic.decode(), ':', msg.decode())
client = MQTTClient(
client_id='dashboard',
server='test.mosquitto.org',
port=1883,
callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
client.wait_msg()
wait_msg() blokkeert totdat er één MQTT-pakket binnenkomt, parseert het, roept de callback aan als het een PUBLISH op een geabonneerd topic was, en keert terug. Geabonneerde callbacks worden vanuit die aanroep geactiveerd – er is geen achtergrondthread.
Voor een interactieve cam-loop die ander werk moet blijven doen, is check_msg() dezelfde logica in niet-blokkerende vorm. Hij gebruikt select.select() met een time-out van 50 ms en keert onmiddellijk terug als er niets in behandeling is:
while True:
client.check_msg()
run_frame() # capture + processing
check_motion_threshold()
9.19.4. Netjes opnieuw verbinden¶
Elke langlopende MQTT-client moet verbroken verbindingen kunnen afhandelen. Wi-Fi-verbindingen die wegvallen, broker-herstarts, NAT-time-outs, of simpelweg de keepalive overschrijden zonder verkeer beëindigen allemaal de socket. De meegeleverde client genereert een OSError (of een kale exception met de returncode van de broker) vanuit de aanroep die de onderbreking opmerkte, en het standaardpatroon is een retry-loop:
import time
def keep_publishing(client, topic, get_message):
while True:
try:
client.connect()
while True:
client.publish(topic, get_message())
time.sleep(5)
except OSError:
print('connection lost, reconnecting in 5s')
time.sleep(5)
Abonnementen blijven niet behouden over herverbindingen heen tenzij de client clean_session=False heeft doorgegeven bij het verbinden, dus de binnenste connect zou ook eventuele subscribe()-aanroepen opnieuw moeten uitvoeren voordat hij in de publish-loop terechtkomt.
9.19.5. De last-will-hook¶
Een cam die status rapporteert, zou de broker moeten vertellen welk bericht hij namens de cam moet verzenden als de verbinding onverwacht wegvalt. Stel de will in vóór connect()
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883,
ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
b'yard-cam/status',
b'offline',
retain=True,
qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)
Nu ziet elk dashboard dat geabonneerd is op yard-cam/status de waarde online op het moment dat de cam verbinding maakt, en offline zodra de broker merkt dat de cam is weggevallen. Het bewaarde offline-bericht blijft op de broker behouden zodat een dashboard dat tien minuten later verbinding maakt nog steeds de juiste actuele status ziet.
9.19.6. Wanneer kies je MQTT boven HTTP¶
Het webservers-hoofdstuk behandelt de cam die als HTTP-server fungeert en, op de cloud-uploadpagina, als HTTP-client die JPEG’s naar een vaste URL post. Beide hebben hun plek. Het juiste moment om in plaats daarvan naar MQTT te grijpen:
Dezelfde gegevens moeten naar meerdere luisteraars (een dashboard, een notificatieservice, een recorder) zonder dat de cam de lijst van tevoren kent.
Luisteraars kunnen komen en gaan zonder dat de cam opnieuw opstart.
De cam wil abonneren – om opdrachten van een controller te ontvangen – wat een HTTP-client niet kan zonder long polling of een server die naar een callback-URL pusht.
De verbinding moet lange inactieve periodes goedkoop overleven.
Het juiste moment om bij HTTP te blijven: één cam, één server, een vast request/response-patroon met een body die te groot is voor één enkel MQTT-topic (JPEG-frames over MQTT werkt wel maar is onbeleefd tegen de broker; HTTP POST is de natuurlijke keuze).
Kruisverwijzing: de cloud-uploadpagina in het webservers-hoofdstuk toont de HTTP-versie van “cam → cloudarchief”. De MQTT-versie van hetzelfde probleem houdt de cam losgekoppeld van de URL van het archief en laat een tweede consument (bijvoorbeeld een telefoonmeldings-app) dezelfde stream aftappen.