9.19. MQTT em Python¶
O módulo mqtt incluído em todas as câmaras OpenMV com rede encapsula o protocolo MQTT numa classe, mqtt.MQTTClient. A classe abre o socket TCP, realiza o handshake CONNECT, empacota e desempacota os pacotes ao nível de bytes, gere o keepalive PINGREQ e envia as mensagens PUBLISH recebidas para um callback. O código da aplicação chama connect(), publish(), subscribe() e wait_msg() / check_msg().
9.19.1. Um publicador em quinze linhas¶
O menor programa útil é uma única publicação. Ligar, publicar uma mensagem, desligar:
from mqtt import MQTTClient
client = MQTTClient(
client_id='yard-cam',
server='test.mosquitto.org',
port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()
test.mosquitto.org é o broker de testes público gerido pelo projeto Eclipse Mosquitto. Aceita ligações TCP simples na porta 1883 sem credenciais. Não o utilize para nada sério; não tem garantias de privacidade e o espaço de nomes de tópicos é partilhado com todos os outros utilizadores de testes na internet.
client_id deve ser único por ligação ao broker — o broker usa-o para controlar sessões. Os tópicos e as cargas úteis das mensagens são bytes; passe str se for mais conveniente e o cliente irá codificá-lo como UTF-8.
9.19.2. Ligar por TLS¶
Para qualquer coisa além de experiências rápidas, MQTT sobre TLS é um argumento extra. O dicionário ssl_params é reencaminhado para ssl.wrap_socket(), pelo que qualquer coisa que funcione lá funciona aqui:
import ssl
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883, # TLS-MQTT default port
ssl_params={'server_hostname': 'broker.example.com'},
user='yard-cam',
password=load_token(),
)
A porta 8883 é a porta TLS-MQTT reservada pela IANA. server_hostname ativa o SNI para que os brokers por trás de um IP partilhado possam encaminhar para o certificado correto — o mesmo mecanismo que o HTTPS utiliza. user / password mapeiam para os campos de utilizador/palavra-passe do pacote CONNECT; o broker decide se essas credenciais concedem direitos de publicação ou subscrição em tópicos específicos.
9.19.3. Subscrever e receber¶
Para receber mensagens, um cliente fornece um callback e chama subscribe(). O callback recebe dois argumentos de bytes, o tópico e a carga útil:
def on_message(topic, msg):
print('received on', topic.decode(), ':', msg.decode())
client = MQTTClient(
client_id='dashboard',
server='test.mosquitto.org',
port=1883,
callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
client.wait_msg()
wait_msg() bloqueia até que chegue um pacote MQTT, analisa-o, chama o callback se era um PUBLISH num tópico subscrito e retorna. Os callbacks subscritos são ativados dentro dessa chamada — não há thread em segundo plano.
Para um ciclo de câmara interativo que precisa de continuar a fazer outro trabalho, check_msg() é a mesma lógica em modo não bloqueante. Usa select.select() com um timeout de 50 ms e retorna imediatamente se não houver nada pendente:
while True:
client.check_msg()
run_frame() # capture + processing
check_motion_threshold()
9.19.4. Reconectar de forma limpa¶
Qualquer cliente MQTT de longa duração tem de lidar com ligações interrompidas. Desconexões Wi-Fi, reinícios do broker, timeouts de NAT ou simplesmente ultrapassar o keepalive sem tráfego terminam o socket. O cliente incluído lança OSError (ou uma exceção simples com o código de retorno do broker) a partir da chamada que detetou a interrupção, e o padrão habitual é um ciclo de tentativas:
import time
def keep_publishing(client, topic, get_message):
while True:
try:
client.connect()
while True:
client.publish(topic, get_message())
time.sleep(5)
except OSError:
print('connection lost, reconnecting in 5s')
time.sleep(5)
As subscrições não são persistidas entre reconexões a menos que o cliente tenha passado clean_session=False na ligação, pelo que o connect interno também deve re-emitir quaisquer chamadas subscribe() antes de entrar no ciclo de publicação.
9.19.5. O gancho de última vontade¶
Uma câmara que reporta estado deve informar o broker da mensagem a enviar em seu nome se a ligação terminar inesperadamente. Defina a última vontade antes de connect()
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883,
ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
b'yard-cam/status',
b'offline',
retain=True,
qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)
Agora qualquer painel que subscreva yard-cam/status vê online no momento em que a câmara se liga e offline sempre que o broker deteta que a câmara caiu. A mensagem offline retida persiste no broker, para que um painel que se ligue dez minutos depois ainda veja o estado atual correto.
9.19.6. Quando usar MQTT em vez de HTTP¶
O capítulo sobre servidores web aborda a câmara a funcionar como servidor HTTP e, na página de carregamento para a nuvem, como cliente HTTP a publicar JPEGs num URL fixo. Ambos têm o seu lugar. A altura certa para optar pelo MQTT em vez disso:
Os mesmos dados precisam de chegar a vários receptores (um painel, um serviço de notificações, um gravador) sem que a câmara conheça a lista antecipadamente.
Os receptores podem entrar e sair sem que a câmara reinicie.
A câmara quer subscrever — receber comandos de um controlador — o que um cliente HTTP não consegue fazer sem long polling ou um servidor a enviar para um URL de callback.
A ligação tem de sobreviver a longos períodos de inatividade de forma económica.
A altura certa para continuar com HTTP: uma câmara, um servidor, um padrão fixo de pedido/resposta com um corpo demasiado grande para um único tópico MQTT (fotogramas JPEG sobre MQTT funciona, mas é inconveniente para o broker; HTTP POST é a escolha natural).
Referência cruzada: a página de carregamento para a nuvem no capítulo de servidores web mostra a versão HTTP de «câmara → arquivo na nuvem». A versão MQTT do mesmo problema mantém a câmara desacoplada do URL do arquivo e permite que um segundo consumidor (uma aplicação de alertas para telemóvel, por exemplo) aceda ao mesmo fluxo.