aioble — Asynchronní BLE

aioble je vysokoúrovňový obal nad modulem bluetooth, přívětivý k asyncio. Poskytuje přehledné korutiny pro skenování, připojování, vysílání advertisingu, GATT služby a L2CAP kanály.

Všechny vzdálené operace (connect, disconnect, čtení/zápis klienta, indicate serveru, l2cap recv/send, pair) lze awaitovat a podporují timeouty.

Podporované role:

  • Broadcaster (vysílač) — generuje payloady advertisingu a scan-response pro běžná pole, automaticky rozděluje payload mezi advertising a scan response, vysílá neomezeně nebo po pevně danou dobu.

  • Peripheral — čeká na připojení od centrálního zařízení, čeká na výměnu MTU.

  • Observer (skener) — pasivní a aktivní skenování, kombinuje payloady advertisingu a scan-response pro totéž zařízení, parsuje běžná pole z payloadů advertisingu.

  • Central — připojí se k peripheral, iniciuje výměnu MTU.

  • GATT klient — objevuje služby / charakteristiky / deskriptory (volitelně podle UUID); čtení / zápis / write-with-response na charakteristikách a deskriptorech; přihlašuje se k notifikacím a indikacím (přes CCCD); čeká na notifikace a indikace.

  • GATT server — registruje služby / charakteristiky / deskriptory; čeká na zápisy do charakteristik a deskriptorů; zachytává požadavky na čtení; odesílá notifikace a indikace (a čeká na odpověď).

  • L2CAP — přijímá a navazuje L2CAP spojení orientované na kanál, spravuje řízení toku kanálu.

  • Zabezpečení — správa klíčů/tajemství zálohovaná JSONem, iniciace párování, dotazování na stav šifrování / autentizace.

Příklady

Skenování blízkých BLE zařízení a vytisknutí každého z nich, jakmile je spatřeno:

import aioble
import asyncio

async def find_devices():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            print(result.device.addr_hex(), result.rssi, result.name())

asyncio.run(find_devices())

Připojení k peripheral vysílajícímu službu Heart Rate jako central a přihlášení k jeho notifikacím měření:

import aioble
import asyncio
import bluetooth

_HR_SERVICE = bluetooth.UUID(0x180D)
_HR_MEASUREMENT = bluetooth.UUID(0x2A37)

