9.19. MQTT у Python

Вбудований модуль mqtt на кожній мережевій OpenMV камері обгортає дротовий протокол MQTT в один клас, mqtt.MQTTClient. Клас відкриває TCP-сокет, виконує рукостискання CONNECT, пакує та розпаковує пакети на рівні байтів, обробляє підтримку з’єднання PINGREQ і направляє вхідні повідомлення PUBLISH до зворотного виклику. Код застосунку викликає connect(), publish(), subscribe() та wait_msg() / check_msg().

9.19.1. Видавець у п’ятнадцяти рядках

Найменш складна корисна програма — це одноразова публікація. Підключитися, опублікувати одне повідомлення, відключитися:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org — публічний тестовий брокер, що підтримується проектом Eclipse Mosquitto. Він приймає підключення по TCP на порту 1883 без облікових даних. Не використовуйте його для чогось серйозного: він не гарантує конфіденційності, а простір імен тем є спільним для всіх інших тестувальників в інтернеті.

client_id має бути унікальним для кожного підключення до брокера — брокер використовує його для відстеження сеансів. Теми та корисні навантаження повідомлень — це байти; передайте str, якщо це зручніше, і клієнт закодує його як UTF-8.

9.19.2. Підключення через TLS

Для всього, що виходить за рамки швидких експериментів, MQTT через TLS потребує лише одного додаткового аргументу. Словник ssl_params пересилається до ssl.wrap_socket(), тому все, що там працює, працює і тут:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Порт 8883 — зарезервований IANA порт TLS-MQTT. server_hostname вмикає SNI, щоб брокери за спільною IP-адресою могли маршрутизуватися до правильного сертифіката — той самий механізм, що використовує HTTPS. user / password відповідають полям імені користувача/пароля пакета CONNECT; брокер вирішує, чи надають ці облікові дані права на публікацію або підписку на конкретні теми.

9.19.3. Підписка та отримання

Щоб отримувати повідомлення, клієнт надає зворотний виклик і викликає subscribe(). Зворотний виклик отримує два аргументи у вигляді байтів: тему та корисне навантаження:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() блокує виконання до надходження одного пакету MQTT, розбирає його, викликає зворотний виклик, якщо це було повідомлення PUBLISH на підписану тему, і повертається. Підписані зворотні виклики спрацьовують усередині цього виклику — фонового потоку немає.

Для інтерактивного циклу камери, якому потрібно продовжувати виконувати іншу роботу, check_msg() — це та сама логіка у неблокуючій формі. Він використовує select.select() з тайм-аутом 50 мс і негайно повертається, якщо нічого не очікується:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Відновлення з’єднання у чистому вигляді

Будь-який довготривалий MQTT-клієнт повинен обробляти розриви з’єднання. Відключення Wi-Fi, перезапуски брокера, тайм-аути NAT або просто перевищення тривалості keepalive без трафіку — все це завершує сокет. Вбудований клієнт генерує OSError (або просте виключення з кодом повернення брокера) від виклику, який виявив розрив, і стандартна модель — це цикл повторних спроб:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Підписки не зберігаються між повторними підключеннями, якщо клієнт не передав clean_session=False під час підключення, тому внутрішній connect повинен також повторно виконати будь-які виклики subscribe() перед входом у цикл публікації.

9.19.5. Хук останньої волі

Камера, що повідомляє про статус, повинна повідомити брокеру, яке повідомлення надіслати від імені камери, якщо з’єднання несподівано обривається. Встановіть волю перед connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Тепер будь-яка інформаційна панель, підписана на yard-cam/status, побачить online в момент підключення камери та offline щоразу, коли брокер помічає відключення камери. Збережене повідомлення offline зберігається на брокері, тому інформаційна панель, що підключається десять хвилин потому, все одно бачить правильний поточний стан.

9.19.6. Коли обирати MQTT замість HTTP

Розділ про веб-сервери охоплює роботу камери як HTTP-сервера і, на сторінці завантаження в хмару, як HTTP-клієнта, що надсилає JPEG-файли на фіксовану URL-адресу. Обидва мають своє місце. Правильний час обрати MQTT:

  • Ті самі дані повинні надходити до кількох слухачів (інформаційна панель, сервіс сповіщень, реєстратор) без того, щоб камера знала список заздалегідь.

  • Слухачі можуть з’являтися і зникати без перезапуску камери.

  • Камера хоче підписатися — отримувати команди від контролера — що HTTP-клієнт не може зробити без тривалого опитування або сервера, що надсилає дані на URL зворотного виклику.

  • З’єднання повинно витримувати тривалі простої з мінімальними витратами.

Правильний час залишитися на HTTP: одна камера, один сервер, фіксований шаблон запит/відповідь з тілом, що занадто велике для однієї теми MQTT (передача JPEG-кадрів через MQTT працює, але навантажує брокер; HTTP POST є природним рішенням).

Перехресне посилання: сторінка завантаження в хмару в розділі веб-серверів показує HTTP-версію «камера → хмарний архів». Версія MQTT тієї самої задачі тримає камеру незалежною від URL-адреси архіву і дозволяє другому споживачу (наприклад, застосунку сповіщень на телефоні) підключитися до того самого потоку.