9.19. MQTT in Python¶
Il modulo mqtt incluso in ogni cam OpenMV connessa in rete racchiude il protocollo MQTT a livello di trasmissione in un’unica classe, mqtt.MQTTClient. La classe apre il socket TCP, esegue l’handshake CONNECT, impacchetta e spacchetta i pacchetti a livello di byte, gestisce il keepalive PINGREQ e smista i messaggi PUBLISH in arrivo a una callback. Il codice dell’applicazione chiama connect(), publish(), subscribe() e wait_msg() / check_msg().
9.19.1. Un publisher in quindici righe¶
Il programma utile più piccolo è una singola pubblicazione. Connettersi, pubblicare un messaggio, disconnettersi:
from mqtt import MQTTClient
client = MQTTClient(
client_id='yard-cam',
server='test.mosquitto.org',
port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()
test.mosquitto.org è il broker di test pubblico gestito dal progetto Eclipse Mosquitto. Accetta connessioni TCP in chiaro sulla porta 1883 senza credenziali. Non usarlo per nulla di serio; non offre garanzie di privacy e lo spazio dei nomi dei topic è condiviso con ogni altro tester su internet.
client_id deve essere univoco per ogni connessione al broker: il broker lo usa per tracciare le sessioni. I topic e i payload dei messaggi sono byte; passa una str se è più comodo e il client la codificherà come UTF-8.
9.19.2. Connettersi tramite TLS¶
Per qualsiasi cosa al di là dei rapidi esperimenti, MQTT su TLS richiede un solo argomento aggiuntivo. Il dizionario ssl_params viene inoltrato a ssl.wrap_socket(), quindi tutto ciò che funziona lì funziona anche qui:
import ssl
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883, # TLS-MQTT default port
ssl_params={'server_hostname': 'broker.example.com'},
user='yard-cam',
password=load_token(),
)
La porta 8883 è la porta TLS-MQTT riservata dalla IANA. server_hostname attiva la SNI in modo che i broker dietro un IP condiviso possano instradare verso il certificato corretto – lo stesso meccanismo usato da HTTPS. user / password corrispondono ai campi username/password del pacchetto CONNECT; il broker decide se quelle credenziali concedono diritti di pubblicazione o sottoscrizione su determinati topic.
9.19.3. Sottoscrivere e ricevere¶
Per ricevere messaggi un client fornisce una callback e chiama subscribe(). La callback riceve due argomenti di tipo byte, il topic e il payload:
def on_message(topic, msg):
print('received on', topic.decode(), ':', msg.decode())
client = MQTTClient(
client_id='dashboard',
server='test.mosquitto.org',
port=1883,
callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
client.wait_msg()
wait_msg() si blocca finché non arriva un pacchetto MQTT, lo analizza, chiama la callback se si trattava di un PUBLISH su un topic sottoscritto e ritorna. Le callback sottoscritte vengono attivate dall’interno di quella chiamata: non esiste alcun thread in background.
Per un ciclo interattivo della cam che deve continuare a svolgere altro lavoro, check_msg() è la stessa logica in forma non bloccante. Utilizza select.select() con un timeout di 50 ms e ritorna immediatamente se non c’è nulla in sospeso:
while True:
client.check_msg()
run_frame() # capture + processing
check_motion_threshold()
9.19.4. Riconnettersi in modo pulito¶
Qualsiasi client MQTT a lungo termine deve gestire le connessioni interrotte. Le disconnessioni Wi-Fi, i riavvii del broker, i timeout NAT, o semplicemente il superamento del keepalive senza traffico, terminano tutti il socket. Il client incluso solleva un’eccezione OSError (o una semplice eccezione con il codice di ritorno del broker) dalla chiamata che ha notato l’interruzione, e lo schema standard è un ciclo di ritentativo:
import time
def keep_publishing(client, topic, get_message):
while True:
try:
client.connect()
while True:
client.publish(topic, get_message())
time.sleep(5)
except OSError:
print('connection lost, reconnecting in 5s')
time.sleep(5)
Le sottoscrizioni non vengono mantenute tra le riconnessioni a meno che il client non abbia passato clean_session=False in fase di connessione, quindi il connect interno dovrebbe anche rieseguire eventuali chiamate subscribe() prima di entrare nel ciclo di pubblicazione.
9.19.5. Il gancio del last-will¶
Una cam che segnala il proprio stato dovrebbe indicare al broker quale messaggio inviare per conto della cam se la connessione si interrompe inaspettatamente. Imposta il will prima di connect()
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883,
ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
b'yard-cam/status',
b'offline',
retain=True,
qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)
Ora qualsiasi dashboard sottoscritta a yard-cam/status vede online nel momento in cui la cam si connette e offline ogni volta che il broker si accorge che la cam si è disconnessa. Il messaggio offline mantenuto persiste sul broker, così una dashboard che si connette dieci minuti più tardi vede comunque lo stato corrente corretto.
9.19.6. Quando scegliere MQTT al posto di HTTP¶
Il capitolo sui webserver descrive la cam che agisce come server HTTP e, nella pagina sul caricamento nel cloud, come client HTTP che invia JPEG a un URL fisso. Entrambi hanno il loro posto. Il momento giusto per ricorrere invece a MQTT:
Gli stessi dati devono raggiungere diversi ascoltatori (una dashboard, un servizio di notifica, un registratore) senza che la cam conosca in anticipo l’elenco.
Gli ascoltatori possono comparire e scomparire senza che la cam si riavvii.
La cam vuole sottoscrivere – ricevere comandi da un controller – cosa che un client HTTP non può fare senza il long polling o un server che effettua il push su un URL di callback.
La connessione deve sopravvivere a lunghi periodi di inattività in modo economico.
Il momento giusto per restare con HTTP: una cam, un server, uno schema fisso richiesta/risposta con un corpo troppo grande per un singolo topic MQTT (i frame JPEG su MQTT funzionano ma sono scortesi verso il broker; il POST HTTP è la scelta naturale).
Collegamento incrociato: la pagina sul caricamento nel cloud nel capitolo sui webserver mostra la versione HTTP di «cam → archivio cloud». La versione MQTT dello stesso problema mantiene la cam disaccoppiata dall’URL dell’archivio e consente a un secondo consumatore (un’app di avvisi sul telefono, ad esempio) di attingere allo stesso flusso.