async def connect_and_read():
    device = None
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if _HR_SERVICE in result.services():
                device = result.device
                break
    if device is None:
        return

    async with await device.connect() as conn:
        service = await conn.service(_HR_SERVICE)
        char = await service.characteristic(_HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while True:
            data = await char.notified()
            print("notify:", data)

asyncio.run(connect_and_read())

Chování jako peripheral: registrace GATT služby, její vysílání a odesílání notifikací každému, kdo se připojí:

import aioble
import asyncio
import bluetooth
import struct

_ENV_SERVICE = bluetooth.UUID(0x181A)
_TEMP_CHAR = bluetooth.UUID(0x2A6E)

def encode_temperature(deg_c):
    # Bluetooth Temperature (0x2A6E) is sint16 little-endian, 0.01 degC units.
    return struct.pack("<h", round(deg_c * 100))

service = aioble.Service(_ENV_SERVICE)
temp_char = aioble.Characteristic(service, _TEMP_CHAR, read=True, notify=True)
aioble.register_services(service)

async def peripheral_task():
    while True:
        connection = await aioble.advertise(
            interval_us=250000,
            name="openmv-sensor",
            services=[_ENV_SERVICE],
            appearance=0x0300,
        )
        print("connected:", connection.device.addr_hex())
        async with connection:
            while connection.is_connected():
                temp_char.write(encode_temperature(23.68), send_update=True)
                await asyncio.sleep(1)

asyncio.run(peripheral_task())

Funkce na úrovni modulu

aioble.config(*args, **kwargs) Any

Předává volání metodě bluetooth.BLE.config() a nejprve zajistí, že BLE rádio je aktivní.

args

Volitelný název jediného parametru k dotazování.

kwargs

Klíčové argumenty pro nastavení konfiguračních hodnot.

aioble.stop() None

Deaktivuje podkladové BLE rádio a spustí všechny registrované obslužné rutiny vypnutí podmodulů. Po tomto volání jsou všechny skenery, vysílače, spojení a L2CAP kanály zrušeny.

aioble.scan(duration_ms: int, interval_us: int | None = None, window_us: int | None = None, active: bool = False) scan

Vrací scan jako asynchronní kontextový manažer / asynchronní iterátor, který poskytuje instance ScanResult pro každé jedinečné objevené zařízení (nebo pro každý nový kus advertisingových dat ze známého zařízení).

duration_ms

Jak dlouho skenovat, v milisekundách. Předejte 0 pro neomezené skenování až do ukončení kontextového manažeru.

interval_us

Interval skenování v mikrosekundách. Výchozí hodnota je 1 280 000.

window_us

Okno skenování v mikrosekundách (musí být menší nebo rovno interval_us). Výchozí hodnota je 11 250.

active

Pokud je True, provede aktivní skenování (požaduje data scan response). Výchozí hodnota je False.

aioble.advertise(interval_us: int, adv_data: bytes | None = None, resp_data: bytes | None = None, connectable: bool = True, limited_disc: bool = False, br_edr: bool = False, name: str | None = None, services: list | None = None, appearance: int = 0, manufacturer: tuple | None = None, timeout_ms: int | None = None) DeviceConnection

Asynchronní korutina, která zahájí vysílání advertisingu a čeká na příchozí připojení od centrálního zařízení. Vrací DeviceConnection reprezentující připojené centrální zařízení, nebo při timeoutu vyvolá asyncio.TimeoutError.

interval_us

Interval advertisingu v mikrosekundách.

adv_data

Surový payload advertisingu. Pokud není nastaven, adv_data se sestaví ze zbývajících klíčových argumentů.

resp_data

Surový payload scan response. V případě potřeby se automaticky naplní přetečením z adv_data.

connectable

Pokud je True, jde o připojitelný advertisement.

limited_disc

Použít příznak limited-discoverable namísto general.

br_edr

Nastavit příznak BR/EDR-supported.

name

Volitelný úplný místní název k vložení.

services

Iterovatelný objekt bluetooth.UUID k vysílání.

appearance

16bitová hodnota appearance (viz přidělená čísla Bluetooth).

manufacturer

N-tice (company_id, data_bytes) k vysílání jako data specifická pro výrobce.

timeout_ms

Zastavit vysílání advertisingu po tomto počtu milisekund bez připojení. None znamená vysílat až do připojení.

aioble.register_services(*services: Service) None

Zaregistruje jeden nebo více objektů Service (a jejich charakteristiky a deskriptory) u GATT serveru. Musí být voláno jednou před spuštěním advertise. Následná volání nahradí předchozí registraci.

services

Jedna nebo více instancí Service.

Konstanty na úrovni modulu

aioble.ADDR_PUBLIC

Typ veřejné adresy BLE zařízení (0).

aioble.ADDR_RANDOM

Typ náhodné adresy BLE zařízení (1).

Výjimky

exception aioble.GattError

Vyvolána, když vzdálená GATT operace (čtení / zápis / indicate) skončí s nenulovým stavem. Stavový kód je dostupný v atributu _status.

exception aioble.DeviceDisconnectedError

Vyvolána uvnitř asynchronní operace (např. read, write, notified), když podkladové spojení během čekání vypadne.

exception aioble.L2CAPDisconnectedError

Vyvolána při pokusu o operaci send/recv/flush L2CAP kanálu na odpojeném kanálu (nebo při jejím přerušení odpojením).

exception aioble.L2CAPConnectionError

Vyvolána metodou DeviceConnection.l2cap_connect, když navázání kanálu selže. Stavový kód Bluetooth je prvním argumentem.

Třídy

class aioble.Device(addr_type: int, addr: bytes | str)

Reprezentuje vzdálené BLE zařízení podle adresy. Dvě instance Device jsou si rovny, pokud se shodují jak addr_type, tak addr. Používá se jako handle pro iniciaci připojení.

addr_type

Buď ADDR_PUBLIC, nebo ADDR_RANDOM.

addr

Šestibajtová adresa jako bytes, nebo hexadecimální řetězec oddělený dvojtečkami (např. "aa:bb:cc:dd:ee:ff").

addr_type

Typ adresy, se kterým bylo zařízení vytvořeno.

addr

Surová šestibajtová adresa zařízení.

addr_hex() str

Vrátí adresu naformátovanou jako hexadecimální řetězec oddělený dvojtečkami.

connect(timeout_ms: int = 10000, scan_duration_ms: int | None = None, min_conn_interval_us: int | None = None, max_conn_interval_us: int | None = None) Awaitable[DeviceConnection]

Asynchronní. Iniciuje GAP připojení k tomuto zařízení a vrátí výsledné DeviceConnection. Zruší jakékoli probíhající skenování.

timeout_ms

Jak dlouho čekat na dokončení připojení.

scan_duration_ms

Počáteční doba skenování před připojením (závislé na řadiči).

min_conn_interval_us / max_conn_interval_us

Volitelné meze intervalu připojení v mikrosekundách.

class aioble.DeviceConnection

Aktivní GAP připojení k Device. Vrací jej Device.connect() nebo advertise. Podporuje použití jako kontextový manažer async with, který se při ukončení automaticky odpojí.

Nekonstruujte přímo.

device

Podkladové Device.

encrypted

True, jakmile je spoj zašifrován (např. po párování).

authenticated

True, pokud byl spoj autentizován (párování chráněné proti MITM).

bonded

True, pokud párování vytvořilo bondovací klíče.

key_size

Vyjednaná velikost šifrovacího klíče v bajtech, nebo False, pokud není šifrováno.

mtu

Vyjednané ATT MTU po exchange_mtu, nebo None dokud není nastaveno.

is_connected() bool

Vrátí, zda je spojení stále aktivní.

disconnect(timeout_ms: int = 2000) Awaitable[None]

Asynchronní. Odpojí a počká na IRQ odpojení.

timeout_ms

Maximální doba čekání na odpojení.

disconnected(timeout_ms: int | None = None, disconnect: bool = False) Awaitable[None]

Asynchronní. Čeká, dokud spojení neukončí jedna ze stran. Pokud je disconnect True, nejprve aktivně odpojí.

timeout_ms

Maximální doba čekání. None znamená čekat navždy.

disconnect

Pokud je True, iniciuje odpojení.

timeout(timeout_ms: int | None) DeviceTimeout

Vrátí kontextový manažer, který zruší své tělo, pokud buď uplyne timeout (vyvolá asyncio.TimeoutError), nebo se zařízení odpojí (vyvolá DeviceDisconnectedError).

timeout_ms

Timeout v milisekundách, nebo None pro žádný timeout.

exchange_mtu(mtu: int | None = None, timeout_ms: int = 1000) Awaitable[int]

Asynchronní. Iniciuje výměnu ATT MTU a vrátí vyjednané MTU.

mtu

Volitelné preferované MTU k nastavení na podkladovém BLE rozhraní před výměnou.

timeout_ms

Timeout pro výměnu.

service(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientService | None]

Asynchronní. Objeví jedinou vzdálenou službu odpovídající uuid, nebo None, pokud nebyla nalezena.

services(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Vrátí asynchronní iterátor vzdálených objektů ClientService. Použijte s async for a nechte smyčku doběhnout.

uuid

Volitelný filtr UUID. None vrátí každou službu.

timeout_ms

Timeout na jedno objevování.

pair(bond: bool = True, le_secure: bool = True, mitm: bool = False, io: int = 3, timeout_ms: int = 20000) Awaitable[None]

Asynchronní. Iniciuje párování na tomto spojení. Po dokončení aktualizuje atributy encrypted / authenticated / bonded / key_size.

bond

Trvale uloží párovací klíče.

le_secure

Použít LE Secure Connections.

mitm

Vyžadovat ochranu proti man-in-the-middle.

io

Konstanta schopnosti IO (např. 3 pro žádný vstup/výstup).

timeout_ms

Timeout párování.

l2cap_accept(psm: int, mtu: int, timeout_ms: int | None = None) Awaitable[L2CAPChannel]

Asynchronní. Naslouchá na zadaném PSM a vrátí L2CAPChannel, jakmile jej protistrana otevře.

psm

Protocol/Service Multiplexer, na kterém se má naslouchat.

mtu

Maximální velikost příjmu v bajtech.

timeout_ms

Maximální doba čekání na připojení protistrany.

l2cap_connect(psm: int, mtu: int, timeout_ms: int = 1000) Awaitable[L2CAPChannel]

Asynchronní. Otevře L2CAP kanál k protistraně na zadaném PSM.

psm

Protocol/Service Multiplexer, ke kterému se připojit.

mtu

Maximální velikost příjmu v bajtech.

timeout_ms

Timeout připojení.

class aioble.ScanResult

Jediné zařízení objevené během scan. Tatáž instance je znovu poskytnuta, jakmile dorazí nová advertisingová data.

Nekonstruujte přímo.

device

Podkladové Device.

rssi

Naposledy hlášené RSSI v dBm.

adv_data

Surový payload advertisingu (bytes nebo None).

resp_data

Surový payload scan response (bytes nebo None), pokud je povoleno aktivní skenování.

connectable

True, pokud byl nejnovější advertisement připojitelný.

name() str | None

Dekóduje úplný (nebo zkrácený) vysílaný místní název z payloadu, nebo None, pokud není přítomen.

services() Iterator[bluetooth.UUID]

Generátor poskytující každé bluetooth.UUID vysílané v polích seznamu služeb 16/32/128bitových.

manufacturer(filter: int | None = None) Iterator[tuple[int, bytes]]

Generátor poskytující n-tice (company_id, data) z advertisingových polí specifických pro výrobce.

filter

Pokud je zadán, poskytne pouze záznamy, jejichž company ID odpovídá.

class aioble.Service(uuid: bluetooth.UUID)

Místní GATT služba. Sestavte službu z jedné nebo více instancí Characteristic a poté ji předejte register_services.

uuid

UUID služby.

uuid

UUID služby.

characteristics

Seznam objektů Characteristic navázaných na tuto službu.

class aioble.Characteristic(service: Service, uuid: bluetooth.UUID, read: bool = False, write: bool = False, write_no_response: bool = False, notify: bool = False, indicate: bool = False, initial: bytes | None = None, capture: bool = False)

Místní GATT charakteristika. Její konstrukce ji automaticky přidá k service.

service

Vlastnící Service.

uuid

UUID charakteristiky.

read, write, write_no_response, notify, indicate

Booleovské hodnoty vybírající podporované GATT operace.

initial

Volitelná počáteční hodnota (bytes).

capture

Pokud je True, zapsané hodnoty se řadí do fronty (až do hloubky 10), takže rychlé po sobě jdoucí zápisy nejsou ztraceny. Každé volání written poté vrátí n-tici (connection, data).

uuid

UUID charakteristiky.

flags

Bitová maska příznaků GATT vlastností sestavená z konstruktoru.

descriptors

Seznam objektů Descriptor navázaných na tuto charakteristiku.

read() bytes

Přečte aktuální hodnotu z místní GATT databáze.

write(data: bytes, send_update: bool = False) None

Aktualizuje hodnotu v místní GATT databázi.

data

Bajty nové hodnoty.

send_update

Pokud je True, také notifikuje/indikuje každé přihlášené spojení.

notify(connection: DeviceConnection, data: bytes | None = None) None

Odešle GATT Notify do connection.

connection

Cílové klientské spojení.

data

Payload k odeslání. Pokud je None, odešle se aktuální místní hodnota.

indicate(connection: DeviceConnection, data: bytes | None = None, timeout_ms: int = 1000) Awaitable[None]

Asynchronní. Odešle GATT Indicate do connection a počká na potvrzení od klienta. Při nenulovém stavu vyvolá GattError.

connection

Cílové klientské spojení.

data

Payload k indikaci, nebo None k odeslání místní hodnoty.

timeout_ms

Maximální doba čekání na potvrzení.

written(timeout_ms: int | None = None) Awaitable[DeviceConnection | tuple[DeviceConnection, bytes]]

Asynchronní. Čeká na vzdálený zápis. Vrací zapisující DeviceConnection, nebo (connection, data), pokud byla charakteristika vytvořena s capture=True.

timeout_ms

Maximální doba čekání. None čeká navždy.

on_read(connection: DeviceConnection) int

Přepisovatelný hook volaný synchronně při přijetí vzdáleného čtení. Vrácením 0 čtení povolíte, nenulovým ATT chybovým kódem jej odmítnete. Výchozí implementace vrací 0.

class aioble.BufferedCharacteristic(service: Service, uuid: bluetooth.UUID, max_len: int = 20, append: bool = False, **kwargs)

Characteristic, jejíž podkladový GATT buffer lze konfigurovat. Užitečné pro příjem hodnot větších než výchozí velikost atributu, nebo pro řazení po sobě jdoucích zápisů do fronty.

max_len

Velikost bufferu v bajtech.

append

Pokud je True, sekvenční zápisy se připojují do bufferu namísto přepisování.

Ostatní argumenty se předávají do Characteristic.

class aioble.Descriptor(characteristic: Characteristic, uuid: bluetooth.UUID, read: bool = False, write: bool = False, initial: bytes | None = None)

Místní GATT deskriptor. Jeho konstrukce jej automaticky přidá k characteristic. Dědí read, write a written z Characteristic.

characteristic

Vlastnící Characteristic.

uuid

UUID deskriptoru.

read, write

Booleovské hodnoty vybírající podporované GATT operace.

initial

Volitelná počáteční hodnota (bytes).

class aioble.ClientService

Vzdálená GATT služba objevená na protistraně. Vrací ji DeviceConnection.service() nebo se iteruje z DeviceConnection.services().

Nekonstruujte přímo.

connection

Vlastnící DeviceConnection.

uuid

UUID vzdálené služby.

characteristic(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientCharacteristic | None]

Asynchronní. Objeví jedinou charakteristiku podle UUID, nebo None, pokud nebyla nalezena.

characteristics(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Vrátí asynchronní iterátor objektů ClientCharacteristic. Použijte s async for a nechte smyčku doběhnout.

uuid

Volitelný filtr UUID.

timeout_ms

Timeout na jedno objevování.

class aioble.ClientCharacteristic

Vzdálená GATT charakteristika objevená na protistraně. Vrací ji ClientService.characteristic() nebo se iteruje z ClientService.characteristics().

Nekonstruujte přímo.

service

Vlastnící ClientService.

uuid

UUID charakteristiky.

properties

Bitová maska podporovaných GATT operací tak, jak je hlásí protistrana.

read(timeout_ms: int = 1000) Awaitable[bytes]

Asynchronní. Vyšle GATT Read a vrátí hodnotu. Při nenulovém stavu vyvolá GattError.

timeout_ms

Timeout čtení.

write(data: bytes, response: bool | None = None, timeout_ms: int = 1000) Awaitable[None]

Asynchronní. Vyšle GATT Write.

data

Hodnota k zápisu.

response

True pro vyžadování write-response (a vyvolání GattError při selhání). False pro write-without-response. None (výchozí) automaticky volí podle toho, co protistrana inzeruje.

timeout_ms

Timeout zápisu (relevantní pouze, pokud je response True).

notified(timeout_ms: int | None = None) Awaitable[bytes]

Asynchronní. Čeká na další notifikaci na této charakteristice a vrátí její payload. Vrátí se okamžitě, pokud je notifikace již ve frontě.

timeout_ms

Maximální doba čekání. None čeká navždy.

indicated(timeout_ms: int | None = None) Awaitable[bytes]

Asynchronní. Čeká na další indikaci na této charakteristice a vrátí její payload.

timeout_ms

Maximální doba čekání.

subscribe(notify: bool = True, indicate: bool = False) Awaitable[None]

Asynchronní. Zapíše Client Characteristic Configuration Descriptor (CCCD) pro přihlášení (nebo odhlášení) notifikací a/nebo indikací.

notify

Povolit notifikace.

indicate

Povolit indikace.

descriptor(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientDescriptor | None]

Asynchronní. Objeví jediný deskriptor podle UUID, nebo None, pokud nebyl nalezen.

descriptors(timeout_ms: int = 2000) ClientDiscover

Vrátí asynchronní iterátor objektů ClientDescriptor. Použijte s async for a nechte smyčku doběhnout.

class aioble.ClientDescriptor

Vzdálený GATT deskriptor objevený na protistraně. Dědí read a write z ClientCharacteristic.

Nekonstruujte přímo.

characteristic

Vlastnící ClientCharacteristic.

uuid

UUID deskriptoru.

class aioble.L2CAPChannel

Aktivní L2CAP kanál orientovaný na spojení. Vrací jej DeviceConnection.l2cap_accept() nebo DeviceConnection.l2cap_connect(). Podporuje použití jako kontextový manažer async with, který se při ukončení automaticky odpojí.

Nekonstruujte přímo.

our_mtu

Maximální velikost v bajtech, kterou nám protistrana může poslat v jediném SDU.

peer_mtu

Maximální velikost v bajtech, kterou můžeme poslat protistraně v jediném SDU.

available() bool

Synchronně vrátí True, pokud jsou bufferovaná přijatá data připravena (tj. recvinto nezablokuje).

recvinto(buf: bytearray, timeout_ms: int | None = None) Awaitable[int]

Asynchronní. Přijme do buf a vrátí počet přečtených bajtů. Pokud je kanál prázdný, počká na nová data.

buf

Předem alokovaný buffer k naplnění.

timeout_ms

Maximální doba čekání. None čeká navždy.

send(buf: bytes, timeout_ms: int | None = None, chunk_size: int | None = None) Awaitable[None]

Asynchronní. Odešle buf na kanálu a fragmentuje větší payloady na kusy velikosti MTU. Podle potřeby čeká na kredity řízení toku.

buf

Objekt typu bytes k odeslání.

timeout_ms

Maximální doba čekání na jeden kus.

chunk_size

Volitelné přepsání velikosti kusu pro jedno volání. Omezeno na min(our_mtu * 2, peer_mtu).

flush(timeout_ms: int | None = None) Awaitable[None]

Asynchronní. Čeká, dokud řadič nevyprázdní jakékoli zastavené send.

timeout_ms

Maximální doba čekání.

disconnect(timeout_ms: int = 1000) Awaitable[None]

Asynchronní. Odpojí kanál a počká na IRQ odpojení.

timeout_ms

Maximální doba čekání.

disconnected(timeout_ms: int = 1000) Awaitable[None]

Asynchronní. Čeká, dokud kanál neodpojí jedna ze stran.

timeout_ms

Maximální doba čekání.