11.10. Работа в роли центрального устройства

Другая сторона разговора – центральное устройство – устройство, которое сканирует рекламирующие себя периферийные устройства, выбирает одно для общения, открывает соединение, обходит удалённую базу данных GATT и читает характеристики на нём или подписывается на них. Камера, которая собирает показания с носимого датчика, слушает маяк или общается с сопутствующим микроконтроллером, является центральным устройством.

Шаблон центрального устройства в aioble проходит через четыре этапа: сканирование, подключение, обнаружение, работа.

11.10.1. Сканирование

aioble.scan() возвращает асинхронный контекстный менеджер, который одновременно служит асинхронным итератором по обнаруженным устройствам. Типичное применение – сканировать, пока не появится интересующее устройство, затем выйти из итерации:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 ограничивает, как долго выполняется сканирование; duration_ms=0 сканирует бесконечно (пока не завершится контекстный менеджер). active=True запрашивает ответы на сканирование, что удваивает размер полезной нагрузки на устройство ценой небольшой дополнительной передачи с обеих сторон. Остальные именованные аргументы interval_us / window_us настраивают собственный радиоцикл сканера и редко меняются относительно значений по умолчанию.

Каждый aioble.ScanResult предоставляет адрес устройства, последнее значение RSSI, необработанные байты рекламы и ответа на сканирование, а также вспомогательные методы, разбирающие стандартные поля:

  • result.deviceaioble.Device, готовый к вызову connect().

  • result.rssi – индикатор уровня принимаемого сигнала в дБм, полезен для логики «выбрать ближайшее».

  • result.name() – строка локального имени или None, если оно не рекламируется.

  • result.services() – генератор bluetooth.UUID для каждого сервиса, который рекламирует устройство.

  • result.manufacturer() – генератор кортежей (company_id, data) для полей, специфичных для производителя.

  • result.connectable – была ли самая последняя реклама пригодной для подключения.

Тот же ScanResult повторно выдаётся по мере поступления новых рекламных данных для того же устройства, так что пассивный слушатель, который просто хочет неограниченно отслеживать устройства, может выполнять асинхронный итератор бесконечно и реагировать на каждое событие.

11.10.2. Подключение

Как только целевое устройство определено, открытие соединения – это один await:

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect() принимает timeout_ms (как долго ждать установления соединения; по умолчанию 10 с) и min_conn_interval_us / max_conn_interval_us (запрашиваемый диапазон интервала соединения из Соединения).

11.10.2.1. Повторное подключение к известному узлу без сканирования

Как только с узлом установлена привязка, адрес уже известен, и ещё один раунд сканирования и выбора – это потраченное впустую радиовремя. Создайте aioble.Device напрямую с сохранённым адресом и сразу переходите к connect():

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

Первый аргумент – один из aioble.ADDR_PUBLIC (заводской адрес контроллера) или aioble.ADDR_RANDOM (сгенерированный статический или разрешаемый приватный адрес); второй – либо шестибайтовое значение bytes, либо шестнадцатеричная строка, разделённая двоеточиями. Атрибуты addr_type и addr любого Device (например, полученного ранее из ScanResult) можно сохранить и снова передать сюда.

Возвращаемый aioble.DeviceConnection – это то, на чём держится вся остальная работа центрального устройства. async with гарантирует, что соединение будет закрыто при выходе из блока – при успехе, при отмене или при любом исключении, включая aioble.DeviceDisconnectedError из-за пропажи узла.

Если центральному устройству нужно значение характеристики больше, чем позволяет MTU по умолчанию в 23 байта, это место для его согласования:

await connection.exchange_mtu(512)

(exchange_mtu() возвращает фактически согласованный MTU, который является минимумом из запрошенного значения и того, что поддерживает узел.)

11.10.3. Обнаружение

Обнаружение обходит удалённую базу данных GATT, чтобы найти сервисы и характеристики по их UUID. Есть две разновидности: целенаправленная (вы знаете UUID и хотите одну конкретную вещь) и исчерпывающая (вы хотите всё).

