9.19. MQTT em Python

O módulo mqtt incluído em todas as câmaras OpenMV com rede encapsula o protocolo MQTT numa classe, mqtt.MQTTClient. A classe abre o socket TCP, realiza o handshake CONNECT, empacota e desempacota os pacotes ao nível de bytes, gere o keepalive PINGREQ e envia as mensagens PUBLISH recebidas para um callback. O código da aplicação chama connect(), publish(), subscribe() e wait_msg() / check_msg().

9.19.1. Um publicador em quinze linhas

O menor programa útil é uma única publicação. Ligar, publicar uma mensagem, desligar:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org é o broker de testes público gerido pelo projeto Eclipse Mosquitto. Aceita ligações TCP simples na porta 1883 sem credenciais. Não o utilize para nada sério; não tem garantias de privacidade e o espaço de nomes de tópicos é partilhado com todos os outros utilizadores de testes na internet.

client_id deve ser único por ligação ao broker — o broker usa-o para controlar sessões. Os tópicos e as cargas úteis das mensagens são bytes; passe str se for mais conveniente e o cliente irá codificá-lo como UTF-8.

9.19.2. Ligar por TLS

Para qualquer coisa além de experiências rápidas, MQTT sobre TLS é um argumento extra. O dicionário ssl_params é reencaminhado para ssl.wrap_socket(), pelo que qualquer coisa que funcione lá funciona aqui:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

A porta 8883 é a porta TLS-MQTT reservada pela IANA. server_hostname ativa o SNI para que os brokers por trás de um IP partilhado possam encaminhar para o certificado correto — o mesmo mecanismo que o HTTPS utiliza. user / password mapeiam para os campos de utilizador/palavra-passe do pacote CONNECT; o broker decide se essas credenciais concedem direitos de publicação ou subscrição em tópicos específicos.

9.19.3. Subscrever e receber

Para receber mensagens, um cliente fornece um callback e chama subscribe(). O callback recebe dois argumentos de bytes, o tópico e a carga útil:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() bloqueia até que chegue um pacote MQTT, analisa-o, chama o callback se era um PUBLISH num tópico subscrito e retorna. Os callbacks subscritos são ativados dentro dessa chamada — não há thread em segundo plano.

Para um ciclo de câmara interativo que precisa de continuar a fazer outro trabalho, check_msg() é a mesma lógica em modo não bloqueante. Usa select.select() com um timeout de 50 ms e retorna imediatamente se não houver nada pendente:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Reconectar de forma limpa

Qualquer cliente MQTT de longa duração tem de lidar com ligações interrompidas. Desconexões Wi-Fi, reinícios do broker, timeouts de NAT ou simplesmente ultrapassar o keepalive sem tráfego terminam o socket. O cliente incluído lança OSError (ou uma exceção simples com o código de retorno do broker) a partir da chamada que detetou a interrupção, e o padrão habitual é um ciclo de tentativas:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

As subscrições não são persistidas entre reconexões a menos que o cliente tenha passado clean_session=False na ligação, pelo que o connect interno também deve re-emitir quaisquer chamadas subscribe() antes de entrar no ciclo de publicação.

9.19.5. O gancho de última vontade

Uma câmara que reporta estado deve informar o broker da mensagem a enviar em seu nome se a ligação terminar inesperadamente. Defina a última vontade antes de connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Agora qualquer painel que subscreva yard-cam/statusonline no momento em que a câmara se liga e offline sempre que o broker deteta que a câmara caiu. A mensagem offline retida persiste no broker, para que um painel que se ligue dez minutos depois ainda veja o estado atual correto.

9.19.6. Quando usar MQTT em vez de HTTP

O capítulo sobre servidores web aborda a câmara a funcionar como servidor HTTP e, na página de carregamento para a nuvem, como cliente HTTP a publicar JPEGs num URL fixo. Ambos têm o seu lugar. A altura certa para optar pelo MQTT em vez disso:

  • Os mesmos dados precisam de chegar a vários receptores (um painel, um serviço de notificações, um gravador) sem que a câmara conheça a lista antecipadamente.

  • Os receptores podem entrar e sair sem que a câmara reinicie.

  • A câmara quer subscrever — receber comandos de um controlador — o que um cliente HTTP não consegue fazer sem long polling ou um servidor a enviar para um URL de callback.

  • A ligação tem de sobreviver a longos períodos de inatividade de forma económica.

A altura certa para continuar com HTTP: uma câmara, um servidor, um padrão fixo de pedido/resposta com um corpo demasiado grande para um único tópico MQTT (fotogramas JPEG sobre MQTT funciona, mas é inconveniente para o broker; HTTP POST é a escolha natural).

Referência cruzada: a página de carregamento para a nuvem no capítulo de servidores web mostra a versão HTTP de «câmara → arquivo na nuvem». A versão MQTT do mesmo problema mantém a câmara desacoplada do URL do arquivo e permite que um segundo consumidor (uma aplicação de alertas para telemóvel, por exemplo) aceda ao mesmo fluxo.