aioble — BLE asincron

aioble este un wrapper de nivel înalt, compatibil cu asyncio, peste modulul bluetooth. Oferă coroutine clare pentru scanare, conectare, advertising, servicii GATT și canale L2CAP.

Toate operațiile la distanță (connect, disconnect, citire/scriere client, indicate server, recv/send l2cap, pair) sunt awaitable și acceptă timeout-uri.

Roluri acceptate:

  • Broadcaster (advertiser) — generează payload-uri de advertising și de scan-response pentru câmpurile uzuale, împarte automat payload-ul între advertising și scan response, face advertising la nesfârșit sau pe o durată fixă.

  • Peripheral — așteaptă o conexiune de la un central, așteaptă schimbul de MTU.

  • Observer (scanner) — scanare pasivă și activă, combină payload-urile de advertising și de scan-response pentru același dispozitiv, analizează câmpurile uzuale din payload-urile de advertising.

  • Central — se conectează la un peripheral, inițiază schimbul de MTU.

  • GATT Client — descoperă servicii / caracteristici / descriptori (opțional după UUID); citire / scriere / scriere-cu-răspuns pe caracteristici și descriptori; abonare la notificări și indicații (prin CCCD); așteaptă notificări și indicații.

  • GATT Server — înregistrează servicii / caracteristici / descriptori; așteaptă scrieri pe caracteristici și descriptori; interceptează cererile de citire; trimite notificări și indicații (și așteaptă răspunsul).

  • L2CAP — acceptă și conectează canale L2CAP orientate pe conexiune, gestionează controlul fluxului canalului.

  • Securitate — gestionare de chei/secrete bazată pe JSON, inițiere pairing, interogare a stării de criptare / autentificare.

Exemple

Scanează dispozitivele BLE din apropiere și afișează fiecare dispozitiv pe măsură ce este detectat:

import aioble
import asyncio

async def find_devices():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            print(result.device.addr_hex(), result.rssi, result.name())

asyncio.run(find_devices())

Conectează-te la un peripheral care face advertising pentru serviciul Heart Rate ca central și abonează-te la notificările sale de măsurare:

import aioble
import asyncio
import bluetooth

_HR_SERVICE = bluetooth.UUID(0x180D)
_HR_MEASUREMENT = bluetooth.UUID(0x2A37)

