11.10. Робота в ролі central

Інша сторона взаємодії — це central — пристрій, який сканує рекламні периферійні пристрої, обирає один для спілкування, відкриває з’єднання, обходить віддалену базу даних GATT і зчитує характеристики або підписується на них. Камера, що збирає показники від носимого датчика, слухає маяк або спілкується із супровідним мікроконтролером, є central.

Шаблон central у aioble проходить через чотири етапи: сканування, підключення, виявлення, робота.

11.10.1. Сканування

aioble.scan() повертає асинхронний контекстний менеджер, що водночас є асинхронним ітератором для виявлених пристроїв. Типове використання — сканувати до появи потрібного пристрою, а потім перервати ітерацію:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 обмежує тривалість сканування; duration_ms=0 сканує нескінченно (до виходу із контекстного менеджера). active=True запитує відповіді на сканування, що вдвічі збільшує розмір корисного навантаження на пристрій за рахунок додаткової передачі з обох сторін. Решта аргументів interval_us / window_us налаштовують власний радіо-цикл роботи сканера і рідко змінюються від типових значень.

Кожен aioble.ScanResult містить адресу пристрою, останній RSSI, необроблені байти реклами та відповіді на сканування, а також допоміжні методи для розбору стандартних полів:

  • result.device — об’єкт aioble.Device, готовий для виклику connect().

  • result.rssi — показник рівня отриманого сигналу в дБм, корисний для логіки «обрати найближчий».

  • result.name() — рядок локального імені або None, якщо воно не рекламується.

  • result.services() — генератор bluetooth.UUID для кожної служби, яку рекламує пристрій.

  • result.manufacturer() — генератор кортежів (company_id, data) для специфічних полів виробника.

  • result.connectable — чи є найостанніша реклама такою, що допускає підключення.

Той самий ScanResult повертається повторно при надходженні нових рекламних даних від того самого пристрою, тому пасивний слухач, який хоче нескінченно відстежувати пристрої, може запускати асинхронний ітератор вічно та обробляти кожну подію.

11.10.2. Підключення

Після ідентифікації цільового пристрою відкрити з’єднання можна одним await

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect() приймає timeout_ms (час очікування встановлення з’єднання; типово 10 с) та min_conn_interval_us / max_conn_interval_us (запитуваний діапазон інтервалу з’єднання з З’єднання).

11.10.2.1. Повторне підключення до відомого пристрою без сканування

Якщо зв’язок із пристроєм вже встановлено, адреса відома і новий раунд сканування-та-вибору є зайвою витратою радіочасу. Створіть aioble.Device безпосередньо зі збереженою адресою і перейдіть одразу до connect()

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

Перший аргумент — один із aioble.ADDR_PUBLIC (заводська адреса контролера) або aioble.ADDR_RANDOM (згенерована статична або приватна адреса, що розпізнається); другий — або шестибайтове значення bytes, або рядок у шістнадцятковому форматі, розділений двокрапками. Атрибути addr_type та addr будь-якого Device (наприклад, отриманого раніше з ScanResult) можна зберегти та передати сюди.

Повернутий aioble.DeviceConnection є тим, на чому тримається вся подальша робота central. async with гарантує закриття з’єднання при виході з блоку — успішному, при скасуванні або при будь-якому винятку, включаючи aioble.DeviceDisconnectedError від відключення пристрою.

Якщо central потребує більшого значення характеристики, ніж дозволяє типовий MTU у 23 байти, тут можна провести переговори:

await connection.exchange_mtu(512)

(exchange_mtu() повертає фактично узгоджений MTU, який є мінімумом між запитуваним значенням і тим, що підтримує пристрій.)

11.10.3. Виявлення

Виявлення обходить віддалену базу даних GATT для пошуку служб і характеристик за їхніми UUID. Є два варіанти: цільовий (ви знаєте UUID і хочете одну конкретну річ) та вичерпний (ви хочете все).

