9.19. MQTT en Python

Le module mqtt intégré à chaque caméra OpenMV connectée enveloppe le protocole filaire MQTT dans une seule classe, mqtt.MQTTClient. Cette classe ouvre le socket TCP, effectue la poignée de main CONNECT, assemble et désassemble les paquets au niveau des octets, gère le keepalive PINGREQ et distribue les messages PUBLISH entrants vers une fonction de rappel. Le code applicatif appelle connect(), publish(), subscribe(), et wait_msg() / check_msg().

9.19.1. Un éditeur en quinze lignes

Le plus petit programme utile est une simple publication. Se connecter, publier un message, se déconnecter

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org est le broker de test public géré par le projet Eclipse Mosquitto. Il accepte les connexions en TCP simple sur le port 1883 sans identifiants. Ne l’utilisez pour rien de sérieux ; il n’offre aucune garantie de confidentialité et l’espace de noms des topics est partagé avec tous les autres testeurs sur Internet.

client_id doit être unique par connexion au broker – le broker l’utilise pour suivre les sessions. Les topics et les charges utiles des messages sont des octets ; passez un str si c’est plus pratique et le client l’encodera en UTF-8.

9.19.2. Se connecter via TLS

Pour tout ce qui dépasse les expériences rapides, MQTT sur TLS ne demande qu’un argument supplémentaire. Le dictionnaire ssl_params est transmis à ssl.wrap_socket(), donc tout ce qui fonctionne là-bas fonctionne ici

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Le port 8883 est le port TLS-MQTT réservé par l’IANA. server_hostname active le SNI afin que les brokers derrière une IP partagée puissent router vers le bon certificat – le même mécanisme qu’utilise HTTPS. user / password correspondent aux champs nom d’utilisateur/mot de passe du paquet CONNECT ; le broker décide si ces identifiants accordent des droits de publication ou d’abonnement à des topics spécifiques.

9.19.3. S’abonner et recevoir

Pour recevoir des messages, un client fournit une fonction de rappel et appelle subscribe(). La fonction de rappel reçoit deux arguments de type octets, le topic et la charge utile

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() bloque jusqu’à l’arrivée d’un paquet MQTT, l’analyse, appelle la fonction de rappel s’il s’agissait d’un PUBLISH sur un topic abonné, puis retourne. Les fonctions de rappel abonnées se déclenchent depuis l’intérieur de cet appel – il n’y a aucun thread d’arrière-plan.

Pour une boucle de caméra interactive qui doit continuer à effectuer d’autres tâches, check_msg() applique la même logique sous forme non bloquante. Elle utilise select.select() avec un délai d’attente de 50 ms et retourne immédiatement si rien n’est en attente

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Se reconnecter proprement

Tout client MQTT à exécution prolongée doit gérer les connexions interrompues. Les déconnexions Wi-Fi, les redémarrages du broker, les expirations NAT, ou simplement le dépassement du keepalive sans trafic mettent tous fin au socket. Le client intégré lève une OSError (ou une exception nue avec le code de retour du broker) depuis l’appel qui a constaté l’interruption, et le schéma standard consiste en une boucle de relance

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Les abonnements ne sont pas conservés à travers les reconnexions à moins que le client n’ait passé clean_session=False lors de la connexion ; aussi, l’appel interne à connect devrait également réémettre tout appel subscribe() avant de retomber dans la boucle de publication.

9.19.5. Le hook de dernière volonté (last-will)

Une caméra qui rapporte son état doit indiquer au broker quel message envoyer en son nom si la connexion meurt de façon inattendue. Définissez la volonté avant connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Désormais, tout tableau de bord abonné à yard-cam/status voit online dès que la caméra se connecte et offline chaque fois que le broker constate que la caméra a décroché. Le message offline retenu persiste sur le broker, de sorte qu’un tableau de bord qui se connecte dix minutes plus tard voit toujours l’état actuel correct.

9.19.6. Quand choisir MQTT plutôt que HTTP

Le chapitre sur les serveurs web couvre la caméra agissant comme serveur HTTP et, sur la page de téléversement vers le cloud, comme client HTTP envoyant des JPEG vers une URL fixe. Les deux ont leur utilité. Le bon moment pour opter pour MQTT à la place :

  • Les mêmes données doivent parvenir à plusieurs destinataires (un tableau de bord, un service de notification, un enregistreur) sans que la caméra connaisse la liste à l’avance.

  • Les destinataires peuvent apparaître et disparaître sans que la caméra redémarre.

  • La caméra veut s’abonner – pour recevoir des commandes d’un contrôleur – ce qu’un client HTTP ne peut pas faire sans long polling ou sans qu’un serveur pousse les données sur une URL de rappel.

  • La connexion doit survivre à de longues périodes d’inactivité à moindre coût.

Le bon moment pour s’en tenir à HTTP : une caméra, un serveur, un schéma requête/réponse fixe avec un corps trop volumineux pour un seul topic MQTT (les trames JPEG sur MQTT fonctionnent mais sont inélégantes pour le broker ; le POST HTTP est le choix naturel).

Lien croisé : la page de téléversement vers le cloud dans le chapitre sur les serveurs web montre la version HTTP de « caméra → archive cloud ». La version MQTT du même problème garde la caméra découplée de l’URL de l’archive et permet à un second consommateur (une application d’alerte sur téléphone, par exemple) de se brancher sur le même flux.