9.19. MQTT w Pythonie

Dołączony moduł mqtt na każdej sieciowej kamerze OpenMV opakowuje protokół przewodowy MQTT w jednej klasie, mqtt.MQTTClient. Klasa otwiera gniazdo TCP, przeprowadza handshake CONNECT, pakuje i rozpakowuje pakiety na poziomie bajtów, obsługuje podtrzymanie połączenia PINGREQ oraz przekazuje przychodzące wiadomości PUBLISH do wywołania zwrotnego. Kod aplikacji wywołuje connect(), publish(), subscribe() oraz wait_msg() / check_msg().

9.19.1. Wydawca w piętnastu wierszach

Najmniejszy użyteczny program to pojedyncza publikacja. Połącz się, opublikuj jedną wiadomość, rozłącz się:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org to publiczny broker testowy prowadzony przez projekt Eclipse Mosquitto. Przyjmuje połączenia w czystym TCP na porcie 1883 bez poświadczeń. Nie używaj go do niczego poważnego; nie daje żadnych gwarancji prywatności, a przestrzeń nazw tematów jest współdzielona z każdym innym testującym w internecie.

client_id musi być unikalny dla każdego połączenia z brokerem – broker używa go do śledzenia sesji. Tematy i ładunki wiadomości to bajty; przekaż str, jeśli jest to wygodniejsze, a klient zakoduje go jako UTF-8.

9.19.2. Łączenie przez TLS

W przypadku czegokolwiek poza szybkimi eksperymentami MQTT przez TLS to jeden dodatkowy argument. Słownik ssl_params jest przekazywany do ssl.wrap_socket(), więc wszystko, co działa tam, działa również tutaj:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Port 8883 to zarezerwowany przez IANA port TLS-MQTT. server_hostname włącza SNI, dzięki czemu brokery za współdzielonym adresem IP mogą kierować ruch do właściwego certyfikatu – to ten sam mechanizm, którego używa HTTPS. user / password odwzorowują się na pola nazwy użytkownika/hasła pakietu CONNECT; broker decyduje, czy te poświadczenia przyznają prawa do publikowania lub subskrybowania określonych tematów.

9.19.3. Subskrybowanie i odbieranie

Aby odbierać wiadomości, klient dostarcza wywołanie zwrotne i wywołuje subscribe(). Wywołanie zwrotne otrzymuje dwa argumenty typu bytes, temat oraz ładunek:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() blokuje, dopóki nie dotrze jeden pakiet MQTT, parsuje go, wywołuje wywołanie zwrotne, jeśli był to PUBLISH na subskrybowanym temacie, i wraca. Subskrybowane wywołania zwrotne uruchamiają się z wnętrza tego wywołania – nie ma żadnego wątku działającego w tle.

Dla interaktywnej pętli kamery, która musi wykonywać inne zadania, check_msg() to ta sama logika w formie nieblokującej. Używa select.select() z limitem czasu 50 ms i wraca natychmiast, jeśli nic nie oczekuje:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Czyste ponowne łączenie

Każdy długo działający klient MQTT musi obsługiwać zerwane połączenia. Rozłączenia Wi-Fi, restarty brokera, przekroczenia czasu NAT albo po prostu przekroczenie czasu podtrzymania połączenia bez ruchu – wszystko to kończy działanie gniazda. Dołączony klient zgłasza OSError (lub zwykły wyjątek z kodem powrotu brokera) z wywołania, które zauważyło zerwanie, a standardowym wzorcem jest pętla ponawiania:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Subskrypcje nie są zachowywane między ponownymi połączeniami, chyba że klient przekazał clean_session=False przy łączeniu, więc wewnętrzne connect powinno również ponownie wydać wszystkie wywołania subscribe() przed przejściem do pętli publikowania.

9.19.5. Hak last-will

Kamera raportująca status powinna powiedzieć brokerowi, jaką wiadomość ma wysłać w imieniu kamery, jeśli połączenie zostanie niespodziewanie zerwane. Ustaw testament przed connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Teraz każdy panel subskrybujący yard-cam/status widzi online w momencie, gdy kamera się połączy, oraz offline, gdy broker zauważy, że kamera odpadła. Zachowana wiadomość offline utrzymuje się na brokerze, więc panel, który połączy się dziesięć minut później, nadal widzi poprawny bieżący stan.

9.19.6. Kiedy wybrać MQTT zamiast HTTP

Rozdział o serwerach WWW opisuje kamerę działającą jako serwer HTTP oraz, na stronie o wysyłaniu do chmury, jako klient HTTP wysyłający pliki JPEG pod ustalony adres URL. Oba mają swoje zastosowanie. Właściwy moment, aby sięgnąć po MQTT:

  • Te same dane muszą trafić do kilku odbiorców (panelu, usługi powiadomień, rejestratora), przy czym kamera nie zna z góry tej listy.

  • Odbiorcy mogą pojawiać się i znikać bez ponownego uruchamiania kamery.

  • Kamera chce subskrybować – odbierać polecenia od kontrolera – czego klient HTTP nie potrafi zrobić bez długiego odpytywania (long polling) lub serwera wysyłającego dane pod adres URL wywołania zwrotnego.

  • Połączenie musi tanio przetrwać długie okresy bezczynności.

Właściwy moment, aby pozostać przy HTTP: jedna kamera, jeden serwer, ustalony wzorzec żądanie/odpowiedź z treścią zbyt dużą dla pojedynczego tematu MQTT (ramki JPEG przez MQTT działają, ale są niegrzeczne wobec brokera; HTTP POST jest naturalnym dopasowaniem).

Powiązanie: strona o wysyłaniu do chmury w rozdziale o serwerach WWW pokazuje wersję HTTP scenariusza „kamera → archiwum w chmurze”. Wersja MQTT tego samego problemu utrzymuje kamerę odseparowaną od adresu URL archiwum i pozwala drugiemu konsumentowi (powiedzmy aplikacji z alertami na telefon) podłączyć się do tego samego strumienia.