9.19. MQTT Pythonissa

Jokaisessa verkotetussa OpenMV-kamerassa mukana toimitettava mqtt-moduuli kääräisee MQTT-siirtoprotokollan yhteen luokkaan, mqtt.MQTTClient. Luokka avaa TCP-soketin, suorittaa CONNECT-kättelyn, pakkaa ja purkaa tavutason paketit, hoitaa PINGREQ-elossapidon ja jakaa saapuvat PUBLISH-viestit takaisinkutsulle. Sovelluskoodi kutsuu connect()-, publish()-, subscribe()- ja wait_msg() / check_msg()-metodeja.

9.19.1. Julkaisija viidellätoista rivillä

Pienin hyödyllinen ohjelma on yksittäinen julkaisu. Yhdistä, julkaise yksi viesti, katkaise yhteys:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org on Eclipse Mosquitto -projektin ylläpitämä julkinen testivälittäjä. Se hyväksyy pelkät TCP-yhteydet portissa 1883 ilman tunnistetietoja. Älä käytä sitä mihinkään vakavaan; sillä ei ole mitään yksityisyystakuita ja aihenimiavaruus on jaettu kaikkien muiden internetin testaajien kanssa.

client_id-arvon on oltava yksilöllinen kutakin välittäjäyhteyttä kohden – välittäjä käyttää sitä istuntojen seuraamiseen. Aiheet ja viestin hyötykuormat ovat tavuja; välitä str, jos se on kätevämpää, niin asiakas koodaa sen UTF-8-muotoon.

9.19.2. Yhdistäminen TLS:n kautta

Kaikkeen pikakokeiluja pidemmälle menevään MQTT TLS:n kautta on yhden lisäargumentin asia. ssl_params-sanakirja välitetään ssl.wrap_socket()-funktiolle, joten kaikki mikä toimii siellä, toimii myös täällä:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Portti 8883 on IANA:n varaama TLS-MQTT-portti. server_hostname ottaa käyttöön SNI:n, jotta jaetun IP:n takana olevat välittäjät voivat reitittää oikeaan varmenteeseen – sama mekanismi, jota HTTPS käyttää. user / password vastaavat CONNECT-paketin käyttäjänimi- ja salasanakenttiä; välittäjä päättää, antavatko nämä tunnistetiedot oikeudet julkaista tai tilata tiettyjä aiheita.

9.19.3. Tilaaminen ja vastaanottaminen

Vastaanottaakseen viestejä asiakas tarjoaa takaisinkutsun ja kutsuu subscribe(). Takaisinkutsu saa kaksi tavu-argumenttia, aiheen ja hyötykuorman:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() estyy, kunnes yksi MQTT-paketti saapuu, jäsentää sen, kutsuu takaisinkutsua jos kyseessä oli PUBLISH tilatussa aiheessa, ja palaa. Tilatut takaisinkutsut laukeavat tuon kutsun sisältä – taustasäiettä ei ole.

Interaktiiviselle kamerasilmukalle, jonka on jatkettava muiden tehtävien tekemistä, check_msg() on sama logiikka ei-estävässä muodossa. Se käyttää select.select()-funktiota 50 ms:n aikakatkaisulla ja palaa välittömästi, jos mitään ei ole odottamassa:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Siisti uudelleenyhdistäminen

Jokaisen pitkään käynnissä olevan MQTT-asiakkaan on käsiteltävä katkenneet yhteydet. Wi-Fi-katkokset, välittäjän uudelleenkäynnistykset, NAT-aikakatkaisut tai pelkkä elossapidon ohittaminen ilman liikennettä päättävät kaikki soketin. Mukana toimitettava asiakas nostaa OSError-poikkeuksen (tai pelkän poikkeuksen välittäjän paluukoodilla) siitä kutsusta, joka havaitsi katkoksen, ja vakiomalli on uudelleenyrityssilmukka:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Tilauksia ei säilytetä uudelleenyhdistämisten yli, ellei asiakas välittänyt clean_session=False-arvoa yhdistettäessä, joten sisemmän connect-kutsun pitäisi myös uudelleenantaa kaikki subscribe()-kutsut ennen julkaisusilmukkaan siirtymistä.

9.19.5. Viimeisen tahdon koukku

Tilaa raportoivan kameran pitäisi kertoa välittäjälle, mikä viesti lähetetään kameran puolesta, jos yhteys katkeaa odottamatta. Aseta tahto ennen connect()-kutsua:

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Nyt mikä tahansa yard-cam/status-aihetta tilannut kojelauta näkee online heti kun kamera yhdistää, ja offline aina kun välittäjä huomaa kameran katkenneen. Säilytetty offline-viesti pysyy välittäjässä, joten kymmenen minuuttia myöhemmin yhdistävä kojelauta näkee silti oikean nykytilan.

9.19.6. Milloin valita MQTT HTTP:n sijaan

Verkkopalvelimet-luku käsittelee kameraa HTTP-palvelimena ja pilveen lataaminen -sivulla HTTP-asiakkaana, joka lähettää JPEG-kuvia kiinteään URL-osoitteeseen. Molemmilla on paikkansa. Oikea aika tarttua MQTT:hen sen sijaan:

  • Saman datan on mentävä usealle kuuntelijalle (kojelauta, ilmoituspalvelu, tallennin) ilman että kamera tietää luetteloa etukäteen.

  • Kuuntelijat voivat tulla ja mennä ilman että kamera käynnistyy uudelleen.

  • Kamera haluaa tilata – vastaanottaa komentoja ohjaimelta – mitä HTTP-asiakas ei voi tehdä ilman pitkää kyselyä tai palvelinta, joka työntää tietoa takaisinkutsu-URL:iin.

  • Yhteyden on selvittävä pitkistä jouten olevista jaksoista edullisesti.

Oikea aika pysyä HTTP:ssä: yksi kamera, yksi palvelin, kiinteä pyyntö-vastaus-malli rungolla, joka on liian suuri yhdelle MQTT-aiheelle (JPEG-kehysten lähettäminen MQTT:llä toimii mutta on epäkohteliasta välittäjää kohtaan; HTTP POST on luonteva valinta).

Ristiviite: pilveen lataaminen -sivu verkkopalvelimet-luvussa näyttää HTTP-version ”kamera → pilviarkisto” -ratkaisusta. Saman ongelman MQTT-versio pitää kameran irrotettuna arkiston URL-osoitteesta ja antaa toisen kuluttajan (vaikkapa puhelimen hälytyssovelluksen) kytkeytyä samaan virtaan.