mqtt — Простой клиент MQTT

Модуль mqtt предоставляет минимальную реализацию клиента MQTT v3.1.1, пригодную для использования на устройствах с ограниченной памятью. Он поддерживает подключение к брокеру (с опциональным TLS), публикацию сообщений, подписку на топики, настройку last-will и keep-alive пинги.

Пример:

from mqtt import MQTTClient

def callback(topic, msg):
    print(topic, msg)

client = MQTTClient("openmv", "broker.example.com", 1883)
client.set_callback(callback)
client.connect()
client.subscribe(b"openmv/in")
client.publish(b"openmv/out", b"hello")
while True:
    client.check_msg()

Исключения

exception mqtt.MQTTException

Возникает, когда брокер отклоняет запрос CONNECT или когда запрос SUBSCRIBE отклоняется. Единственным аргументом является числовой код возврата, предоставленный брокером.

Классы

class mqtt.MQTTClient(client_id: bytes | str, server: str, port: int, ssl_params: dict | None = None, user: bytes | str | None = None, password: bytes | str | None = None, keepalive: int = 0, callback: callable | None = None)

Создаёт клиент MQTT.

Аргументы:

  • client_id — уникальный идентификатор клиента, отправляемый брокеру.

  • server — имя хоста или IP-адрес брокера (разрешается с помощью socket.getaddrinfo()).

  • port — TCP-порт брокера (обычно 1883 для открытого соединения или 8883 для TLS).

  • ssl_params — если не None, сокет оборачивается с помощью ssl.wrap_socket(), а ssl_params передаётся как именованные аргументы. Передайте {}, чтобы включить TLS с настройками по умолчанию.

  • user — необязательное имя пользователя для аутентификации на брокере. Если задано, password также должен быть указан.

  • password — необязательный пароль, используемый вместе с user.

  • keepalive — интервал keep-alive в секундах (0 отключает). Должен быть меньше 65536.

  • callback — вызываемый объект, вызываемый как callback(topic, msg) для каждого сообщения PUBLISH, доставленного от брокера. Также может быть установлен позже с помощью set_callback().

Методы

set_callback(f: callable) None

Устанавливает функцию обратного вызова, вызываемую методами wait_msg() и check_msg() при получении сообщения PUBLISH. Функция обратного вызова вызывается как f(topic, msg), где оба аргумента имеют тип bytes.

set_last_will(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

Настраивает Last Will and Testament для MQTT. Брокер публикует msg в topic, если клиент отключается некорректно. Должен быть вызван перед connect().

Аргументы:

  • topic — топик last-will (должен быть непустым).

  • msg — полезная нагрузка last-will.

  • retain — если True, брокер сохраняет сообщение will как удержанное сообщение.

  • qos — уровень QoS для last-will. Должен быть 0, 1 или 2.

connect(clean_session: bool = True, timeout: float = 5.0) int

Открывает TCP-соединение (и опционально TLS) с брокером и отправляет пакет CONNECT.

Аргументы:

  • clean_session — если True, запрашивает чистую сессию; в противном случае брокер возобновляет любое предыдущее состояние сессии.

  • timeout — таймаут сокета в секундах, применяемый к нижележащему сокету.

Возвращает флаг брокера session present (0 или 1). Возбуждает MQTTException, если брокер возвращает ненулевой код возврата CONNACK.

disconnect() None

Отправляет пакет DISCONNECT и закрывает нижележащий сокет.

ping() None

Отправляет пакет PINGREQ брокеру. Должен вызываться периодически, если keepalive не равен нулю, чтобы брокер не разрывал соединение.

publish(topic: bytes | str, msg: bytes | str, retain: bool = False, qos: int = 0) None

Публикует msg в topic.

Аргументы:

  • topic — имя топика для публикации.

  • msg — байты полезной нагрузки.

  • retain — если True, указывает брокеру удерживать сообщение для новых подписчиков.

  • qos — уровень качества обслуживания. Поддерживаются 0 (отправить и забыть) и 1 (с подтверждением). 2 не реализован и вызовет AssertionError.

Для qos 1 вызов блокируется до получения соответствующего PUBACK. Общий размер пакета должен быть меньше 2097152 байтов.

subscribe(topic: bytes | str, qos: int = 0) None

Подписывается на topic с заданным уровнем qos. Функция обратного вызова должна быть зарегистрирована через set_callback() или предоставлена конструктору; в противном случае возбуждается AssertionError.

Блокируется до получения соответствующего SUBACK. Возбуждает MQTTException, если брокер отклоняет подписку (код возврата 0x80).

wait_msg() int | None

Блокируется в ожидании одного входящего пакета MQTT и обрабатывает его. Пакеты PUBLISH доставляются зарегистрированной функции обратного вызова. Пакеты PINGRESP поглощаются незаметно. Для других управляющих пакетов возвращается необработанный первый байт. Возвращает None, если данные недоступны или если был обработан PINGRESP.

check_msg() int | None

Неблокирующий вариант wait_msg(). Опрашивает сокет в течение не более ~50 мс с помощью select.select(); если данные ожидают обработки, выполняет ту же обработку, что и wait_msg(), в противном случае возвращает None.