Целенаправленная – распространённый случай:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service() и aioble.ClientService.characteristic() каждый принимают bluetooth.UUID и возвращают соответствующий объект (или None). Оба имеют именованный аргумент timeout_ms на каждое обнаружение, по умолчанию равный 2 с.

Исчерпывающая:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

Это то, что делают обобщённые приложения-обозреватели Bluetooth – полезно для разработки, менее полезно для рабочего кода, который знает, какие UUID он ожидает.

11.10.3.1. Проверка того, что поддерживает характеристика

Обнаружение возвращает битовую маску свойств GATT, которую узел рекламировал для каждой характеристики, как properties. Биты – это определённые в GATT: чтение (0x02), запись без ответа (0x04), запись (0x08), уведомление (0x10), индикация (0x20) и тому подобные. Проверка битовой маски перед выполнением операции позволяет обобщённому клиенту адаптироваться к характеристикам, чьи возможности ему заранее неизвестны:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

Рабочему коду, который уже знает профиль GATT узла, это обычно не нужно – UUID были задокументированы заранее. На это опираются обобщённые / исследовательские клиенты (страница настроек, обходящая неизвестное устройство, хост плагинов).

11.10.4. Работа

Как только центральное устройство держит ClientCharacteristic, каждая операция GATT – это один вызов сопрограммы:

  • Чтение. Выполнить чтение GATT и получить значение обратно:

    value = await char.read()
    print("value:", value)
    

    Длинные чтения (значения больше MTU) обрабатываются прозрачно.

  • Запись. Отправить новое значение на сервер:

    await char.write(b"\\x01")
    

    response=True ожидает ответа на запись и вызывает aioble.GattError, если сервер отклоняет запись. response=False – это запись без ответа: отправил и забыл. response=None (по умолчанию) автоматически выбирает на основе того, что рекламировал узел.

  • Подписка. Включить уведомления или индикации записью в CCCD характеристики:

    await char.subscribe(notify=True)
    

    После возврата центральное устройство может ожидать входящие отправки.

  • Уведомление / индикация. Ожидать следующей отправки от сервера:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None (по умолчанию) ожидает бесконечно; передайте целое число в миллисекундах, чтобы сдаться через некоторое время.

Объединение этих четырёх этапов даёт каноническую программу центрального устройства «подключиться, подписаться, передавать поток»:

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

Всё это занимает около дюжины строк и охватывает поток от «Bluetooth не запущен» до «живые данные передаются потоком». Итератор сканирования соответствует шаблону вещателя/наблюдателя, connect открывает соединение GAP, service / characteristic обходят дерево GATT, subscribe записывает CCCD, а notified ожидает отправок.

11.10.5. Отключения и повторное подключение

Всё, что происходит с радиоканалом, проявляется в сопрограмме, которая его ожидала. aioble.DeviceDisconnectedError – это сигнал того, что узел пропал или сработал тайм-аут наблюдения; исключение завершает любой выполняющийся вызов read(), write() или notified(), и любой блок async with connection завершается корректно.

Центральное устройство, которое должно повторно подключаться при потере, оборачивает работу в свой собственный внешний цикл:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. Заключение последовательности в timeout()

Когда несколько операций GATT подряд должны все завершиться в рамках одного бюджета – а не каждая по отдельности на своём timeout_ms – используйте aioble.DeviceConnection.timeout(), чтобы обернуть их. Возвращаемый контекстный менеджер отменяет своё тело, если бюджет истекает (вызывая asyncio.TimeoutError) или если узел отключается (вызывая aioble.DeviceDisconnectedError):

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

Это более чистая альтернатива оборачиванию каждого вызова по отдельности в asyncio.wait_for() и позволяет избежать ложных успехов, когда каждый вызов укладывается в свой собственный крайний срок, а последовательность в целом выходит за рамки. Передача timeout_ms=None в timeout() отключает крайний срок и оставляет активной только защиту от отключения.