9.19. MQTT in Python¶
The bundled mqtt module on every networked OpenMV cam wraps
the MQTT wire protocol in one class, mqtt.MQTTClient. The
class opens the TCP socket, performs the CONNECT handshake, packs
and unpacks the byte-level packets, handles PINGREQ keepalive, and
dispatches incoming PUBLISH messages to a callback. Application code calls connect(), publish(),
subscribe(), and wait_msg() / check_msg().
9.19.1. A publisher in fifteen lines¶
The smallest useful program is a single publish. Connect, publish one message, disconnect:
from mqtt import MQTTClient
client = MQTTClient(
client_id='yard-cam',
server='test.mosquitto.org',
port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()
test.mosquitto.org is the public test broker run by the Eclipse
Mosquitto project. It accepts plain-TCP connections on port 1883 with
no credentials. Don’t use it for anything serious; it has no privacy
guarantees and the topic namespace is shared with every other tester
on the internet.
client_id must be unique per broker connection – the broker uses
it to track sessions. Topics and message payloads are bytes; pass
str if it’s more convenient and the client will encode it as
UTF-8.
9.19.2. Connecting over TLS¶
For anything past quick experiments, MQTT over TLS is one extra
argument. The ssl_params dict is forwarded to ssl.wrap_socket(),
so anything that works there works here:
import ssl
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883, # TLS-MQTT default port
ssl_params={'server_hostname': 'broker.example.com'},
user='yard-cam',
password=load_token(),
)
Port 8883 is the IANA-reserved TLS-MQTT port. server_hostname
turns on SNI so brokers behind a shared IP can route to the right
certificate – the same mechanism HTTPS uses. user /
password map to the CONNECT packet’s username/password fields;
the broker decides whether those credentials grant publish or
subscribe rights to specific topics.
9.19.3. Subscribing and receiving¶
To receive messages a client provides a callback and calls
subscribe(). The callback receives two bytes arguments, the
topic and the payload:
def on_message(topic, msg):
print('received on', topic.decode(), ':', msg.decode())
client = MQTTClient(
client_id='dashboard',
server='test.mosquitto.org',
port=1883,
callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
client.wait_msg()
wait_msg() blocks until one MQTT packet arrives, parses it,
calls the callback if it was a PUBLISH on a subscribed topic, and
returns. Subscribed callbacks fire from inside that call – there is
no background thread.
For an interactive cam loop that needs to keep doing other work,
check_msg() is the same logic in non-blocking form. It uses
select.select() with a 50 ms timeout and returns immediately if
nothing is pending:
while True:
client.check_msg()
run_frame() # capture + processing
check_motion_threshold()
9.19.4. Reconnecting cleanly¶
Any long-running MQTT client has to handle dropped connections.
Wi-Fi disconnects, broker restarts, NAT timeouts, or simply running
past keepalive without traffic all end the socket. The bundled
client raises OSError (or a bare exception with the broker’s
return code) from the call that noticed the drop, and the standard
pattern is a retry loop:
import time
def keep_publishing(client, topic, get_message):
while True:
try:
client.connect()
while True:
client.publish(topic, get_message())
time.sleep(5)
except OSError:
print('connection lost, reconnecting in 5s')
time.sleep(5)
Subscriptions are not persisted across reconnects unless the
client passed clean_session=False on connect, so the inner
connect should also re-issue any subscribe() calls before
falling into the publish loop.
9.19.5. The last-will hook¶
A cam reporting status should tell the broker what message to send
on the cam’s behalf if the connection dies unexpectedly. Set the
will before connect():
client = MQTTClient(
client_id='yard-cam',
server='broker.example.com',
port=8883,
ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
b'yard-cam/status',
b'offline',
retain=True,
qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)
Now any dashboard subscribed to yard-cam/status sees online
the moment the cam connects and offline whenever the broker
notices the cam dropped. The retained offline message persists
on the broker so a dashboard that connects ten minutes later still
sees the correct current state.
9.19.6. When to pick MQTT over HTTP¶
The webservers chapter covers the cam acting as an HTTP server and, on the cloud-upload page, as an HTTP client posting JPEGs to a fixed URL. Both have their place. The right time to reach for MQTT instead:
The same data needs to go to several listeners (a dashboard, a notification service, a recorder) without the cam knowing the list in advance.
Listeners may come and go without the cam restarting.
The cam wants to subscribe – to receive commands from a controller – which an HTTP client can’t do without long polling or a server pushing on a callback URL.
The connection has to survive long idle periods cheaply.
The right time to stick with HTTP: one cam, one server, a fixed request/response pattern with a body that’s too large for a single MQTT topic (JPEG frames over MQTT works but is rude to the broker; HTTP POST is the natural fit).
Cross-link: the cloud-upload page in the webservers chapter shows the HTTP version of “cam → cloud archive”. The MQTT version of the same problem keeps the cam decoupled from the archive’s URL and lets a second consumer (a phone alert app, say) tap the same stream.