async def connect_and_read():
    device = None
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if _HR_SERVICE in result.services():
                device = result.device
                break
    if device is None:
        return

    async with await device.connect() as conn:
        service = await conn.service(_HR_SERVICE)
        char = await service.characteristic(_HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while True:
            data = await char.notified()
            print("notify:", data)

asyncio.run(connect_and_read())

Acționează ca un peripheral: înregistrează un serviciu GATT, fă-i advertising și trimite notificări oricui se conectează:

import aioble
import asyncio
import bluetooth
import struct

_ENV_SERVICE = bluetooth.UUID(0x181A)
_TEMP_CHAR = bluetooth.UUID(0x2A6E)

def encode_temperature(deg_c):
    # Bluetooth Temperature (0x2A6E) is sint16 little-endian, 0.01 degC units.
    return struct.pack("<h", round(deg_c * 100))

service = aioble.Service(_ENV_SERVICE)
temp_char = aioble.Characteristic(service, _TEMP_CHAR, read=True, notify=True)
aioble.register_services(service)

async def peripheral_task():
    while True:
        connection = await aioble.advertise(
            interval_us=250000,
            name="openmv-sensor",
            services=[_ENV_SERVICE],
            appearance=0x0300,
        )
        print("connected:", connection.device.addr_hex())
        async with connection:
            while connection.is_connected():
                temp_char.write(encode_temperature(23.68), send_update=True)
                await asyncio.sleep(1)

asyncio.run(peripheral_task())

Funcții la nivel de modul

aioble.config(*args, **kwargs) Any

Redirecționează către bluetooth.BLE.config(), asigurându-se mai întâi că radioul BLE este activ.

args

Nume opțional al unui singur parametru de interogat.

kwargs

Argumente de tip keyword pentru a seta valori de configurare.

aioble.stop() None

Dezactivează radioul BLE subiacent și execută orice handler de oprire înregistrat de sub-module. După acest apel, scanerele, advertiserele, conexiunile și canalele L2CAP sunt toate închise.

aioble.scan(duration_ms: int, interval_us: int | None = None, window_us: int | None = None, active: bool = False) scan

Returnează un context-manager asincron / iterator asincron scan care produce instanțe ScanResult pentru fiecare dispozitiv unic descoperit (sau pentru fiecare nouă bucată de date de advertising de la un dispozitiv cunoscut).

duration_ms

Cât timp să se scaneze, în milisecunde. Folosește 0 pentru a scana la nesfârșit, până când context manager-ul iese.

interval_us

Intervalul de scanare în microsecunde. Valoarea implicită este 1.280.000.

window_us

Fereastra de scanare în microsecunde (trebuie să fie mai mică sau egală cu interval_us). Valoarea implicită este 11.250.

active

Dacă este True, efectuează o scanare activă (solicită date de scan response). Valoarea implicită este False.

aioble.advertise(interval_us: int, adv_data: bytes | None = None, resp_data: bytes | None = None, connectable: bool = True, limited_disc: bool = False, br_edr: bool = False, name: str | None = None, services: list | None = None, appearance: int = 0, manufacturer: tuple | None = None, timeout_ms: int | None = None) DeviceConnection

Coroutine asincronă care începe advertising-ul și așteaptă o conexiune central de intrare. Returnează un DeviceConnection care reprezintă central-ul conectat sau ridică asyncio.TimeoutError la timeout.

interval_us

Intervalul de advertising, în microsecunde.

adv_data

Payload brut de advertising. Dacă nu este setat, adv_data este construit din celelalte argumente de tip keyword.

resp_data

Payload brut de scan response. Populat automat cu surplusul din adv_data dacă este necesar.

connectable

Dacă este True, acesta este un advertisement conectabil.

limited_disc

Folosește flag-ul de descoperire limitată în loc de cel general.

br_edr

Setează flag-ul BR/EDR-supported.

name

Nume local complet, opțional, de încorporat.

services

Iterabil de bluetooth.UUID de inclus în advertising.

appearance

Valoare appearance pe 16 biți (vezi numerele alocate Bluetooth).

manufacturer

Tuplu de (company_id, data_bytes) de inclus în advertising ca date specifice producătorului.

timeout_ms

Oprește advertising-ul după atâtea milisecunde fără o conexiune. None înseamnă advertising până la conectare.

aioble.register_services(*services: Service) None

Înregistrează unul sau mai multe obiecte Service (și caracteristicile și descriptorii lor) la serverul GATT. Trebuie apelat o dată înainte de a porni advertise. Apelurile ulterioare înlocuiesc înregistrarea anterioară.

services

Una sau mai multe instanțe Service.

Constante la nivel de modul

aioble.ADDR_PUBLIC

Tipul de adresă publică pentru dispozitive BLE (0).

aioble.ADDR_RANDOM

Tipul de adresă aleatorie pentru dispozitive BLE (1).

Excepții

exception aioble.GattError

Ridicată atunci când o operație GATT la distanță (read / write / indicate) se finalizează cu un status diferit de zero. Codul de status este disponibil în atributul _status.

exception aioble.DeviceDisconnectedError

Ridicată în interiorul unei operații asincrone (de ex. read, write, notified) atunci când conexiunea subiacentă cade în timpul așteptării.

exception aioble.L2CAPDisconnectedError

Ridicată atunci când o operație de send/recv/flush pe un canal L2CAP este încercată pe (sau întreruptă de) un canal deconectat.

exception aioble.L2CAPConnectionError

Ridicată de DeviceConnection.l2cap_connect atunci când stabilirea canalului eșuează. Codul de status Bluetooth este primul argument.

Clase

class aioble.Device(addr_type: int, addr: bytes | str)

Reprezintă un dispozitiv BLE la distanță prin adresă. Două instanțe Device sunt egale dacă atât addr_type cât și addr coincid. Folosit ca handle pentru inițierea conexiunilor.

addr_type

Fie ADDR_PUBLIC, fie ADDR_RANDOM.

addr

Adresă pe șase octeți ca bytes sau un șir hexazecimal separat prin două puncte (de ex. "aa:bb:cc:dd:ee:ff").

addr_type

Tipul de adresă cu care a fost construit dispozitivul.

addr

Adresa brută pe șase octeți a dispozitivului.

addr_hex() str

Returnează adresa formatată ca șir hexazecimal separat prin două puncte.

connect(timeout_ms: int = 10000, scan_duration_ms: int | None = None, min_conn_interval_us: int | None = None, max_conn_interval_us: int | None = None) Awaitable[DeviceConnection]

Asincron. Inițiază o conexiune GAP către acest dispozitiv și returnează DeviceConnection-ul rezultat. Anulează orice scanare în curs.

timeout_ms

Cât timp să se aștepte finalizarea conexiunii.

scan_duration_ms

Durata inițială de scanare înainte de conectare (specifică controllerului).

min_conn_interval_us / max_conn_interval_us

Limite opționale pentru intervalul de conexiune, în microsecunde.

class aioble.DeviceConnection

O conexiune GAP activă către un Device. Returnată de Device.connect() sau advertise. Acceptă utilizarea ca context manager async with care se deconectează automat la ieșire.

Nu construi direct.

device

Device-ul subiacent.

encrypted

True odată ce legătura este criptată (de ex. după pairing).

authenticated

True dacă legătura a fost autentificată (pairing protejat MITM).

bonded

True dacă pairing-ul a produs chei de bonding.

key_size

Dimensiunea negociată a cheii de criptare în octeți sau False dacă nu este criptată.

mtu

MTU ATT negociat după exchange_mtu sau None până la setare.

is_connected() bool

Returnează dacă conexiunea este încă activă.

disconnect(timeout_ms: int = 2000) Awaitable[None]

Asincron. Se deconectează și așteaptă IRQ-ul de deconectare.

timeout_ms

Timpul maxim de așteptare pentru deconectare.

disconnected(timeout_ms: int | None = None, disconnect: bool = False) Awaitable[None]

Asincron. Așteaptă ca conexiunea să fie încheiată de oricare dintre părți. Dacă disconnect este True, se deconectează activ mai întâi.

timeout_ms

Timpul maxim de așteptare. None înseamnă așteptare la nesfârșit.

disconnect

Dacă este True, inițiază deconectarea.

timeout(timeout_ms: int | None) DeviceTimeout

Returnează un context manager care anulează corpul său dacă fie timeout-ul expiră (ridicând asyncio.TimeoutError), fie dispozitivul se deconectează (ridicând DeviceDisconnectedError).

timeout_ms

Timeout în milisecunde sau None pentru niciun timeout.

exchange_mtu(mtu: int | None = None, timeout_ms: int = 1000) Awaitable[int]

Asincron. Inițiază un schimb de MTU ATT și returnează MTU-ul negociat.

mtu

MTU preferat opțional de setat pe interfața BLE subiacentă înainte de schimb.

timeout_ms

Timeout pentru schimb.

service(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientService | None]

Asincron. Descoperă un singur serviciu la distanță care corespunde lui uuid sau None dacă nu este găsit.

services(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Returnează un iterator asincron de obiecte ClientService la distanță. Folosește-l cu async for și execută bucla până la finalizare.

uuid

Filtru UUID opțional. None returnează fiecare serviciu.

timeout_ms

Timeout per descoperire.

pair(bond: bool = True, le_secure: bool = True, mitm: bool = False, io: int = 3, timeout_ms: int = 20000) Awaitable[None]

Asincron. Inițiază pairing-ul pe această conexiune. Actualizează atributele encrypted / authenticated / bonded / key_size la finalizare.

bond

Persistă cheile de pairing.

le_secure

Folosește LE Secure Connections.

mitm

Necesită protecție man-in-the-middle.

io

Constantă pentru capacitatea IO (de ex. 3 pentru fără input/output).

timeout_ms

Timeout pentru pairing.

l2cap_accept(psm: int, mtu: int, timeout_ms: int | None = None) Awaitable[L2CAPChannel]

Asincron. Ascultă pe PSM-ul dat și returnează un L2CAPChannel odată ce dispozitivul la distanță îl deschide.

psm

Protocol/Service Multiplexer pe care să se asculte.

mtu

Dimensiunea maximă de recepție, în octeți.

timeout_ms

Timpul maxim de așteptare pentru conectarea dispozitivului la distanță.

l2cap_connect(psm: int, mtu: int, timeout_ms: int = 1000) Awaitable[L2CAPChannel]

Asincron. Deschide un canal L2CAP către dispozitivul la distanță pe PSM-ul dat.

psm

Protocol/Service Multiplexer la care să se conecteze.

mtu

Dimensiunea maximă de recepție, în octeți.

timeout_ms

Timeout pentru conexiune.

class aioble.ScanResult

Un singur dispozitiv descoperit în timpul scan. Aceeași instanță este re-produsă pe măsură ce sosesc noi date de advertising.

Nu construi direct.

device

Device-ul subiacent.

rssi

Ultimul RSSI raportat, în dBm.

adv_data

Payload brut de advertising (bytes sau None).

resp_data

Payload brut de scan response (bytes sau None), dacă scanarea activă este activată.

connectable

True dacă cel mai recent advertisement a fost conectabil.

name() str | None

Decodează numele local complet (sau prescurtat) din payload sau None dacă nu este prezent.

services() Iterator[bluetooth.UUID]

Generator care produce fiecare bluetooth.UUID din câmpurile cu lista de servicii pe 16/32/128 de biți din advertising.

manufacturer(filter: int | None = None) Iterator[tuple[int, bytes]]

Generator care produce tupluri (company_id, data) din câmpurile de advertising specifice producătorului.

filter

Dacă este dat, produce doar intrările al căror company ID corespunde.

class aioble.Service(uuid: bluetooth.UUID)

Un serviciu GATT local. Construiește un serviciu cu una sau mai multe instanțe Characteristic, apoi pasează-l către register_services.

uuid

UUID-ul serviciului.

uuid

UUID-ul serviciului.

characteristics

Lista de obiecte Characteristic legate de acest serviciu.

class aioble.Characteristic(service: Service, uuid: bluetooth.UUID, read: bool = False, write: bool = False, write_no_response: bool = False, notify: bool = False, indicate: bool = False, initial: bytes | None = None, capture: bool = False)

O caracteristică GATT locală. Construirea uneia o adaugă automat la service.

service

Service-ul proprietar.

uuid

UUID-ul caracteristicii.

read, write, write_no_response, notify, indicate

Valori booleene care selectează operațiile GATT acceptate.

initial

Valoare inițială opțională (bytes).

capture

Dacă este True, valorile scrise sunt puse în coadă (până la 10 în adâncime) astfel încât scrierile rapide consecutive să nu se piardă. Fiecare apel written returnează apoi un tuplu (connection, data).

uuid

UUID-ul caracteristicii.

flags

Mască de biți cu flag-urile de proprietate GATT construite din constructor.

descriptors

Lista de obiecte Descriptor legate de această caracteristică.

read() bytes

Citește valoarea curentă din baza de date GATT locală.

write(data: bytes, send_update: bool = False) None

Actualizează valoarea în baza de date GATT locală.

data

Octeții noii valori.

send_update

Dacă este True, notifică/indică de asemenea fiecare conexiune abonată.

notify(connection: DeviceConnection, data: bytes | None = None) None

Trimite un GATT Notify către connection.

connection

Conexiunea client țintă.

data

Payload de trimis. Dacă este None, se trimite valoarea locală curentă.

indicate(connection: DeviceConnection, data: bytes | None = None, timeout_ms: int = 1000) Awaitable[None]

Asincron. Trimite un GATT Indicate către connection și așteaptă confirmarea clientului. Ridică GattError la un status diferit de zero.

connection

Conexiunea client țintă.

data

Payload de indicat sau None pentru a trimite valoarea locală.

timeout_ms

Timpul maxim de așteptare pentru confirmare.

written(timeout_ms: int | None = None) Awaitable[DeviceConnection | tuple[DeviceConnection, bytes]]

Asincron. Așteaptă o scriere la distanță. Returnează DeviceConnection-ul care scrie sau (connection, data) dacă caracteristica a fost creată cu capture=True.

timeout_ms

Timpul maxim de așteptare. None așteaptă la nesfârșit.

on_read(connection: DeviceConnection) int

Hook de suprascriere invocat sincron atunci când se primește o citire la distanță. Returnează 0 pentru a permite citirea sau un cod de eroare ATT diferit de zero pentru a o respinge. Implementarea implicită returnează 0.

class aioble.BufferedCharacteristic(service: Service, uuid: bluetooth.UUID, max_len: int = 20, append: bool = False, **kwargs)

O Characteristic al cărei tampon (buffer) GATT de suport poate fi configurat. Utilă pentru primirea de valori mai mari decât dimensiunea implicită a atributului sau pentru punerea în coadă a scrierilor consecutive.

max_len

Dimensiunea tamponului (buffer), în octeți.

append

Dacă este True, scrierile secvențiale se adaugă în tampon (buffer) în loc să-l suprascrie.

Celelalte argumente sunt redirecționate către Characteristic.

class aioble.Descriptor(characteristic: Characteristic, uuid: bluetooth.UUID, read: bool = False, write: bool = False, initial: bytes | None = None)

Un descriptor GATT local. Construirea unuia îl adaugă automat la characteristic. Moștenește read, write și written de la Characteristic.

characteristic

Characteristic-ul proprietar.

uuid

UUID-ul descriptorului.

read, write

Valori booleene care selectează operațiile GATT acceptate.

initial

Valoare inițială opțională (bytes).

class aioble.ClientService

Un serviciu GATT la distanță descoperit pe un peer. Returnat de DeviceConnection.service() sau iterat din DeviceConnection.services().

Nu construi direct.

connection

DeviceConnection-ul proprietar.

uuid

UUID-ul serviciului la distanță.

characteristic(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientCharacteristic | None]

Asincron. Descoperă o singură caracteristică după UUID sau None dacă nu este găsită.

characteristics(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Returnează un iterator asincron de obiecte ClientCharacteristic. Folosește-l cu async for și execută bucla până la finalizare.

uuid

Filtru UUID opțional.

timeout_ms

Timeout per descoperire.

class aioble.ClientCharacteristic

O caracteristică GATT la distanță descoperită pe un peer. Returnată de ClientService.characteristic() sau iterată din ClientService.characteristics().

Nu construi direct.

service

ClientService-ul proprietar.

uuid

UUID-ul caracteristicii.

properties

Mască de biți cu operațiile GATT acceptate, așa cum sunt raportate de peer.

read(timeout_ms: int = 1000) Awaitable[bytes]

Asincron. Emite un GATT Read și returnează valoarea. Ridică GattError la un status diferit de zero.

timeout_ms

Timeout pentru citire.

write(data: bytes, response: bool | None = None, timeout_ms: int = 1000) Awaitable[None]

Asincron. Emite un GATT Write.

data

Valoarea de scris.

response

True pentru a impune un write-response (și a ridica GattError la eșec). False pentru write-without-response. None (implicit) selectează automat în funcție de ceea ce anunță peer-ul.

timeout_ms

Timeout pentru scriere (relevant doar dacă response este True).

notified(timeout_ms: int | None = None) Awaitable[bytes]

Asincron. Așteaptă următoarea notificare pe această caracteristică și returnează payload-ul ei. Returnează imediat dacă o notificare este deja în coadă.

timeout_ms

Timpul maxim de așteptare. None așteaptă la nesfârșit.

indicated(timeout_ms: int | None = None) Awaitable[bytes]

Asincron. Așteaptă următoarea indicație pe această caracteristică și returnează payload-ul ei.

timeout_ms

Timpul maxim de așteptare.

subscribe(notify: bool = True, indicate: bool = False) Awaitable[None]

Asincron. Scrie Client Characteristic Configuration Descriptor (CCCD) pentru a te abona (sau dezabona) la notificări și/sau indicații.

notify

Activează notificările.

indicate

Activează indicațiile.

descriptor(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientDescriptor | None]

Asincron. Descoperă un singur descriptor după UUID sau None dacă nu este găsit.

descriptors(timeout_ms: int = 2000) ClientDiscover

Returnează un iterator asincron de obiecte ClientDescriptor. Folosește-l cu async for și execută bucla până la finalizare.

class aioble.ClientDescriptor

Un descriptor GATT la distanță descoperit pe un peer. Moștenește read și write de la ClientCharacteristic.

Nu construi direct.

characteristic

ClientCharacteristic-ul proprietar.

uuid

UUID-ul descriptorului.

class aioble.L2CAPChannel

Un canal L2CAP orientat pe conexiune, activ. Returnat de DeviceConnection.l2cap_accept() sau DeviceConnection.l2cap_connect(). Acceptă utilizarea ca context manager async with care se deconectează automat la ieșire.

Nu construi direct.

our_mtu

Dimensiunea maximă, în octeți, pe care peer-ul ne-o poate trimite într-un singur SDU.

peer_mtu

Dimensiunea maximă, în octeți, pe care i-o putem trimite peer-ului într-un singur SDU.

available() bool

Returnează sincron True dacă date de recepție sunt pregătite în tampon (adică recvinto nu va bloca).

recvinto(buf: bytearray, timeout_ms: int | None = None) Awaitable[int]

Asincron. Recepționează în buf, returnând numărul de octeți citiți. Așteaptă date noi dacă canalul este gol.

buf

Tampon (buffer) pre-alocat de umplut.

timeout_ms

Timpul maxim de așteptare. None așteaptă la nesfârșit.

send(buf: bytes, timeout_ms: int | None = None, chunk_size: int | None = None) Awaitable[None]

Asincron. Trimite buf pe canal, fragmentând payload-urile mai mari în bucăți de dimensiunea MTU. Așteaptă credite de control al fluxului după cum este necesar.

buf

Obiect de tip bytes de trimis.

timeout_ms

Timpul maxim de așteptare per bucată.

chunk_size

Suprascriere opțională pentru dimensiunea bucății per apel. Limitată la min(our_mtu * 2, peer_mtu).

flush(timeout_ms: int | None = None) Awaitable[None]

Asincron. Așteaptă până când orice send blocat a fost golit de către controller.

timeout_ms

Timpul maxim de așteptare.

disconnect(timeout_ms: int = 1000) Awaitable[None]

Asincron. Deconectează canalul și așteaptă IRQ-ul de deconectare.

timeout_ms

Timpul maxim de așteptare.

disconnected(timeout_ms: int = 1000) Awaitable[None]

Asincron. Așteaptă până când canalul este deconectat de oricare dintre părți.

timeout_ms

Timpul maxim de așteptare.