9.19. PythonでのMQTT

ネットワーク対応のすべてのOpenMV camに同梱されている mqtt モジュールは、MQTTのワイヤープロトコルを1つのクラス mqtt.MQTTClient でラップします。このクラスはTCPソケットを開き、CONNECTハンドシェイクを実行し、バイトレベルのパケットをパックおよびアンパックし、PINGREQによるキープアライブを処理し、受信したPUBLISHメッセージをコールバックにディスパッチします。アプリケーションコードは connect()publish()subscribe()、そして wait_msg() / check_msg() を呼び出します。

9.19.1. 15行のパブリッシャ

最も小さく実用的なプログラムは単一のパブリッシュです。接続し、1つのメッセージをパブリッシュし、切断します:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org は、Eclipse Mosquittoプロジェクトが運営する公開テストブローカーです。資格情報なしでポート1883上の平文TCP接続を受け付けます。本格的な用途には使用しないでください。プライバシーの保証はなく、トピック名前空間はインターネット上の他のすべてのテスターと共有されています。

client_id はブローカー接続ごとに一意でなければなりません。ブローカーはこれを使ってセッションを追跡します。トピックとメッセージのペイロードはバイト列です。その方が便利であれば str を渡すことができ、クライアントがそれをUTF-8としてエンコードします。

9.19.2. TLS経由での接続

簡単な実験を超えるあらゆる用途では、TLS経由のMQTTは引数を1つ追加するだけです。ssl_params の辞書は ssl.wrap_socket() に転送されるため、そこで動作するものはここでも動作します:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

ポート 8883 はIANAが予約したTLS-MQTTポートです。server_hostname はSNIを有効にし、共有IPの背後にあるブローカーが正しい証明書にルーティングできるようにします。これはHTTPSが使うのと同じ仕組みです。user / password はCONNECTパケットのユーザー名/パスワードフィールドに対応します。ブローカーは、それらの資格情報が特定のトピックへのパブリッシュまたはサブスクライブの権限を与えるかどうかを判断します。

9.19.3. サブスクライブと受信

メッセージを受信するには、クライアントがコールバックを提供して subscribe() を呼び出します。コールバックは、トピックとペイロードという2つのバイト列引数を受け取ります:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() は、1つのMQTTパケットが到着するまでブロックし、それをパースし、サブスクライブしているトピック上のPUBLISHであればコールバックを呼び出して戻ります。サブスクライブしたコールバックはその呼び出しの内部から発火します。バックグラウンドスレッドはありません。

他の処理を続ける必要があるインタラクティブなcamループには、check_msg() が同じロジックを非ブロッキング形式で提供します。これは50 msのタイムアウトで select.select() を使い、保留中のものがなければ即座に戻ります:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. クリーンな再接続

長時間動作するMQTTクライアントは、接続の切断を処理しなければなりません。Wi-Fiの切断、ブローカーの再起動、NATのタイムアウト、あるいは単にトラフィックなしでキープアライブを過ぎてしまうことなど、いずれもソケットを終了させます。同梱のクライアントは、切断を検知した呼び出しから OSError(またはブローカーの戻りコードを伴う素の例外)を送出し、標準的なパターンはリトライループです:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

クライアントが接続時に clean_session=False を渡さない限り、サブスクリプションは再接続をまたいで永続化 されません。そのため、内側の connect は、パブリッシュループに入る前にすべての subscribe() 呼び出しを再発行する必要もあります。

9.19.5. ラストウィルフック

ステータスを報告するcamは、接続が予期せず切断された場合にcamに代わってブローカーがどのメッセージを送るべきかを伝えておくべきです。connect() にウィルを設定します:

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

これで、yard-cam/status をサブスクライブしているダッシュボードは、camが接続した瞬間に online を、ブローカーがcamの切断に気づいたときにはいつでも offline を見ることになります。保持された offline メッセージはブローカー上に永続化されるため、10分後に接続するダッシュボードでも正しい現在の状態を見ることができます。

9.19.6. HTTPよりMQTTを選ぶべき場合

Webサーバーの章では、camがHTTPサーバーとして動作する場合、そしてクラウドアップロードのページでは、固定URLにJPEGをPOSTするHTTPクライアントとして動作する場合を扱います。どちらにもそれぞれの用途があります。代わりにMQTTに手を伸ばすのに適したタイミングは次のとおりです。

  • 同じデータを、camが事前にリストを知ることなく、複数のリスナー(ダッシュボード、通知サービス、レコーダー)に送る必要がある場合。

  • camを再起動することなく、リスナーが現れたり消えたりする可能性がある場合。

  • camが サブスクライブ したい場合、つまりコントローラーからコマンドを受信したい場合。これは、ロングポーリングやコールバックURLへのサーバープッシュなしには、HTTPクライアントには行えません。

  • 接続が長いアイドル期間を安価に生き延びなければならない場合。

HTTPに留まるのに適したタイミングは次のとおりです。1台のcam、1台のサーバー、そして単一のMQTTトピックには大きすぎるボディを伴う固定のリクエスト/レスポンスパターンの場合(MQTT経由のJPEGフレームは動作しますがブローカーに対して無作法です。HTTP POSTが自然な選択です)。

クロスリンク: Webサーバーの章のクラウドアップロードのページでは、「cam → クラウドアーカイブ」のHTTP版を示しています。同じ問題のMQTT版では、camをアーカイブのURLから切り離した状態に保ち、2番目のコンシューマ(たとえばスマートフォンのアラートアプリ)が同じストリームをタップできるようにします。