Цільовий — найпоширений випадок:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service() і aioble.ClientService.characteristic() кожен приймає bluetooth.UUID і повертає відповідний об’єкт (або None). Обидва мають аргумент timeout_ms для окремого виявлення, який типово дорівнює 2 с.

Вичерпний:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

Саме це роблять загальні Bluetooth-розвідники — корисно для розробки, менш доцільно для виробничого коду, який знає, яких UUID слід очікувати.

11.10.3.1. Перевірка можливостей характеристики

Виявлення повертає бітову маску властивостей GATT, яку пристрій оголосив для кожної характеристики, у вигляді properties. Біти є стандартними GATT-визначеними: read (0x02), write-without-response (0x04), write (0x08), notify (0x10), indicate (0x20) та інші. Перевірка бітової маски перед виконанням операції дозволяє загальному клієнту адаптуватися до характеристик, можливості яких йому заздалегідь невідомі:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

Виробничий код, який вже знає профіль GATT пристрою, зазвичай не потребує цього — UUID задокументовані заздалегідь. Загальні/дослідницькі клієнти (сторінка налаштувань, що обходить невідомий пристрій, хост плагінів) покладаються на це.

11.10.4. Операції

Після отримання central об’єкта ClientCharacteristic кожна операція GATT виконується одним викликом корутини:

  • Читання. Виконати читання GATT і отримати значення:

    value = await char.read()
    print("value:", value)
    

    Довгі читання (значення, більші за MTU) обробляються прозоро.

  • Запис. Надіслати нове значення на сервер:

    await char.write(b"\\x01")
    

    response=True чекає на відповідь запису та генерує aioble.GattError, якщо сервер відхиляє запис. response=False — запис без відповіді: надіслав і забув. response=None (типово) автоматично обирає режим на основі того, що оголосив пристрій.

  • Підписка. Увімкнути сповіщення або вказівки, записавши в CCCD характеристики:

    await char.subscribe(notify=True)
    

    Після повернення central може чекати на вхідні пуш-повідомлення.

  • Отримання сповіщення / вказівки. Зачекати на наступне пуш-повідомлення від сервера:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None (типово) чекає нескінченно; передайте ціле число в мілісекундах для завершення через певний час.

Поєднання чотирьох кроків дає канонічну центральну програму «підключитися, підписатися, стримінг»:

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

Весь код займає близько десятка рядків і охоплює процес від «Bluetooth не запущено» до «потокова передача живих даних». Ітератор сканування відповідає шаблону мовник/спостерігач, connect відкриває GAP-з’єднання, service / characteristic обходить дерево GATT, subscribe записує CCCD, а notified чекає на пуш-повідомлення.

11.10.5. Відключення та повторне підключення

Все, що відбувається з радіоканалом, виникає в корутині, яка на нього чекала. aioble.DeviceDisconnectedError — це сигнал про те, що пристрій зник або спрацював таймаут контролю; виняток завершує будь-який виклик read(), write() або notified(), що виконувався, а будь-який блок async with connection завершується коректно.

Central, який повинен повторно підключатися після втрати зв’язку, огортає роботу у власний зовнішній цикл:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. Обмеження послідовності за допомогою timeout()

Коли кілька операцій GATT підряд мають завершитися в межах одного бюджету — не кожна окремо зі своїм timeout_ms, а всі разом — використовуйте aioble.DeviceConnection.timeout() для їх обгортання. Повернутий контекстний менеджер скасовує своє тіло, якщо бюджет вичерпано (генерує asyncio.TimeoutError) або якщо пристрій відключився (генерує aioble.DeviceDisconnectedError):

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

Це більш чистий варіант порівняно з обгортанням кожного виклику окремо у asyncio.wait_for() та уникненням хибних успіхів, коли кожен виклик вкладається у свій дедлайн, але послідовність загалом виходить за межі. Передача timeout_ms=None у timeout() вимикає дедлайн і залишає активним лише захист від відключення.