aioble — Async BLE

aioble — це обгортка вищого рівня з підтримкою asyncio навколо модуля bluetooth. Вона надає зручні корутини для сканування, підключення, реклами, служб GATT та каналів L2CAP.

Усі віддалені операції (connect, disconnect, client read/write, server indicate, l2cap recv/send, pair) є awaitable і підтримують тайм-аути.

Підтримувані ролі:

  • Broadcaster (advertiser) — формування корисних навантажень реклами та відповіді на сканування для стандартних полів, автоматичний розподіл навантаження між рекламою та відповіддю на сканування, рекламування безкінечно або протягом фіксованого часу.

  • Peripheral — очікування підключення від центру, очікування обміну MTU.

  • Observer (scanner) — пасивне та активне сканування, об’єднання навантажень реклами та відповіді на сканування для одного пристрою, розбір стандартних полів рекламних навантажень.

  • Central — підключення до периферійного пристрою, ініціювання обміну MTU.

  • GATT Client — виявлення служб / характеристик / дескрипторів (за бажанням за UUID); читання / запис / запис із відповіддю для характеристик і дескрипторів; підписка на сповіщення та показання (через CCCD); очікування сповіщень і показань.

  • GATT Server — реєстрація служб / характеристик / дескрипторів; очікування записів у характеристики та дескриптори; перехоплення запитів на читання; надсилання сповіщень і показань (з очікуванням відповіді).

  • L2CAP — прийом та встановлення каналів L2CAP, орієнтованих на з’єднання, керування потоком каналу.

  • Security — керування ключами/секретами на основі JSON, ініціювання сполучення, запит стану шифрування / автентифікації.

Приклади

Сканування навколишніх пристроїв BLE та виведення кожного з них у міру виявлення:

import aioble
import asyncio

async def find_devices():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            print(result.device.addr_hex(), result.rssi, result.name())

asyncio.run(find_devices())

Підключення до периферійного пристрою, що рекламує службу частоти серцевих скорочень, у ролі central та підписка на сповіщення про вимірювання:

import aioble
import asyncio
import bluetooth

_HR_SERVICE = bluetooth.UUID(0x180D)
_HR_MEASUREMENT = bluetooth.UUID(0x2A37)

