9.19. MQTT en Python

El módulo mqtt incluido en cada cámara OpenMV con conexión a red envuelve el protocolo de cable de MQTT en una sola clase, mqtt.MQTTClient. La clase abre el socket TCP, realiza el handshake CONNECT, empaqueta y desempaqueta los paquetes a nivel de bytes, gestiona el keepalive PINGREQ y reparte los mensajes PUBLISH entrantes a una función de retorno (callback). El código de la aplicación llama a connect(), publish(), subscribe() y wait_msg() / check_msg().

9.19.1. Un publicador en quince líneas

El programa útil más pequeño es una sola publicación. Conectar, publicar un mensaje y desconectar:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org es el broker de pruebas público gestionado por el proyecto Eclipse Mosquitto. Acepta conexiones TCP en texto plano en el puerto 1883 sin credenciales. No lo uses para nada serio; no ofrece garantías de privacidad y el espacio de nombres de temas se comparte con todos los demás que estén haciendo pruebas en internet.

client_id debe ser único por cada conexión al broker: el broker lo utiliza para hacer seguimiento de las sesiones. Los temas y las cargas útiles de los mensajes son bytes; pasa un str si te resulta más cómodo y el cliente lo codificará como UTF-8.

9.19.2. Conexión sobre TLS

Para cualquier cosa que vaya más allá de experimentos rápidos, MQTT sobre TLS es solo un argumento adicional. El diccionario ssl_params se reenvía a ssl.wrap_socket(), así que todo lo que funciona allí funciona aquí:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

El puerto 8883 es el puerto TLS-MQTT reservado por la IANA. server_hostname activa SNI para que los brokers detrás de una IP compartida puedan dirigir hacia el certificado correcto – el mismo mecanismo que usa HTTPS. user / password se corresponden con los campos de nombre de usuario/contraseña del paquete CONNECT; el broker decide si esas credenciales conceden derechos de publicación o suscripción a temas específicos.

9.19.3. Suscripción y recepción

Para recibir mensajes, un cliente proporciona una función de retorno (callback) y llama a subscribe(). La función de retorno recibe dos argumentos de tipo bytes, el tema y la carga útil:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() se bloquea hasta que llega un paquete MQTT, lo analiza, llama a la función de retorno si era un PUBLISH sobre un tema suscrito, y retorna. Las funciones de retorno suscritas se disparan desde dentro de esa llamada: no hay ningún hilo en segundo plano.

Para un bucle interactivo de la cámara que necesita seguir haciendo otras tareas, check_msg() es la misma lógica en forma no bloqueante. Utiliza select.select() con un tiempo de espera de 50 ms y retorna de inmediato si no hay nada pendiente:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Reconexión limpia

Cualquier cliente MQTT de larga duración tiene que gestionar las caídas de conexión. Las desconexiones de Wi-Fi, los reinicios del broker, los tiempos de espera de NAT, o simplemente superar el keepalive sin tráfico, todo ello finaliza el socket. El cliente incluido lanza OSError (o una excepción simple con el código de retorno del broker) desde la llamada que detectó la caída, y el patrón estándar es un bucle de reintento:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Las suscripciones no persisten entre reconexiones a menos que el cliente haya pasado clean_session=False al conectar, por lo que el connect interno también debería volver a emitir cualquier llamada a subscribe() antes de entrar en el bucle de publicación.

9.19.5. El gancho de último mensaje (last-will)

Una cámara que informa de su estado debería decirle al broker qué mensaje enviar en nombre de la cámara si la conexión muere de forma inesperada. Configura el mensaje de despedida (will) antes de connect():

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Ahora cualquier panel suscrito a yard-cam/status ve online en el momento en que la cámara se conecta y offline siempre que el broker detecta que la cámara se ha caído. El mensaje offline retenido persiste en el broker, de modo que un panel que se conecte diez minutos más tarde sigue viendo el estado actual correcto.

9.19.6. Cuándo elegir MQTT en lugar de HTTP

El capítulo sobre servidores web cubre la cámara actuando como servidor HTTP y, en la página de carga a la nube, como cliente HTTP que publica JPEG en una URL fija. Ambos tienen su lugar. El momento adecuado para recurrir a MQTT en su lugar:

  • Los mismos datos tienen que llegar a varios receptores (un panel, un servicio de notificaciones, un grabador) sin que la cámara conozca la lista de antemano.

  • Los receptores pueden aparecer y desaparecer sin que la cámara se reinicie.

  • La cámara quiere suscribirse – para recibir comandos de un controlador – algo que un cliente HTTP no puede hacer sin sondeo prolongado (long polling) o un servidor que envíe a una URL de retorno.

  • La conexión tiene que sobrevivir a largos periodos de inactividad de forma económica.

El momento adecuado para quedarse con HTTP: una cámara, un servidor, un patrón fijo de petición/respuesta con un cuerpo demasiado grande para un solo tema MQTT (los fotogramas JPEG sobre MQTT funcionan, pero resultan descorteses con el broker; HTTP POST es lo natural).

Enlace cruzado: la página de carga a la nube del capítulo sobre servidores web muestra la versión HTTP de «cámara → archivo en la nube». La versión MQTT del mismo problema mantiene la cámara desacoplada de la URL del archivo y permite que un segundo consumidor (una aplicación de alertas en el teléfono, por ejemplo) se conecte al mismo flujo.