9.19. MQTT в Python¶
Встроенный модуль mqtt на каждой сетевой камере OpenMV оборачивает протокол MQTT в один класс – mqtt.MQTTClient. Этот класс открывает TCP-сокет, выполняет рукопожатие CONNECT, упаковывает и распаковывает пакеты на уровне байтов, обрабатывает поддержание соединения PINGREQ и направляет входящие сообщения PUBLISH в функцию обратного вызова. Код приложения вызывает connect(), publish(), subscribe() и wait_msg() / check_msg().
9.19.1. Публикатор в пятнадцать строк¶
Самая маленькая полезная программа – это одна публикация. Подключиться, опубликовать одно сообщение, отключиться:
from mqtt import MQTTClient
client = MQTTClient(
client_id='yard-cam',
server='test.mosquitto.org',
port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()
test.mosquitto.org – это публичный тестовый брокер, поддерживаемый проектом Eclipse Mosquitto. Он принимает обычные TCP-соединения на порту 1883 без учётных данных. Не используйте его для чего-то серьёзного; он не даёт никаких гарантий конфиденциальности, и пространство имён топиков совместно используется со всеми остальными тестировщиками в интернете.
client_id должен быть уникальным для каждого подключения к брокеру – брокер использует его для отслеживания сессий. Топики и полезные нагрузки сообщений представляют собой байты; передайте str, если так удобнее, и клиент закодирует его в UTF-8.
9.19.2. Подключение по TLS¶
Для всего, что выходит за рамки быстрых экспериментов, MQTT поверх TLS – это всего один дополнительный аргумент. Словарь ssl_params передаётся в ssl.wrap_socket(), поэтому всё, что работает там, работает и здесь:
import ssl
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883, # TLS-MQTT default port
ssl_params={'server_hostname': 'broker.example.com'},
user='yard-cam',
password=load_token(),
)
Порт 8883 – это зарезервированный IANA порт для TLS-MQTT. server_hostname включает SNI, чтобы брокеры за общим IP-адресом могли направить запрос к нужному сертификату – тот же механизм, что использует HTTPS. user / password сопоставляются с полями имени пользователя/пароля пакета CONNECT; брокер решает, дают ли эти учётные данные право публикации или подписки на конкретные топики.
9.19.3. Подписка и приём¶
Чтобы получать сообщения, клиент предоставляет функцию обратного вызова и вызывает subscribe(). Функция обратного вызова получает два аргумента типа bytes – топик и полезную нагрузку:
def on_message(topic, msg):
print('received on', topic.decode(), ':', msg.decode())
client = MQTTClient(
client_id='dashboard',
server='test.mosquitto.org',
port=1883,
callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
client.wait_msg()
wait_msg() блокирует выполнение, пока не придёт один пакет MQTT, разбирает его, вызывает функцию обратного вызова, если это было сообщение PUBLISH в топике, на который оформлена подписка, и возвращает управление. Функции обратного вызова подписок срабатывают изнутри этого вызова – фонового потока нет.
Для интерактивного цикла камеры, которому нужно продолжать выполнять другую работу, check_msg() – это та же логика в неблокирующей форме. Он использует select.select() с таймаутом 50 мс и сразу же возвращает управление, если ничего нет в ожидании:
while True:
client.check_msg()
run_frame() # capture + processing
check_motion_threshold()
9.19.4. Чистое переподключение¶
Любой долго работающий клиент MQTT должен обрабатывать разрывы соединения. Отключения Wi-Fi, перезапуски брокера, тайм-ауты NAT или просто превышение интервала поддержания соединения без трафика – всё это завершает сокет. Встроенный клиент возбуждает OSError (или простое исключение с кодом возврата брокера) из того вызова, который заметил разрыв, и стандартный приём – это цикл повторных попыток:
import time
def keep_publishing(client, topic, get_message):
while True:
try:
client.connect()
while True:
client.publish(topic, get_message())
time.sleep(5)
except OSError:
print('connection lost, reconnecting in 5s')
time.sleep(5)
Подписки не сохраняются между переподключениями, если клиент не передал clean_session=False при подключении, поэтому внутренний connect должен также повторно отправить все вызовы subscribe(), прежде чем перейти в цикл публикации.
9.19.5. Хук последней воли (last-will)¶
Камера, сообщающая о своём состоянии, должна указать брокеру, какое сообщение отправить от её имени, если соединение неожиданно прервётся. Установите завещание до connect():
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883,
ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
b'yard-cam/status',
b'offline',
retain=True,
qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)
Теперь любая панель мониторинга, подписанная на yard-cam/status, увидит online в момент подключения камеры и offline, как только брокер заметит, что камера отключилась. Сохранённое сообщение offline остаётся на брокере, поэтому панель мониторинга, подключившаяся десятью минутами позже, всё равно увидит правильное текущее состояние.
9.19.6. Когда выбирать MQTT вместо HTTP¶
В главе о веб-серверах рассматривается камера в роли HTTP-сервера, а на странице о загрузке в облако – в роли HTTP-клиента, отправляющего JPEG-изображения на фиксированный URL. У обоих вариантов есть своё место. Подходящий момент, чтобы обратиться к MQTT вместо этого:
Одни и те же данные нужно отправить нескольким слушателям (панель мониторинга, служба уведомлений, регистратор), причём камера заранее не знает их списка.
Слушатели могут появляться и исчезать без перезапуска камеры.
Камера хочет подписаться – получать команды от контроллера, – чего HTTP-клиент не может сделать без длинного опроса (long polling) или сервера, отправляющего данные на URL обратного вызова.
Соединение должно дёшево переживать долгие периоды простоя.
Подходящий момент, чтобы остаться с HTTP: одна камера, один сервер, фиксированная схема запрос/ответ с телом, которое слишком велико для одного топика MQTT (передача JPEG-кадров через MQTT работает, но это неучтиво по отношению к брокеру; HTTP POST подходит для этого естественным образом).
Перекрёстная ссылка: страница о загрузке в облако в главе о веб-серверах показывает HTTP-версию сценария «камера → облачный архив». MQTT-версия той же задачи держит камеру независимой от URL архива и позволяет второму потребителю (например, приложению для оповещений на телефоне) подключиться к тому же потоку.