async def connect_and_read():
    device = None
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if _HR_SERVICE in result.services():
                device = result.device
                break
    if device is None:
        return

    async with await device.connect() as conn:
        service = await conn.service(_HR_SERVICE)
        char = await service.characteristic(_HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while True:
            data = await char.notified()
            print("notify:", data)

asyncio.run(connect_and_read())

Робота у ролі peripheral: реєстрація служби GATT, її реклама та надсилання сповіщень усім підключеним:

import aioble
import asyncio
import bluetooth
import struct

_ENV_SERVICE = bluetooth.UUID(0x181A)
_TEMP_CHAR = bluetooth.UUID(0x2A6E)

def encode_temperature(deg_c):
    # Bluetooth Temperature (0x2A6E) is sint16 little-endian, 0.01 degC units.
    return struct.pack("<h", round(deg_c * 100))

service = aioble.Service(_ENV_SERVICE)
temp_char = aioble.Characteristic(service, _TEMP_CHAR, read=True, notify=True)
aioble.register_services(service)

async def peripheral_task():
    while True:
        connection = await aioble.advertise(
            interval_us=250000,
            name="openmv-sensor",
            services=[_ENV_SERVICE],
            appearance=0x0300,
        )
        print("connected:", connection.device.addr_hex())
        async with connection:
            while connection.is_connected():
                temp_char.write(encode_temperature(23.68), send_update=True)
                await asyncio.sleep(1)

asyncio.run(peripheral_task())

Функції рівня модуля

aioble.config(*args, **kwargs) Any

Перенаправляє до bluetooth.BLE.config(), спочатку переконавшись, що радіомодуль BLE активний.

args

Необов’язкове ім’я одного параметра для запиту.

kwargs

Іменовані аргументи для встановлення значень конфігурації.

aioble.stop() None

Деактивує базовий радіомодуль BLE та виконує всі зареєстровані обробники завершення роботи підмодулів. Після виклику всі сканери, рекламодавці, з’єднання та канали L2CAP завершують роботу.

aioble.scan(duration_ms: int, interval_us: int | None = None, window_us: int | None = None, active: bool = False) scan

Повертає scan — асинхронний менеджер контексту / асинхронний ітератор, що видає екземпляри ScanResult для кожного унікального виявленого пристрою (або для кожного нового рекламного повідомлення від відомого пристрою).

duration_ms

Тривалість сканування в мілісекундах. Передайте 0 для безперервного сканування до виходу з менеджера контексту.

interval_us

Інтервал сканування в мікросекундах. За замовчуванням 1 280 000.

window_us

Вікно сканування в мікросекундах (має бути менше або рівне interval_us). За замовчуванням 11 250.

active

Якщо True, виконується активне сканування (запит даних відповіді на сканування). За замовчуванням False.

aioble.advertise(interval_us: int, adv_data: bytes | None = None, resp_data: bytes | None = None, connectable: bool = True, limited_disc: bool = False, br_edr: bool = False, name: str | None = None, services: list | None = None, appearance: int = 0, manufacturer: tuple | None = None, timeout_ms: int | None = None) DeviceConnection

Асинхронна корутина, що починає рекламування та чекає на вхідне підключення від центру. Повертає DeviceConnection, що представляє підключений центр, або генерує asyncio.TimeoutError при тайм-ауті.

interval_us

Інтервал реклами в мікросекундах.

adv_data

Необроблене рекламне навантаження. Якщо не задано, adv_data будується з решти іменованих аргументів.

resp_data

Необроблене навантаження відповіді на сканування. Автоматично заповнюється надлишками з adv_data за потреби.

connectable

Якщо True, це реклама з можливістю підключення.

limited_disc

Використовує прапорець обмеженого виявлення замість загального.

br_edr

Встановлює прапорець підтримки BR/EDR.

name

Необов’язкове повне локальне ім’я для вбудовування.

services

Ітерований об’єкт bluetooth.UUID для реклами.

appearance

16-бітне значення зовнішнього вигляду (див. призначені номери Bluetooth).

manufacturer

Кортеж (company_id, data_bytes) для реклами як специфічних для виробника даних.

timeout_ms

Зупинити рекламування після цієї кількості мілісекунд без підключення. None означає рекламувати до підключення.

aioble.register_services(*services: Service) None

Реєструє один або кілька об’єктів Service (разом із їхніми характеристиками та дескрипторами) на сервері GATT. Має бути викликано один раз перед запуском advertise. Наступні виклики замінюють попередню реєстрацію.

services

Один або кілька екземплярів Service.

Константи рівня модуля

aioble.ADDR_PUBLIC

Тип відкритої адреси пристрою BLE (0).

aioble.ADDR_RANDOM

Тип випадкової адреси пристрою BLE (1).

Винятки

exception aioble.GattError

Генерується, коли віддалена операція GATT (read / write / indicate) завершується з ненульовим статусом. Код статусу доступний у атрибуті _status.

exception aioble.DeviceDisconnectedError

Генерується всередині асинхронної операції (наприклад, read, write, notified), коли під час очікування розривається базове з’єднання.

exception aioble.L2CAPDisconnectedError

Генерується, коли операція надсилання/отримання/очищення L2CAP виконується на відключеному каналі (або переривається ним).

exception aioble.L2CAPConnectionError

Генерується методом DeviceConnection.l2cap_connect, коли встановлення каналу завершується невдало. Код статусу Bluetooth є першим аргументом.

Класи

class aioble.Device(addr_type: int, addr: bytes | str)

Представляє віддалений пристрій BLE за адресою. Два екземпляри Device вважаються рівними, якщо збігаються обидва параметри addr_type і addr. Використовується як дескриптор для ініціювання з’єднань.

addr_type

Або ADDR_PUBLIC, або ADDR_RANDOM.

addr

Шестибайтна адреса у вигляді bytes або рядок у шістнадцятковому форматі через двокрапку (наприклад, "aa:bb:cc:dd:ee:ff").

addr_type

Тип адреси, з яким був створений пристрій.

addr

Необроблена шестибайтна адреса пристрою.

addr_hex() str

Повертає адресу у форматі шістнадцяткового рядка, розділеного двокрапками.

connect(timeout_ms: int = 10000, scan_duration_ms: int | None = None, min_conn_interval_us: int | None = None, max_conn_interval_us: int | None = None) Awaitable[DeviceConnection]

Асинхронний. Ініціює GAP-з’єднання з цим пристроєм і повертає отриманий DeviceConnection. Скасовує будь-яке поточне сканування.

timeout_ms

Час очікування завершення з’єднання.

scan_duration_ms

Початкова тривалість сканування перед підключенням (залежить від контролера).

min_conn_interval_us / max_conn_interval_us

Необов’язкові межі інтервалу з’єднання в мікросекундах.

class aioble.DeviceConnection

Активне GAP-з’єднання з Device. Повертається методами Device.connect() або advertise. Підтримує використання як менеджера контексту async with, що автоматично відключається при виході.

Не конструювати безпосередньо.

device

Базовий Device.

encrypted

True після шифрування з’єднання (наприклад, після сполучення).

authenticated

True, якщо з’єднання було автентифіковано (сполучення із захистом від MITM).

bonded

True, якщо в результаті сполучення були отримані ключі прив’язки.

key_size

Узгоджений розмір ключа шифрування в байтах або False, якщо не зашифровано.

mtu

Узгоджений ATT MTU після exchange_mtu або None до встановлення.

is_connected() bool

Повертає, чи активне з’єднання.

disconnect(timeout_ms: int = 2000) Awaitable[None]

Асинхронний. Відключається та чекає на IRQ відключення.

timeout_ms

Максимальний час очікування відключення.

disconnected(timeout_ms: int | None = None, disconnect: bool = False) Awaitable[None]

Асинхронний. Чекає, поки з’єднання не буде завершено будь-якою стороною. Якщо disconnect дорівнює True, спочатку активно відключається.

timeout_ms

Максимальний час очікування. None означає чекати вічно.

disconnect

Якщо True, ініціює відключення.

timeout(timeout_ms: int | None) DeviceTimeout

Повертає менеджер контексту, що скасовує своє тіло, якщо закінчується тайм-аут (генеруючи asyncio.TimeoutError) або пристрій відключається (генеруючи DeviceDisconnectedError).

timeout_ms

Тайм-аут у мілісекундах або None без тайм-ауту.

exchange_mtu(mtu: int | None = None, timeout_ms: int = 1000) Awaitable[int]

Асинхронний. Ініціює обмін ATT MTU і повертає узгоджений MTU.

mtu

Необов’язковий бажаний MTU для встановлення в базовому інтерфейсі BLE перед обміном.

timeout_ms

Тайм-аут для обміну.

service(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientService | None]

Асинхронний. Виявляє єдину віддалену службу з відповідним uuid або None, якщо не знайдено.

services(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Повертає асинхронний ітератор віддалених об’єктів ClientService. Використовується з async for і виконується до завершення.

uuid

Необов’язковий фільтр UUID. None повертає кожну службу.

timeout_ms

Тайм-аут для кожного виявлення.

pair(bond: bool = True, le_secure: bool = True, mitm: bool = False, io: int = 3, timeout_ms: int = 20000) Awaitable[None]

Асинхронний. Ініціює сполучення для цього з’єднання. Оновлює атрибути encrypted / authenticated / bonded / key_size після завершення.

bond

Зберігати ключі сполучення.

le_secure

Використовувати LE Secure Connections.

mitm

Вимагати захист від атак «людина посередині».

io

Константа можливостей вводу-виводу (наприклад, 3 для відсутності введення/виведення).

timeout_ms

Тайм-аут сполучення.

l2cap_accept(psm: int, mtu: int, timeout_ms: int | None = None) Awaitable[L2CAPChannel]

Асинхронний. Прослуховує заданий PSM і повертає L2CAPChannel після того, як віддалена сторона його відкриє.

psm

Protocol/Service Multiplexer для прослуховування.

mtu

Максимальний розмір отримання в байтах.

timeout_ms

Максимальний час очікування підключення від віддаленої сторони.

l2cap_connect(psm: int, mtu: int, timeout_ms: int = 1000) Awaitable[L2CAPChannel]

Асинхронний. Відкриває канал L2CAP до віддаленої сторони на заданому PSM.

psm

Protocol/Service Multiplexer для підключення.

mtu

Максимальний розмір отримання в байтах.

timeout_ms

Тайм-аут підключення.

class aioble.ScanResult

Єдиний пристрій, виявлений під час scan. Той самий екземпляр повторно видається при надходженні нових рекламних даних.

Не конструювати безпосередньо.

device

Базовий Device.

rssi

Останнє зафіксоване значення RSSI в дБм.

adv_data

Необроблене рекламне навантаження (bytes або None).

resp_data

Необроблене навантаження відповіді на сканування (bytes або None), якщо активне сканування увімкнено.

connectable

True, якщо остання реклама допускала підключення.

name() str | None

Декодує повне (або скорочене) рекламоване локальне ім’я з навантаження або None, якщо воно відсутнє.

services() Iterator[bluetooth.UUID]

Генератор, що видає кожен bluetooth.UUID, рекламований у 16/32/128-бітних полях списку служб.

manufacturer(filter: int | None = None) Iterator[tuple[int, bytes]]

Генератор, що видає кортежі (company_id, data) із специфічних для виробника рекламних полів.

filter

Якщо задано, видавати лише записи, ідентифікатор компанії яких збігається.

class aioble.Service(uuid: bluetooth.UUID)

Локальна служба GATT. Створіть службу з одним або кількома екземплярами Characteristic, потім передайте її в register_services.

uuid

UUID служби.

uuid

UUID служби.

characteristics

Список об’єктів Characteristic, прив’язаних до цієї служби.

class aioble.Characteristic(service: Service, uuid: bluetooth.UUID, read: bool = False, write: bool = False, write_no_response: bool = False, notify: bool = False, indicate: bool = False, initial: bytes | None = None, capture: bool = False)

Локальна характеристика GATT. Конструювання автоматично додає її до service.

service

Служба-власник Service.

uuid

UUID характеристики.

read, write, write_no_response, notify, indicate

Булеві значення, що вибирають підтримувані операції GATT.

initial

Необов’язкове початкове значення (bytes).

capture

Якщо True, записані значення ставляться в чергу (до 10 елементів), щоб швидкі послідовні записи не втрачалися. Кожен виклик written тоді повертає кортеж (connection, data).

uuid

UUID характеристики.

flags

Бітова маска прапорців властивостей GATT, побудована з конструктора.

descriptors

Список об’єктів Descriptor, прив’язаних до цієї характеристики.

read() bytes

Зчитує поточне значення з локальної бази даних GATT.

write(data: bytes, send_update: bool = False) None

Оновлює значення в локальній базі даних GATT.

data

Нові байти значення.

send_update

Якщо True, також сповіщає/показує кожне підписане з’єднання.

notify(connection: DeviceConnection, data: bytes | None = None) None

Надсилає GATT Notify до connection.

connection

Цільове клієнтське з’єднання.

data

Навантаження для надсилання. Якщо None, надсилається поточне локальне значення.

indicate(connection: DeviceConnection, data: bytes | None = None, timeout_ms: int = 1000) Awaitable[None]

Асинхронний. Надсилає GATT Indicate до connection та чекає підтвердження від клієнта. Генерує GattError при ненульовому статусі.

connection

Цільове клієнтське з’єднання.

data

Навантаження для показання або None для надсилання локального значення.

timeout_ms

Максимальний час очікування підтвердження.

written(timeout_ms: int | None = None) Awaitable[DeviceConnection | tuple[DeviceConnection, bytes]]

Асинхронний. Чекає на віддалений запис. Повертає DeviceConnection, що пише, або (connection, data), якщо характеристика була створена з capture=True.

timeout_ms

Максимальний час очікування. None чекає вічно.

on_read(connection: DeviceConnection) int

Хук перевизначення, що викликається синхронно при отриманні віддаленого запиту читання. Поверніть 0, щоб дозволити читання, або ненульовий код помилки ATT, щоб відхилити. За замовчуванням повертає 0.

class aioble.BufferedCharacteristic(service: Service, uuid: bluetooth.UUID, max_len: int = 20, append: bool = False, **kwargs)

Характеристика Characteristic, чий резервний буфер GATT можна налаштувати. Корисна для отримання значень, більших за розмір атрибута за замовчуванням, або для постановки в чергу послідовних записів.

max_len

Розмір буфера в байтах.

append

Якщо True, послідовні записи додаються до буфера замість перезапису.

Інші аргументи передаються до Characteristic.

class aioble.Descriptor(characteristic: Characteristic, uuid: bluetooth.UUID, read: bool = False, write: bool = False, initial: bytes | None = None)

Локальний дескриптор GATT. Конструювання автоматично додає його до characteristic. Успадковує read, write та written від Characteristic.

characteristic

Характеристика-власник Characteristic.

uuid

UUID дескриптора.

read, write

Булеві значення, що вибирають підтримувані операції GATT.

initial

Необов’язкове початкове значення (bytes).

class aioble.ClientService

Віддалена служба GATT, виявлена на пері. Повертається методом DeviceConnection.service() або ітерується з DeviceConnection.services().

Не конструювати безпосередньо.

connection

Власник DeviceConnection.

uuid

UUID віддаленої служби.

characteristic(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientCharacteristic | None]

Асинхронний. Виявляє єдину характеристику за UUID або None, якщо не знайдено.

characteristics(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Повертає асинхронний ітератор об’єктів ClientCharacteristic. Використовується з async for і виконується до завершення.

uuid

Необов’язковий фільтр UUID.

timeout_ms

Тайм-аут для кожного виявлення.

class aioble.ClientCharacteristic

Віддалена характеристика GATT, виявлена на пері. Повертається методом ClientService.characteristic() або ітерується з ClientService.characteristics().

Не конструювати безпосередньо.

service

Служба-власник ClientService.

uuid

UUID характеристики.

properties

Бітова маска підтримуваних операцій GATT, як повідомляє пір.

read(timeout_ms: int = 1000) Awaitable[bytes]

Асинхронний. Виконує GATT Read і повертає значення. Генерує GattError при ненульовому статусі.

timeout_ms

Тайм-аут читання.

write(data: bytes, response: bool | None = None, timeout_ms: int = 1000) Awaitable[None]

Асинхронний. Виконує GATT Write.

data

Значення для запису.

response

True для вимоги відповіді на запис (і генерації GattError при помилці). False для запису без відповіді. None (за замовчуванням) автоматично вибирає на основі того, що рекламує пір.

timeout_ms

Тайм-аут запису (актуальний лише якщо response дорівнює True).

notified(timeout_ms: int | None = None) Awaitable[bytes]

Асинхронний. Чекає на наступне сповіщення для цієї характеристики і повертає його навантаження. Повертається негайно, якщо сповіщення вже стоїть у черзі.

timeout_ms

Максимальний час очікування. None чекає вічно.

indicated(timeout_ms: int | None = None) Awaitable[bytes]

Асинхронний. Чекає на наступне показання для цієї характеристики і повертає його навантаження.

timeout_ms

Максимальний час очікування.

subscribe(notify: bool = True, indicate: bool = False) Awaitable[None]

Асинхронний. Записує дескриптор конфігурації клієнтської характеристики (CCCD) для підписки (або скасування підписки) на сповіщення та/або показання.

notify

Увімкнути сповіщення.

indicate

Увімкнути показання.

descriptor(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientDescriptor | None]

Асинхронний. Виявляє єдиний дескриптор за UUID або None, якщо не знайдено.

descriptors(timeout_ms: int = 2000) ClientDiscover

Повертає асинхронний ітератор об’єктів ClientDescriptor. Використовується з async for і виконується до завершення.

class aioble.ClientDescriptor

Віддалений дескриптор GATT, виявлений на пері. Успадковує read та write від ClientCharacteristic.

Не конструювати безпосередньо.

characteristic

Власник ClientCharacteristic.

uuid

UUID дескриптора.

class aioble.L2CAPChannel

Активний канал L2CAP, орієнтований на з’єднання. Повертається методами DeviceConnection.l2cap_accept() або DeviceConnection.l2cap_connect(). Підтримує використання як менеджера контексту async with, що автоматично відключається при виході.

Не конструювати безпосередньо.

our_mtu

Максимальний розмір у байтах, який пір може надіслати нам в одному SDU.

peer_mtu

Максимальний розмір у байтах, який ми можемо надіслати піру в одному SDU.

available() bool

Синхронно повертає True, якщо буферизовані отримані дані готові (тобто recvinto не блокуватиметься).

recvinto(buf: bytearray, timeout_ms: int | None = None) Awaitable[int]

Асинхронний. Отримує дані в buf, повертаючи кількість прочитаних байтів. Очікує нових даних, якщо канал порожній.

buf

Попередньо виділений буфер для заповнення.

timeout_ms

Максимальний час очікування. None чекає вічно.

send(buf: bytes, timeout_ms: int | None = None, chunk_size: int | None = None) Awaitable[None]

Асинхронний. Надсилає buf через канал, розбиваючи більші навантаження на шматки розміром MTU. Очікує кредити управління потоком за потреби.

buf

Байто-подібний об’єкт для надсилання.

timeout_ms

Максимальний час очікування на кожен шматок.

chunk_size

Необов’язкове перевизначення розміру шматка для кожного виклику. Обмежено до min(our_mtu * 2, peer_mtu).

flush(timeout_ms: int | None = None) Awaitable[None]

Асинхронний. Чекає, поки будь-яке заблоковане send не буде вичерпано контролером.

timeout_ms

Максимальний час очікування.

disconnect(timeout_ms: int = 1000) Awaitable[None]

Асинхронний. Відключає канал та чекає на IRQ відключення.

timeout_ms

Максимальний час очікування.

disconnected(timeout_ms: int = 1000) Awaitable[None]

Асинхронний. Чекає, поки канал не буде відключено будь-якою стороною.

timeout_ms

Максимальний час очікування.