aioble — Asynchronní BLE¶
aioble je vysokoúrovňový obal nad modulem bluetooth, přívětivý k asyncio. Poskytuje přehledné korutiny pro skenování, připojování, vysílání advertisingu, GATT služby a L2CAP kanály.
Všechny vzdálené operace (connect, disconnect, čtení/zápis klienta, indicate serveru, l2cap recv/send, pair) lze awaitovat a podporují timeouty.
Podporované role:
Broadcaster (vysílač) — generuje payloady advertisingu a scan-response pro běžná pole, automaticky rozděluje payload mezi advertising a scan response, vysílá neomezeně nebo po pevně danou dobu.
Peripheral — čeká na připojení od centrálního zařízení, čeká na výměnu MTU.
Observer (skener) — pasivní a aktivní skenování, kombinuje payloady advertisingu a scan-response pro totéž zařízení, parsuje běžná pole z payloadů advertisingu.
Central — připojí se k peripheral, iniciuje výměnu MTU.
GATT klient — objevuje služby / charakteristiky / deskriptory (volitelně podle UUID); čtení / zápis / write-with-response na charakteristikách a deskriptorech; přihlašuje se k notifikacím a indikacím (přes CCCD); čeká na notifikace a indikace.
GATT server — registruje služby / charakteristiky / deskriptory; čeká na zápisy do charakteristik a deskriptorů; zachytává požadavky na čtení; odesílá notifikace a indikace (a čeká na odpověď).
L2CAP — přijímá a navazuje L2CAP spojení orientované na kanál, spravuje řízení toku kanálu.
Zabezpečení — správa klíčů/tajemství zálohovaná JSONem, iniciace párování, dotazování na stav šifrování / autentizace.
Příklady¶
Skenování blízkých BLE zařízení a vytisknutí každého z nich, jakmile je spatřeno:
import aioble
import asyncio
async def find_devices():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
print(result.device.addr_hex(), result.rssi, result.name())
asyncio.run(find_devices())
Připojení k peripheral vysílajícímu službu Heart Rate jako central a přihlášení k jeho notifikacím měření:
import aioble
import asyncio
import bluetooth
_HR_SERVICE = bluetooth.UUID(0x180D)
_HR_MEASUREMENT = bluetooth.UUID(0x2A37)
async def connect_and_read():
device = None
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if _HR_SERVICE in result.services():
device = result.device
break
if device is None:
return
async with await device.connect() as conn:
service = await conn.service(_HR_SERVICE)
char = await service.characteristic(_HR_MEASUREMENT)
await char.subscribe(notify=True)
while True:
data = await char.notified()
print("notify:", data)
asyncio.run(connect_and_read())
Chování jako peripheral: registrace GATT služby, její vysílání a odesílání notifikací každému, kdo se připojí:
import aioble
import asyncio
import bluetooth
import struct
_ENV_SERVICE = bluetooth.UUID(0x181A)
_TEMP_CHAR = bluetooth.UUID(0x2A6E)
def encode_temperature(deg_c):
# Bluetooth Temperature (0x2A6E) is sint16 little-endian, 0.01 degC units.
return struct.pack("<h", round(deg_c * 100))
service = aioble.Service(_ENV_SERVICE)
temp_char = aioble.Characteristic(service, _TEMP_CHAR, read=True, notify=True)
aioble.register_services(service)
async def peripheral_task():
while True:
connection = await aioble.advertise(
interval_us=250000,
name="openmv-sensor",
services=[_ENV_SERVICE],
appearance=0x0300,
)
print("connected:", connection.device.addr_hex())
async with connection:
while connection.is_connected():
temp_char.write(encode_temperature(23.68), send_update=True)
await asyncio.sleep(1)
asyncio.run(peripheral_task())
Funkce na úrovni modulu¶
- aioble.config(*args, **kwargs) Any¶
Předává volání metodě
bluetooth.BLE.config()a nejprve zajistí, že BLE rádio je aktivní.- args
Volitelný název jediného parametru k dotazování.
- kwargs
Klíčové argumenty pro nastavení konfiguračních hodnot.
- aioble.stop() None¶
Deaktivuje podkladové BLE rádio a spustí všechny registrované obslužné rutiny vypnutí podmodulů. Po tomto volání jsou všechny skenery, vysílače, spojení a L2CAP kanály zrušeny.
- aioble.scan(duration_ms: int, interval_us: int | None = None, window_us: int | None = None, active: bool = False) scan¶
Vrací
scanjako asynchronní kontextový manažer / asynchronní iterátor, který poskytuje instanceScanResultpro každé jedinečné objevené zařízení (nebo pro každý nový kus advertisingových dat ze známého zařízení).- duration_ms
Jak dlouho skenovat, v milisekundách. Předejte
0pro neomezené skenování až do ukončení kontextového manažeru.- interval_us
Interval skenování v mikrosekundách. Výchozí hodnota je 1 280 000.
- window_us
Okno skenování v mikrosekundách (musí být menší nebo rovno interval_us). Výchozí hodnota je 11 250.
- active
Pokud je
True, provede aktivní skenování (požaduje data scan response). Výchozí hodnota jeFalse.
- aioble.advertise(interval_us: int, adv_data: bytes | None = None, resp_data: bytes | None = None, connectable: bool = True, limited_disc: bool = False, br_edr: bool = False, name: str | None = None, services: list | None = None, appearance: int = 0, manufacturer: tuple | None = None, timeout_ms: int | None = None) DeviceConnection¶
Asynchronní korutina, která zahájí vysílání advertisingu a čeká na příchozí připojení od centrálního zařízení. Vrací
DeviceConnectionreprezentující připojené centrální zařízení, nebo při timeoutu vyvoláasyncio.TimeoutError.- interval_us
Interval advertisingu v mikrosekundách.
- adv_data
Surový payload advertisingu. Pokud není nastaven, adv_data se sestaví ze zbývajících klíčových argumentů.
- resp_data
Surový payload scan response. V případě potřeby se automaticky naplní přetečením z adv_data.
- connectable
Pokud je
True, jde o připojitelný advertisement.- limited_disc
Použít příznak limited-discoverable namísto general.
- br_edr
Nastavit příznak BR/EDR-supported.
- name
Volitelný úplný místní název k vložení.
- services
Iterovatelný objekt
bluetooth.UUIDk vysílání.- appearance
16bitová hodnota appearance (viz přidělená čísla Bluetooth).
- manufacturer
N-tice
(company_id, data_bytes)k vysílání jako data specifická pro výrobce.- timeout_ms
Zastavit vysílání advertisingu po tomto počtu milisekund bez připojení.
Noneznamená vysílat až do připojení.
Konstanty na úrovni modulu¶
- aioble.ADDR_PUBLIC¶
Typ veřejné adresy BLE zařízení (
0).
- aioble.ADDR_RANDOM¶
Typ náhodné adresy BLE zařízení (
1).
Výjimky¶
- exception aioble.GattError¶
Vyvolána, když vzdálená GATT operace (čtení / zápis / indicate) skončí s nenulovým stavem. Stavový kód je dostupný v atributu
_status.
- exception aioble.DeviceDisconnectedError¶
Vyvolána uvnitř asynchronní operace (např. read, write, notified), když podkladové spojení během čekání vypadne.
- exception aioble.L2CAPDisconnectedError¶
Vyvolána při pokusu o operaci send/recv/flush L2CAP kanálu na odpojeném kanálu (nebo při jejím přerušení odpojením).
- exception aioble.L2CAPConnectionError¶
Vyvolána metodou
DeviceConnection.l2cap_connect, když navázání kanálu selže. Stavový kód Bluetooth je prvním argumentem.
Třídy¶
- class aioble.Device(addr_type: int, addr: bytes | str)¶
Reprezentuje vzdálené BLE zařízení podle adresy. Dvě instance
Devicejsou si rovny, pokud se shodují jak addr_type, tak addr. Používá se jako handle pro iniciaci připojení.- addr_type
Buď
ADDR_PUBLIC, neboADDR_RANDOM.- addr
Šestibajtová adresa jako
bytes, nebo hexadecimální řetězec oddělený dvojtečkami (např."aa:bb:cc:dd:ee:ff").
- addr_type¶
Typ adresy, se kterým bylo zařízení vytvořeno.
- addr¶
Surová šestibajtová adresa zařízení.
- connect(timeout_ms: int = 10000, scan_duration_ms: int | None = None, min_conn_interval_us: int | None = None, max_conn_interval_us: int | None = None) Awaitable[DeviceConnection]¶
Asynchronní. Iniciuje GAP připojení k tomuto zařízení a vrátí výsledné
DeviceConnection. Zruší jakékoli probíhající skenování.- timeout_ms
Jak dlouho čekat na dokončení připojení.
- scan_duration_ms
Počáteční doba skenování před připojením (závislé na řadiči).
- min_conn_interval_us / max_conn_interval_us
Volitelné meze intervalu připojení v mikrosekundách.
- class aioble.DeviceConnection¶
Aktivní GAP připojení k
Device. Vrací jejDevice.connect()neboadvertise. Podporuje použití jako kontextový manažerasync with, který se při ukončení automaticky odpojí.Nekonstruujte přímo.
- encrypted¶
True, jakmile je spoj zašifrován (např. po párování).
- authenticated¶
True, pokud byl spoj autentizován (párování chráněné proti MITM).
- bonded¶
True, pokud párování vytvořilo bondovací klíče.
- key_size¶
Vyjednaná velikost šifrovacího klíče v bajtech, nebo
False, pokud není šifrováno.
- mtu¶
Vyjednané ATT MTU po
exchange_mtu, neboNonedokud není nastaveno.
- disconnect(timeout_ms: int = 2000) Awaitable[None]¶
Asynchronní. Odpojí a počká na IRQ odpojení.
- timeout_ms
Maximální doba čekání na odpojení.
- disconnected(timeout_ms: int | None = None, disconnect: bool = False) Awaitable[None]¶
Asynchronní. Čeká, dokud spojení neukončí jedna ze stran. Pokud je disconnect
True, nejprve aktivně odpojí.- timeout_ms
Maximální doba čekání.
Noneznamená čekat navždy.- disconnect
Pokud je
True, iniciuje odpojení.
- timeout(timeout_ms: int | None) DeviceTimeout¶
Vrátí kontextový manažer, který zruší své tělo, pokud buď uplyne timeout (vyvolá
asyncio.TimeoutError), nebo se zařízení odpojí (vyvoláDeviceDisconnectedError).- timeout_ms
Timeout v milisekundách, nebo
Nonepro žádný timeout.
- exchange_mtu(mtu: int | None = None, timeout_ms: int = 1000) Awaitable[int]¶
Asynchronní. Iniciuje výměnu ATT MTU a vrátí vyjednané MTU.
- mtu
Volitelné preferované MTU k nastavení na podkladovém BLE rozhraní před výměnou.
- timeout_ms
Timeout pro výměnu.
- service(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientService | None]¶
Asynchronní. Objeví jedinou vzdálenou službu odpovídající uuid, nebo
None, pokud nebyla nalezena.
- services(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover¶
Vrátí asynchronní iterátor vzdálených objektů
ClientService. Použijte sasync fora nechte smyčku doběhnout.- uuid
Volitelný filtr UUID.
Nonevrátí každou službu.- timeout_ms
Timeout na jedno objevování.
- pair(bond: bool = True, le_secure: bool = True, mitm: bool = False, io: int = 3, timeout_ms: int = 20000) Awaitable[None]¶
Asynchronní. Iniciuje párování na tomto spojení. Po dokončení aktualizuje atributy
encrypted/authenticated/bonded/key_size.- bond
Trvale uloží párovací klíče.
- le_secure
Použít LE Secure Connections.
- mitm
Vyžadovat ochranu proti man-in-the-middle.
- io
Konstanta schopnosti IO (např.
3pro žádný vstup/výstup).- timeout_ms
Timeout párování.
- l2cap_accept(psm: int, mtu: int, timeout_ms: int | None = None) Awaitable[L2CAPChannel]¶
Asynchronní. Naslouchá na zadaném PSM a vrátí
L2CAPChannel, jakmile jej protistrana otevře.- psm
Protocol/Service Multiplexer, na kterém se má naslouchat.
- mtu
Maximální velikost příjmu v bajtech.
- timeout_ms
Maximální doba čekání na připojení protistrany.
- l2cap_connect(psm: int, mtu: int, timeout_ms: int = 1000) Awaitable[L2CAPChannel]¶
Asynchronní. Otevře L2CAP kanál k protistraně na zadaném PSM.
- psm
Protocol/Service Multiplexer, ke kterému se připojit.
- mtu
Maximální velikost příjmu v bajtech.
- timeout_ms
Timeout připojení.
- class aioble.ScanResult¶
Jediné zařízení objevené během
scan. Tatáž instance je znovu poskytnuta, jakmile dorazí nová advertisingová data.Nekonstruujte přímo.
- rssi¶
Naposledy hlášené RSSI v dBm.
- adv_data¶
Surový payload advertisingu (
bytesneboNone).
- resp_data¶
Surový payload scan response (
bytesneboNone), pokud je povoleno aktivní skenování.
- connectable¶
True, pokud byl nejnovější advertisement připojitelný.
- name() str | None¶
Dekóduje úplný (nebo zkrácený) vysílaný místní název z payloadu, nebo
None, pokud není přítomen.
- services() Iterator[bluetooth.UUID]¶
Generátor poskytující každé
bluetooth.UUIDvysílané v polích seznamu služeb 16/32/128bitových.
- class aioble.Service(uuid: bluetooth.UUID)¶
Místní GATT služba. Sestavte službu z jedné nebo více instancí
Characteristica poté ji předejteregister_services.- uuid
UUID služby.
- uuid¶
UUID služby.
- characteristics¶
Seznam objektů
Characteristicnavázaných na tuto službu.
- class aioble.Characteristic(service: Service, uuid: bluetooth.UUID, read: bool = False, write: bool = False, write_no_response: bool = False, notify: bool = False, indicate: bool = False, initial: bytes | None = None, capture: bool = False)¶
Místní GATT charakteristika. Její konstrukce ji automaticky přidá k service.
- service
Vlastnící
Service.- uuid
UUID charakteristiky.
- read, write, write_no_response, notify, indicate
Booleovské hodnoty vybírající podporované GATT operace.
- initial
Volitelná počáteční hodnota (
bytes).- capture
Pokud je
True, zapsané hodnoty se řadí do fronty (až do hloubky 10), takže rychlé po sobě jdoucí zápisy nejsou ztraceny. Každé voláníwrittenpoté vrátí n-tici(connection, data).
- uuid¶
UUID charakteristiky.
- flags¶
Bitová maska příznaků GATT vlastností sestavená z konstruktoru.
- descriptors¶
Seznam objektů
Descriptornavázaných na tuto charakteristiku.
- write(data: bytes, send_update: bool = False) None¶
Aktualizuje hodnotu v místní GATT databázi.
- data
Bajty nové hodnoty.
- send_update
Pokud je
True, také notifikuje/indikuje každé přihlášené spojení.
- notify(connection: DeviceConnection, data: bytes | None = None) None¶
Odešle GATT Notify do connection.
- connection
Cílové klientské spojení.
- data
Payload k odeslání. Pokud je
None, odešle se aktuální místní hodnota.
- indicate(connection: DeviceConnection, data: bytes | None = None, timeout_ms: int = 1000) Awaitable[None]¶
Asynchronní. Odešle GATT Indicate do connection a počká na potvrzení od klienta. Při nenulovém stavu vyvolá
GattError.- connection
Cílové klientské spojení.
- data
Payload k indikaci, nebo
Nonek odeslání místní hodnoty.- timeout_ms
Maximální doba čekání na potvrzení.
- written(timeout_ms: int | None = None) Awaitable[DeviceConnection | tuple[DeviceConnection, bytes]]¶
Asynchronní. Čeká na vzdálený zápis. Vrací zapisující
DeviceConnection, nebo(connection, data), pokud byla charakteristika vytvořena scapture=True.- timeout_ms
Maximální doba čekání.
Nonečeká navždy.
- on_read(connection: DeviceConnection) int¶
Přepisovatelný hook volaný synchronně při přijetí vzdáleného čtení. Vrácením
0čtení povolíte, nenulovým ATT chybovým kódem jej odmítnete. Výchozí implementace vrací0.
- class aioble.BufferedCharacteristic(service: Service, uuid: bluetooth.UUID, max_len: int = 20, append: bool = False, **kwargs)¶
Characteristic, jejíž podkladový GATT buffer lze konfigurovat. Užitečné pro příjem hodnot větších než výchozí velikost atributu, nebo pro řazení po sobě jdoucích zápisů do fronty.- max_len
Velikost bufferu v bajtech.
- append
Pokud je
True, sekvenční zápisy se připojují do bufferu namísto přepisování.
Ostatní argumenty se předávají do
Characteristic.
- class aioble.Descriptor(characteristic: Characteristic, uuid: bluetooth.UUID, read: bool = False, write: bool = False, initial: bytes | None = None)¶
Místní GATT deskriptor. Jeho konstrukce jej automaticky přidá k characteristic. Dědí
read,writeawrittenzCharacteristic.- characteristic
Vlastnící
Characteristic.- uuid
UUID deskriptoru.
- read, write
Booleovské hodnoty vybírající podporované GATT operace.
- initial
Volitelná počáteční hodnota (
bytes).
- class aioble.ClientService¶
Vzdálená GATT služba objevená na protistraně. Vrací ji
DeviceConnection.service()nebo se iteruje zDeviceConnection.services().Nekonstruujte přímo.
- connection¶
Vlastnící
DeviceConnection.
- uuid¶
UUID vzdálené služby.
- characteristic(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientCharacteristic | None]¶
Asynchronní. Objeví jedinou charakteristiku podle UUID, nebo
None, pokud nebyla nalezena.
- characteristics(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover¶
Vrátí asynchronní iterátor objektů
ClientCharacteristic. Použijte sasync fora nechte smyčku doběhnout.- uuid
Volitelný filtr UUID.
- timeout_ms
Timeout na jedno objevování.
- class aioble.ClientCharacteristic¶
Vzdálená GATT charakteristika objevená na protistraně. Vrací ji
ClientService.characteristic()nebo se iteruje zClientService.characteristics().Nekonstruujte přímo.
- service¶
Vlastnící
ClientService.
- uuid¶
UUID charakteristiky.
- properties¶
Bitová maska podporovaných GATT operací tak, jak je hlásí protistrana.
- read(timeout_ms: int = 1000) Awaitable[bytes]¶
Asynchronní. Vyšle GATT Read a vrátí hodnotu. Při nenulovém stavu vyvolá
GattError.- timeout_ms
Timeout čtení.
- write(data: bytes, response: bool | None = None, timeout_ms: int = 1000) Awaitable[None]¶
Asynchronní. Vyšle GATT Write.
- data
Hodnota k zápisu.
- response
Truepro vyžadování write-response (a vyvoláníGattErrorpři selhání).Falsepro write-without-response.None(výchozí) automaticky volí podle toho, co protistrana inzeruje.- timeout_ms
Timeout zápisu (relevantní pouze, pokud je response
True).
- notified(timeout_ms: int | None = None) Awaitable[bytes]¶
Asynchronní. Čeká na další notifikaci na této charakteristice a vrátí její payload. Vrátí se okamžitě, pokud je notifikace již ve frontě.
- timeout_ms
Maximální doba čekání.
Nonečeká navždy.
- indicated(timeout_ms: int | None = None) Awaitable[bytes]¶
Asynchronní. Čeká na další indikaci na této charakteristice a vrátí její payload.
- timeout_ms
Maximální doba čekání.
- subscribe(notify: bool = True, indicate: bool = False) Awaitable[None]¶
Asynchronní. Zapíše Client Characteristic Configuration Descriptor (CCCD) pro přihlášení (nebo odhlášení) notifikací a/nebo indikací.
- notify
Povolit notifikace.
- indicate
Povolit indikace.
- descriptor(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientDescriptor | None]¶
Asynchronní. Objeví jediný deskriptor podle UUID, nebo
None, pokud nebyl nalezen.
- descriptors(timeout_ms: int = 2000) ClientDiscover¶
Vrátí asynchronní iterátor objektů
ClientDescriptor. Použijte sasync fora nechte smyčku doběhnout.
- class aioble.ClientDescriptor¶
Vzdálený GATT deskriptor objevený na protistraně. Dědí
readawritezClientCharacteristic.Nekonstruujte přímo.
- characteristic¶
Vlastnící
ClientCharacteristic.
- uuid¶
UUID deskriptoru.
- class aioble.L2CAPChannel¶
Aktivní L2CAP kanál orientovaný na spojení. Vrací jej
DeviceConnection.l2cap_accept()neboDeviceConnection.l2cap_connect(). Podporuje použití jako kontextový manažerasync with, který se při ukončení automaticky odpojí.Nekonstruujte přímo.
- our_mtu¶
Maximální velikost v bajtech, kterou nám protistrana může poslat v jediném SDU.
- peer_mtu¶
Maximální velikost v bajtech, kterou můžeme poslat protistraně v jediném SDU.
- available() bool¶
Synchronně vrátí
True, pokud jsou bufferovaná přijatá data připravena (tj.recvintonezablokuje).
- recvinto(buf: bytearray, timeout_ms: int | None = None) Awaitable[int]¶
Asynchronní. Přijme do buf a vrátí počet přečtených bajtů. Pokud je kanál prázdný, počká na nová data.
- buf
Předem alokovaný buffer k naplnění.
- timeout_ms
Maximální doba čekání.
Nonečeká navždy.
- send(buf: bytes, timeout_ms: int | None = None, chunk_size: int | None = None) Awaitable[None]¶
Asynchronní. Odešle buf na kanálu a fragmentuje větší payloady na kusy velikosti MTU. Podle potřeby čeká na kredity řízení toku.
- buf
Objekt typu bytes k odeslání.
- timeout_ms
Maximální doba čekání na jeden kus.
- chunk_size
Volitelné přepsání velikosti kusu pro jedno volání. Omezeno na
min(our_mtu * 2, peer_mtu).
- flush(timeout_ms: int | None = None) Awaitable[None]¶
Asynchronní. Čeká, dokud řadič nevyprázdní jakékoli zastavené
send.- timeout_ms
Maximální doba čekání.