9.19. MQTT în Python

Modulul mqtt inclus pe fiecare cameră OpenMV conectată la rețea încapsulează protocolul de comunicație MQTT într-o singură clasă, mqtt.MQTTClient. Clasa deschide socketul TCP, efectuează strângerea de mână CONNECT, împachetează și despachetează pachetele la nivel de octet, gestionează păstrarea în viață prin PINGREQ și distribuie mesajele PUBLISH primite către o funcție de retroapelare (callback). Codul aplicației apelează connect(), publish(), subscribe() și wait_msg() / check_msg().

9.19.1. Un publisher în cincisprezece linii

Cel mai mic program util este o singură publicare. Conectează-te, publică un mesaj, deconectează-te:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org este brokerul public de test administrat de proiectul Eclipse Mosquitto. Acceptă conexiuni TCP simple pe portul 1883 fără credențiale. Nu îl folosi pentru nimic serios; nu oferă garanții de confidențialitate, iar spațiul de nume al subiectelor este partajat cu orice alt testator de pe internet.

client_id trebuie să fie unic per conexiune la broker – brokerul îl folosește pentru a urmări sesiunile. Subiectele și sarcinile utile ale mesajelor sunt octeți; transmite str dacă este mai convenabil, iar clientul îl va codifica drept UTF-8.

9.19.2. Conectarea prin TLS

Pentru orice depășește experimentele rapide, MQTT peste TLS înseamnă un singur argument în plus. Dicționarul ssl_params este transmis mai departe către ssl.wrap_socket(), așa că orice funcționează acolo funcționează și aici:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Portul 8883 este portul TLS-MQTT rezervat de IANA. server_hostname activează SNI, astfel încât brokerele aflate în spatele unui IP partajat să poată direcționa către certificatul corect – același mecanism pe care îl folosește HTTPS. user / password se mapează la câmpurile de nume de utilizator/parolă ale pachetului CONNECT; brokerul decide dacă acele credențiale acordă drepturi de publicare sau de abonare la subiecte specifice.

9.19.3. Abonarea și recepționarea

Pentru a primi mesaje, un client furnizează o funcție de retroapelare (callback) și apelează subscribe(). Funcția de retroapelare primește două argumente de tip bytes, subiectul și sarcina utilă:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() se blochează până când sosește un pachet MQTT, îl analizează, apelează funcția de retroapelare dacă a fost un PUBLISH pe un subiect abonat și revine. Funcțiile de retroapelare abonate se declanșează din interiorul acelui apel – nu există niciun fir de execuție în fundal.

Pentru o buclă interactivă de cameră care trebuie să continue să facă alte lucruri, check_msg() este aceeași logică în formă neblocantă. Folosește select.select() cu un timeout de 50 ms și revine imediat dacă nu este nimic în așteptare:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Reconectarea în mod curat

Orice client MQTT de lungă durată trebuie să gestioneze conexiunile întrerupte. Deconectările Wi-Fi, repornirile brokerului, timeout-urile NAT sau pur și simplu depășirea intervalului de păstrare în viață fără trafic toate închid socketul. Clientul inclus ridică OSError (sau o excepție simplă cu codul de retur al brokerului) din apelul care a observat întreruperea, iar tiparul standard este o buclă de reîncercare:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Abonamentele nu sunt păstrate la reconectări decât dacă clientul a transmis clean_session=False la conectare, așa că connect din interior ar trebui de asemenea să reemită orice apeluri subscribe() înainte de a intra în bucla de publicare.

9.19.5. Cârligul de testament (last-will)

O cameră care raportează starea ar trebui să îi spună brokerului ce mesaj să trimită în numele camerei dacă conexiunea se întrerupe în mod neașteptat. Setează testamentul înainte de connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Acum orice tablou de bord abonat la yard-cam/status vede online în momentul în care camera se conectează și offline ori de câte ori brokerul observă că s-a întrerupt camera. Mesajul offline reținut persistă pe broker, astfel încât un tablou de bord care se conectează zece minute mai târziu vede în continuare starea curentă corectă.

9.19.6. Când să alegi MQTT în locul HTTP

Capitolul despre serverele web acoperă camera acționând ca server HTTP și, pe pagina de încărcare în cloud, ca client HTTP care trimite imagini JPEG către un URL fix. Ambele își au locul lor. Momentul potrivit pentru a apela în schimb la MQTT:

  • Aceleași date trebuie să ajungă la mai mulți ascultători (un tablou de bord, un serviciu de notificare, un înregistrator) fără ca camera să cunoască lista în avans.

  • Ascultătorii pot apărea și dispărea fără ca camera să repornească.

  • Camera dorește să se aboneze – să primească comenzi de la un controler – ceea ce un client HTTP nu poate face fără interogare prelungită (long polling) sau fără ca un server să trimită pe un URL de retroapelare.

  • Conexiunea trebuie să supraviețuiască ieftin unor perioade lungi de inactivitate.

Momentul potrivit pentru a rămâne la HTTP: o cameră, un server, un tipar fix de cerere/răspuns cu un corp care este prea mare pentru un singur subiect MQTT (cadrele JPEG peste MQTT funcționează, dar sunt o impolitețe față de broker; HTTP POST este alegerea naturală).

Legătură încrucișată: pagina de încărcare în cloud din capitolul despre serverele web arată versiunea HTTP a fluxului „cameră → arhivă în cloud”. Versiunea MQTT a aceleiași probleme menține camera decuplată de URL-ul arhivei și permite unui al doilea consumator (de exemplu o aplicație de alertă pe telefon) să se conecteze la același flux.