9.19. MQTT v Pythonu

Přibalený modul mqtt na každé síťové OpenMV cam obaluje drátový protokol MQTT do jediné třídy, mqtt.MQTTClient. Tato třída otevírá TCP socket, provádí handshake CONNECT, balí a rozbaluje pakety na úrovni bajtů, obstarává keepalive PINGREQ a předává příchozí zprávy PUBLISH do callbacku. Aplikační kód volá connect(), publish(), subscribe() a wait_msg() / check_msg().

9.19.1. Publisher v patnácti řádcích

Nejmenším užitečným programem je jediná publikace. Připojit se, publikovat jednu zprávu, odpojit se:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org je veřejný testovací broker provozovaný projektem Eclipse Mosquitto. Přijímá nešifrovaná TCP připojení na portu 1883 bez přihlašovacích údajů. Nepoužívejte jej pro nic vážného; neposkytuje žádné záruky soukromí a jmenný prostor témat je sdílený se všemi ostatními testery na internetu.

client_id musí být jedinečné pro každé připojení k brokeru – broker jej používá ke sledování relací. Témata a užitečná data zpráv jsou bajty; předejte str, pokud je to pohodlnější, a klient jej zakóduje jako UTF-8.

9.19.2. Připojení přes TLS

Pro cokoli za hranicí rychlých experimentů je MQTT přes TLS jen jeden argument navíc. Slovník ssl_params se předává funkci ssl.wrap_socket(), takže cokoli, co funguje tam, funguje i zde:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Port 8883 je port pro TLS-MQTT rezervovaný organizací IANA. server_hostname zapíná SNI, aby brokeři za sdílenou IP adresou mohli směrovat na správný certifikát – jde o stejný mechanismus, jaký používá HTTPS. user / password se mapují na pole uživatelského jména a hesla v paketu CONNECT; broker rozhoduje, zda tyto přihlašovací údaje udělují právo publikovat nebo odebírat konkrétní témata.

9.19.3. Odebírání a příjem

Aby klient přijímal zprávy, poskytne callback a zavolá subscribe(). Callback obdrží dva argumenty typu bytes, téma a užitečná data:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() blokuje, dokud nepřijde jeden MQTT paket, zpracuje jej, zavolá callback, pokud šlo o PUBLISH na odebíraném tématu, a vrátí se. Callbacky odběrů se spouštějí zevnitř tohoto volání – žádné vlákno na pozadí neexistuje.

Pro interaktivní smyčku kamery, která musí dál vykonávat jinou práci, je check_msg() stejnou logikou v neblokující podobě. Používá select.select() s časovým limitem 50 ms a vrací se okamžitě, pokud nic nečeká:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Čisté znovupřipojení

Každý dlouho běžící MQTT klient musí zvládat přerušená připojení. Výpadky Wi-Fi, restarty brokeru, časové limity NAT nebo prostě překročení keepalive bez provozu – to vše socket ukončí. Přibalený klient vyvolá OSError (nebo holou výjimku s návratovým kódem brokeru) z volání, které výpadek zaznamenalo, a standardním vzorem je smyčka opakovaných pokusů:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Odběry nejsou zachovány napříč znovupřipojeními, pokud klient při připojení nepředal clean_session=False, takže vnitřní connect by měl také znovu vydat všechna volání subscribe(), než se dostane do publikační smyčky.

9.19.5. Háček poslední vůle (last will)

Kamera hlásící svůj stav by měla brokeru sdělit, jakou zprávu má jejím jménem odeslat, pokud připojení neočekávaně zanikne. Nastavte poslední vůli před connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Nyní každý dashboard odebírající yard-cam/status uvidí online ve chvíli, kdy se kamera připojí, a offline pokaždé, když broker zjistí, že se kamera odpojila. Uchovaná zpráva offline přetrvává na brokeru, takže dashboard, který se připojí o deset minut později, stále vidí správný aktuální stav.

9.19.6. Kdy zvolit MQTT místo HTTP

Kapitola o webových serverech popisuje kameru ve roli HTTP serveru a na stránce o nahrávání do cloudu ve roli HTTP klienta odesílajícího JPEGy na pevnou URL. Obojí má své místo. Správný okamžik sáhnout místo toho po MQTT:

  • Stejná data se musí dostat k několika posluchačům (dashboard, notifikační služba, záznamník), aniž by kamera předem znala jejich seznam.

  • Posluchači mohou přicházet a odcházet, aniž by se kamera restartovala.

  • Kamera chce odebírat – přijímat příkazy od řídicí jednotky – což HTTP klient nedokáže bez dlouhého dotazování (long polling) nebo serveru, který odesílá data na callback URL.

  • Připojení musí levně přečkat dlouhá období nečinnosti.

Správný okamžik zůstat u HTTP: jedna kamera, jeden server, pevný vzor požadavek/odpověď s tělem, které je příliš velké pro jediné MQTT téma (JPEG snímky přes MQTT fungují, ale jsou k brokeru neslušné; HTTP POST je přirozenější volba).

Křížový odkaz: stránka o nahrávání do cloudu v kapitole o webových serverech ukazuje HTTP verzi „kamera → cloudový archiv“. MQTT verze téhož problému ponechává kameru oddělenou od URL archivu a umožňuje druhému konzumentovi (řekněme aplikaci pro upozornění na telefonu) napojit se na tentýž proud dat.