mqtt --- シンプルな MQTT クライアント

mqtt モジュールは、メモリに制約のあるデバイスでの使用に適した、最小限の MQTT v3.1.1 クライアント実装を提供します。ブローカーへの接続(省略可能な TLS 付き)、メッセージのパブリッシュ、トピックへのサブスクライブ、ラストウィルの構成、およびキープアライブの ping をサポートしています。

例:

from mqtt import MQTTClient

def callback(topic, msg):
    print(topic, msg)

client = MQTTClient("openmv", "broker.example.com", 1883)
client.set_callback(callback)
client.connect()
client.subscribe(b"openmv/in")
client.publish(b"openmv/out", b"hello")
while True:
    client.check_msg()

例外

exception mqtt.MQTTException

ブローカーが CONNECT リクエストを拒否した場合、または SUBSCRIBE リクエストが拒否された場合に発生します。唯一の引数はブローカーから提供される数値のリターンコードです。

クラス

class mqtt.MQTTClient(client_id: bytes | str, server: str, port: int, ssl_params: dict | None = None, user: bytes | str | None = None, password: bytes | str | None = None, keepalive: int = 0, callback: callable | None = None)

MQTT クライアントを構築します。

引数:

  • client_id -- ブローカーに送信される一意のクライアント識別子です。

  • server -- ブローカーのホスト名または IP アドレス(socket.getaddrinfo() で解決されます)。

  • port -- ブローカーの TCP ポート(通常は平文の 1883 または TLS の場合は 8883)。

  • ssl_params -- None でない場合、ソケットは ssl.wrap_socket() でラップされ、ssl_params がキーワード引数として転送されます。デフォルト設定で TLS を有効にするには {} を渡してください。

  • user -- ブローカー認証用の省略可能なユーザー名です。指定する場合は、password も指定する必要があります。

  • password -- user と一緒に使用される省略可能なパスワードです。

  • keepalive -- キープアライブ間隔(秒単位、0 は無効化)。65536 未満でなければなりません。

  • callback -- ブローカーから配信される PUBLISH ごとに callback(topic, msg) として呼び出される呼び出し可能オブジェクトです。set_callback() で後から設定することもできます。

メソッド

set_callback(f: callable) None

PUBLISH メッセージを受信したときに wait_msg() および check_msg() によって呼び出されるコールバックを設定します。コールバックは、両方の引数を bytes として f(topic, msg) のように呼び出されます。

set_last_will(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

MQTT のラストウィルおよびテスタメント(Last Will and Testament)を構成します。クライアントが正常に切断されなかった場合、ブローカーは topicmsg をパブリッシュします。connect() の前に呼び出す必要があります。

引数:

  • topic -- ラストウィルのトピック(空であってはなりません)。

  • msg -- ラストウィルのペイロードです。

  • retain -- True の場合、ブローカーはウィルメッセージを保持メッセージとして保存します。

  • qos -- ラストウィルの QoS レベルです。01、または 2 でなければなりません。

connect(clean_session: bool = True, timeout: float = 5.0) int

ブローカーへの TCP(および省略可能な TLS)接続を開き、CONNECT パケットを送信します。

引数:

  • clean_session -- True の場合、クリーンセッションを要求します。それ以外の場合、ブローカーは以前のセッション状態を再開します。

  • timeout -- 基礎となるソケットに適用されるソケットタイムアウト(秒単位)です。

ブローカーの session present フラグ(0 または 1)を返します。ブローカーがゼロ以外の CONNACK リターンコードを返した場合、MQTTException を発生させます。

disconnect() None

DISCONNECT パケットを送信し、基礎となるソケットを閉じます。

ping() None

ブローカーに PINGREQ パケットを送信します。ブローカーが接続を切断しないように、keepalive がゼロ以外の場合は定期的に呼び出す必要があります。

publish(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

msgtopic にパブリッシュします。

引数:

  • topic -- パブリッシュ先のトピック名です。

  • msg -- ペイロードバイトです。

  • retain -- True の場合、新しいサブスクライバーのためにメッセージを保持するようブローカーに指示します。

  • qos -- サービス品質(QoS)レベルです。0(送りっぱなし)と 1(確認応答付き)がサポートされています。2 は実装されておらず、AssertionError を発生させます。

qos1 の場合、対応する PUBACK を受信するまで呼び出しはブロックします。パケット全体のサイズは 2097152 バイト未満でなければなりません。

subscribe(topic: bytes | str, qos: int = 0) None

指定された qos レベルで topic にサブスクライブします。コールバックが set_callback() を介して登録されているか、コンストラクタに渡されている必要があります。そうでない場合は AssertionError が発生します。

対応する SUBACK を受信するまでブロックします。ブローカーがサブスクリプションを拒否した場合(リターンコード 0x80)、MQTTException を発生させます。

wait_msg() int | None

単一の受信 MQTT パケットを待機してブロックし、それを処理します。PUBLISH パケットは登録されたコールバックに配信されます。PINGRESP パケットは警告なく消費されます。その他の制御パケットについては、生の最初のバイトが返されます。データが利用できない場合、または PINGRESP が処理された場合は None を返します。

check_msg() int | None

wait_msg() の非ブロッキング版です。select.select() を使用して最大約50ミリ秒ソケットをポーリングします。データが保留中の場合は wait_msg() と同じ処理を実行し、そうでない場合は None を返します。