9.19. MQTT u Pythonu

Ugrađeni modul mqtt na svakoj umreženoj OpenMV kameri omata MQTT žičani protokol u jednu klasu, mqtt.MQTTClient. Klasa otvara TCP utičnicu, izvodi CONNECT rukovanje, pakira i raspakirava pakete na razini bajtova, obrađuje PINGREQ održavanje veze i šalje dolazne PUBLISH poruke povratnom pozivu. Aplikacijski kod poziva connect(), publish(), subscribe() te wait_msg() / check_msg().

9.19.1. Objavljivač u petnaest redaka

Najmanji koristan program je jedna objava. Spojite se, objavite jednu poruku, prekinite vezu:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org je javni testni broker koji vodi projekt Eclipse Mosquitto. Prihvaća obične TCP veze na priključku 1883 bez vjerodajnica. Ne koristite ga ni za što ozbiljno; nema jamstva privatnosti, a prostor naziva tema dijeli se sa svim ostalim testerima na internetu.

client_id mora biti jedinstven po vezi s brokerom – broker ga koristi za praćenje sesija. Teme i korisni tereti poruka su bajtovi; proslijedite str ako je to prikladnije i klijent će ga kodirati kao UTF-8.

9.19.2. Spajanje preko TLS-a

Za sve osim brzih eksperimenata, MQTT preko TLS-a jedan je dodatni argument. Rječnik ssl_params prosljeđuje se funkciji ssl.wrap_socket(), pa sve što tamo radi radi i ovdje:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Priključak 8883 je TLS-MQTT priključak rezerviran od strane IANA-e. server_hostname uključuje SNI tako da brokeri iza dijeljene IP adrese mogu usmjeriti na ispravan certifikat – isti mehanizam koji koristi HTTPS. user / password preslikavaju se na polja korisničkog imena/lozinke CONNECT paketa; broker odlučuje daju li te vjerodajnice pravo objavljivanja ili pretplate na određene teme.

9.19.3. Pretplata i primanje

Za primanje poruka klijent navodi povratni poziv i poziva subscribe(). Povratni poziv prima dva argumenta tipa bytes, temu i korisni teret:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() blokira dok ne stigne jedan MQTT paket, raščlanjuje ga, poziva povratni poziv ako je bila riječ o PUBLISH poruci na pretplaćenoj temi i vraća se. Pretplaćeni povratni pozivi okidaju iznutra tog poziva – nema pozadinske dretve.

Za interaktivnu petlju kamere koja treba nastaviti obavljati druge poslove, check_msg() je ista logika u neblokirajućem obliku. Koristi select.select() s istekom od 50 ms i vraća se odmah ako ništa nije na čekanju:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Čisto ponovno spajanje

Svaki dugotrajni MQTT klijent mora obraditi prekinute veze. Wi-Fi prekidi, ponovna pokretanja brokera, istek NAT-a ili jednostavno premašivanje vremena održavanja veze bez prometa – sve to okončava utičnicu. Ugrađeni klijent podiže OSError (ili goli iznimak s povratnim kodom brokera) iz poziva koji je primijetio prekid, a standardni obrazac je petlja ponovnih pokušaja:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Pretplate se ne zadržavaju kroz ponovna spajanja osim ako je klijent pri spajanju proslijedio clean_session=False, pa bi unutarnji connect također trebao ponovno izdati sve subscribe() pozive prije nego što uđe u petlju objavljivanja.

9.19.5. Kuka za posljednju volju (last-will)

Kamera koja izvještava o statusu trebala bi reći brokeru koju poruku da pošalje u ime kamere ako se veza neočekivano prekine. Postavite volju prije connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Sada svaka nadzorna ploča pretplaćena na yard-cam/status vidi online u trenutku kada se kamera spoji i offline kad god broker primijeti da je kamera otpala. Zadržana poruka offline ostaje na brokeru pa nadzorna ploča koja se spoji deset minuta kasnije i dalje vidi ispravno trenutno stanje.

9.19.6. Kada odabrati MQTT umjesto HTTP-a

Poglavlje o web poslužiteljima pokriva kameru koja djeluje kao HTTP poslužitelj te, na stranici o učitavanju u oblak, kao HTTP klijent koji šalje JPEG-ove na fiksni URL. Oboje ima svoje mjesto. Pravi trenutak da posegnete za MQTT-om umjesto toga:

  • Isti podaci moraju ići prema više slušatelja (nadzorna ploča, obavijesna usluga, snimač), a da kamera unaprijed ne zna popis.

  • Slušatelji mogu dolaziti i odlaziti bez ponovnog pokretanja kamere.

  • Kamera se želi pretplatiti – primati naredbe od upravljača – što HTTP klijent ne može učiniti bez dugog ispitivanja (long polling) ili poslužitelja koji gura podatke na URL povratnog poziva.

  • Veza mora jeftino preživjeti duga razdoblja mirovanja.

Pravi trenutak da ostanete kod HTTP-a: jedna kamera, jedan poslužitelj, fiksni obrazac zahtjeva/odgovora s tijelom koje je preveliko za jednu MQTT temu (JPEG sličice preko MQTT-a rade, ali su nepristojne prema brokeru; HTTP POST je prirodno rješenje).

Unakrsna poveznica: stranica o učitavanju u oblak u poglavlju o web poslužiteljima prikazuje HTTP verziju „kamera → arhiva u oblaku”. MQTT verzija istog problema drži kameru razdvojenom od URL-a arhive i omogućuje drugom potrošaču (recimo aplikaciji za obavijesti na telefonu) da se priključi istom toku.