aioble — Asinkroni BLE

aioble je visokorazinski omot prilagođen asyncio-u oko modula bluetooth. Pruža čiste korutine za skeniranje, povezivanje, oglašavanje, GATT usluge i L2CAP kanale.

Sve udaljene operacije (connect, disconnect, čitanje/pisanje na klijentu, indiciranje na poslužitelju, l2cap recv/send, uparivanje) mogu se čekati (awaitable) i podržavaju vremenska ograničenja.

Podržane uloge:

  • Broadcaster (oglašivač) — generira korisne podatke za oglašavanje i za odgovor na skeniranje za uobičajena polja, automatski dijeli korisne podatke između oglašavanja i odgovora na skeniranje, oglašava neograničeno ili tijekom utvrđenog trajanja.

  • Peripheral — čeka vezu od centralnog uređaja, čeka razmjenu MTU-a.

  • Observer (skener) — pasivno i aktivno skeniranje, kombinira korisne podatke oglašavanja i odgovora na skeniranje za isti uređaj, raščlanjuje uobičajena polja iz korisnih podataka oglašavanja.

  • Central — povezuje se na periferni uređaj, pokreće razmjenu MTU-a.

  • GATT klijent — otkriva usluge / karakteristike / deskriptore (opcionalno prema UUID-u); čita / piše / piše-s-odgovorom na karakteristike i deskriptore; pretplaćuje se na obavijesti i indikacije (preko CCCD-a); čeka obavijesti i indikacije.

  • GATT poslužitelj — registrira usluge / karakteristike / deskriptore; čeka pisanja na karakteristike i deskriptore; presreće zahtjeve za čitanje; šalje obavijesti i indikacije (i čeka odgovor).

  • L2CAP — prihvaća i povezuje L2CAP kanale orijentirane na vezu, upravlja kontrolom toka kanala.

  • Sigurnost — upravljanje ključevima/tajnama uz podršku JSON-a, pokretanje uparivanja, ispitivanje stanja enkripcije / autentifikacije.

Primjeri

Skenirajte obližnje BLE uređaje i ispišite svaki čim se uoči:

import aioble
import asyncio

async def find_devices():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            print(result.device.addr_hex(), result.rssi, result.name())

asyncio.run(find_devices())

Povežite se na periferni uređaj koji oglašava uslugu Heart Rate kao central i pretplatite se na njegove obavijesti o mjerenjima:

import aioble
import asyncio
import bluetooth

_HR_SERVICE = bluetooth.UUID(0x180D)
_HR_MEASUREMENT = bluetooth.UUID(0x2A37)

async def connect_and_read():
    device = None
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if _HR_SERVICE in result.services():
                device = result.device
                break
    if device is None:
        return

    async with await device.connect() as conn:
        service = await conn.service(_HR_SERVICE)
        char = await service.characteristic(_HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while True:
            data = await char.notified()
            print("notify:", data)

asyncio.run(connect_and_read())

Ponašajte se kao peripheral: registrirajte GATT uslugu, oglašavajte je i šaljite obavijesti svakome tko se poveže:

import aioble
import asyncio
import bluetooth
import struct

_ENV_SERVICE = bluetooth.UUID(0x181A)
_TEMP_CHAR = bluetooth.UUID(0x2A6E)

def encode_temperature(deg_c):
    # Bluetooth Temperature (0x2A6E) is sint16 little-endian, 0.01 degC units.
    return struct.pack("<h", round(deg_c * 100))

service = aioble.Service(_ENV_SERVICE)
temp_char = aioble.Characteristic(service, _TEMP_CHAR, read=True, notify=True)
aioble.register_services(service)

async def peripheral_task():
    while True:
        connection = await aioble.advertise(
            interval_us=250000,
            name="openmv-sensor",
            services=[_ENV_SERVICE],
            appearance=0x0300,
        )
        print("connected:", connection.device.addr_hex())
        async with connection:
            while connection.is_connected():
                temp_char.write(encode_temperature(23.68), send_update=True)
                await asyncio.sleep(1)

asyncio.run(peripheral_task())

Funkcije na razini modula

aioble.config(*args, **kwargs) Any

Prosljeđuje na bluetooth.BLE.config(), osiguravajući da je BLE radio prvo aktivan.

args

Opcionalni naziv jednog parametra za ispitivanje.

kwargs

Imenovani argumenti za postavljanje konfiguracijskih vrijednosti.

aioble.stop() None

Deaktivirajte osnovni BLE radio i pokrenite sve registrirane rukovatelje za gašenje pod-modula. Nakon ovog poziva, skeneri, oglašivači, veze i L2CAP kanali svi se ruše.

aioble.scan(duration_ms: int, interval_us: int | None = None, window_us: int | None = None, active: bool = False) scan

Vraća scan asinkroni upravitelj konteksta / asinkroni iterator koji vraća instance ScanResult za svaki jedinstveni otkriveni uređaj (ili za svaki novi dio podataka oglašavanja od poznatog uređaja).

duration_ms

Koliko dugo skenirati, u milisekundama. Proslijedite 0 za neograničeno skeniranje dok upravitelj konteksta ne izađe.

interval_us

Interval skeniranja u mikrosekundama. Zadano je 1.280.000.

window_us

Prozor skeniranja u mikrosekundama (mora biti manji ili jednak interval_us). Zadano je 11.250.

active

Ako je True, izvodi aktivno skeniranje (zahtijeva podatke odgovora na skeniranje). Zadano je False.

aioble.advertise(interval_us: int, adv_data: bytes | None = None, resp_data: bytes | None = None, connectable: bool = True, limited_disc: bool = False, br_edr: bool = False, name: str | None = None, services: list | None = None, appearance: int = 0, manufacturer: tuple | None = None, timeout_ms: int | None = None) DeviceConnection

Asinkrona korutina koja započinje oglašavanje i čeka dolaznu vezu centralnog uređaja. Vraća DeviceConnection koji predstavlja povezani centralni uređaj, ili podiže asyncio.TimeoutError pri isteku vremena.

interval_us

Interval oglašavanja, u mikrosekundama.

adv_data

Sirovi korisni podaci oglašavanja. Ako nije postavljeno, adv_data se gradi iz preostalih imenovanih argumenata.

resp_data

Sirovi korisni podaci odgovora na skeniranje. Automatski se popunjava preljevom iz adv_data ako je potrebno.

connectable

Ako je True, ovo je oglas na koji se može povezati.

limited_disc

Koristi zastavicu ograničene vidljivosti umjesto opće.

br_edr

Postavlja zastavicu podrške za BR/EDR.

name

Opcionalni potpuni lokalni naziv za ugradnju.

services

Iterabilni objekt od bluetooth.UUID za oglašavanje.

appearance

16-bitna vrijednost izgleda (vidi Bluetooth dodijeljene brojeve).

manufacturer

Torka (company_id, data_bytes) za oglašavanje kao podaci specifični za proizvođača.

timeout_ms

Prekida oglašavanje nakon ovoliko milisekundi bez veze. None znači oglašavati dok se ne uspostavi veza.

aioble.register_services(*services: Service) None

Registrira jedan ili više objekata Service (i njihove karakteristike i deskriptore) s GATT poslužiteljem. Mora se pozvati jednom prije pokretanja advertise. Naknadni pozivi zamjenjuju prethodnu registraciju.

services

Jedna ili više instanci Service.

Konstante na razini modula

aioble.ADDR_PUBLIC

Tip javne BLE adrese uređaja (0).

aioble.ADDR_RANDOM

Tip nasumične BLE adrese uređaja (1).

Iznimke

exception aioble.GattError

Podiže se kada udaljena GATT operacija (čitanje / pisanje / indiciranje) završi sa statusom različitim od nule. Statusni kod dostupan je na atributu _status.

exception aioble.DeviceDisconnectedError

Podiže se unutar asinkrone operacije (npr. read, write, notified) kada se osnovna veza prekine tijekom čekanja.

exception aioble.L2CAPDisconnectedError

Podiže se kada se operacija slanja/primanja/pražnjenja L2CAP kanala pokuša na (ili je prekinuta od strane) prekinutom kanalu.

exception aioble.L2CAPConnectionError

Podiže se od strane DeviceConnection.l2cap_connect kada uspostavljanje kanala ne uspije. Bluetooth statusni kod je prvi argument.

Klase

class aioble.Device(addr_type: int, addr: bytes | str)

Predstavlja udaljeni BLE uređaj prema adresi. Dvije instance Device smatraju se jednakima ako se podudaraju i addr_type i addr. Koristi se kao ručka za pokretanje veza.

addr_type

Ili ADDR_PUBLIC ili ADDR_RANDOM.

addr

Šestobajtna adresa kao bytes, ili heksadecimalni niz odvojen dvotočkama (npr. "aa:bb:cc:dd:ee:ff").

addr_type

Tip adrese s kojom je uređaj konstruiran.

addr

Sirova šestobajtna adresa uređaja.

addr_hex() str

Vraća adresu formatiranu kao heksadecimalni niz odvojen dvotočkama.

connect(timeout_ms: int = 10000, scan_duration_ms: int | None = None, min_conn_interval_us: int | None = None, max_conn_interval_us: int | None = None) Awaitable[DeviceConnection]

Asinkrono. Pokreće GAP vezu na ovaj uređaj i vraća rezultirajući DeviceConnection. Otkazuje svako skeniranje koje je u tijeku.

timeout_ms

Koliko dugo čekati da se veza dovrši.

scan_duration_ms

Početno trajanje skeniranja prije povezivanja (ovisno o kontroleru).

min_conn_interval_us / max_conn_interval_us

Opcionalne granice intervala veze, u mikrosekundama.

class aioble.DeviceConnection

Aktivna GAP veza s Device. Vraćena od Device.connect() ili advertise. Podržava upotrebu kao async with upravitelj konteksta koji se automatski isključuje pri izlasku.

Nemojte konstruirati izravno.

device

Osnovni Device.

encrypted

True čim je veza enkriptirana (npr. nakon uparivanja).

authenticated

True ako je veza autentificirana (uparivanje zaštićeno od MITM-a).

bonded

True ako je uparivanje proizvelo ključeve za vezivanje.

key_size

Dogovorena veličina enkripcijskog ključa u bajtovima, ili False ako nije enkriptirano.

mtu

Dogovoreni ATT MTU nakon exchange_mtu, ili None dok se ne postavi.

is_connected() bool

Vraća je li veza još uvijek aktivna.

disconnect(timeout_ms: int = 2000) Awaitable[None]

Asinkrono. Isključuje vezu i čeka prekid (IRQ) za isključivanje.

timeout_ms

Maksimalno vrijeme čekanja na isključivanje.

disconnected(timeout_ms: int | None = None, disconnect: bool = False) Awaitable[None]

Asinkrono. Čeka da vezu prekine bilo koja strana. Ako je disconnect True, najprije aktivno prekida vezu.

timeout_ms

Maksimalno vrijeme čekanja. None znači čekati zauvijek.

disconnect

Ako je True, pokreće prekid veze.

timeout(timeout_ms: int | None) DeviceTimeout

Vraća upravitelj konteksta koji otkazuje svoje tijelo ako protekne vremensko ograničenje (podižući asyncio.TimeoutError) ili ako se uređaj isključi (podižući DeviceDisconnectedError).

timeout_ms

Vremensko ograničenje u milisekundama, ili None za bez ograničenja.

exchange_mtu(mtu: int | None = None, timeout_ms: int = 1000) Awaitable[int]

Asinkrono. Pokreće ATT MTU razmjenu i vraća dogovoreni MTU.

mtu

Opcionalni preferirani MTU za postavljanje na osnovnom BLE sučelju prije razmjene.

timeout_ms

Vremensko ograničenje za razmjenu.

service(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientService | None]

Asinkrono. Otkriva jednu udaljenu uslugu koja se podudara s uuid, ili None ako nije pronađena.

services(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Vraća asinkroni iterator udaljenih objekata ClientService. Koristite s async for i izvedite petlju do kraja.

uuid

Opcionalni UUID filtar. None vraća svaku uslugu.

timeout_ms

Vremensko ograničenje po otkrivanju.

pair(bond: bool = True, le_secure: bool = True, mitm: bool = False, io: int = 3, timeout_ms: int = 20000) Awaitable[None]

Asinkrono. Pokreće uparivanje na ovoj vezi. Ažurira atribute encrypted / authenticated / bonded / key_size po dovršetku.

bond

Trajno pohranjuje ključeve uparivanja.

le_secure

Koristi LE Secure Connections.

mitm

Zahtijeva zaštitu od posrednika (man-in-the-middle).

io

Konstanta IO sposobnosti (npr. 3 za bez ulaza/izlaza).

timeout_ms

Vremensko ograničenje uparivanja.

l2cap_accept(psm: int, mtu: int, timeout_ms: int | None = None) Awaitable[L2CAPChannel]

Asinkrono. Sluša na zadanom PSM-u i vraća L2CAPChannel čim ga udaljeni uređaj otvori.

psm

Protocol/Service Multiplexer na kojem se sluša.

mtu

Maksimalna veličina primanja, u bajtovima.

timeout_ms

Maksimalno vrijeme čekanja da se udaljeni uređaj poveže.

l2cap_connect(psm: int, mtu: int, timeout_ms: int = 1000) Awaitable[L2CAPChannel]

Asinkrono. Otvara L2CAP kanal prema udaljenom uređaju na zadanom PSM-u.

psm

Protocol/Service Multiplexer na koji se povezuje.

mtu

Maksimalna veličina primanja, u bajtovima.

timeout_ms

Vremensko ograničenje veze.

class aioble.ScanResult

Jedan uređaj otkriven tijekom scan. Ista instanca se ponovno vraća kako pristižu novi podaci oglašavanja.

Nemojte konstruirati izravno.

device

Osnovni Device.

rssi

Posljednji prijavljeni RSSI, u dBm.

adv_data

Sirovi korisni podaci oglašavanja (bytes ili None).

resp_data

Sirovi korisni podaci odgovora na skeniranje (bytes ili None), ako je aktivno skeniranje omogućeno.

connectable

True ako je najnoviji oglas bio takav da se na njega može povezati.

name() str | None

Dekodira potpuni (ili skraćeni) oglašeni lokalni naziv iz korisnih podataka, ili None ako nije prisutan.

services() Iterator[bluetooth.UUID]

Generator koji vraća svaki bluetooth.UUID oglašen u poljima popisa usluga od 16/32/128 bita.

manufacturer(filter: int | None = None) Iterator[tuple[int, bytes]]

Generator koji vraća torke (company_id, data) iz polja oglašavanja specifičnih za proizvođača.

filter

Ako je zadano, vraća samo unose čiji se ID tvrtke podudara.

class aioble.Service(uuid: bluetooth.UUID)

Lokalna GATT usluga. Izgradite uslugu s jednom ili više instanci Characteristic, a zatim je proslijedite u register_services.

uuid

UUID usluge.

uuid

UUID usluge.

characteristics

Popis objekata Characteristic vezanih za ovu uslugu.

class aioble.Characteristic(service: Service, uuid: bluetooth.UUID, read: bool = False, write: bool = False, write_no_response: bool = False, notify: bool = False, indicate: bool = False, initial: bytes | None = None, capture: bool = False)

Lokalna GATT karakteristika. Konstruiranje jedne automatski je dodaje u service.

service

Vlasnička Service.

uuid

UUID karakteristike.

read, write, write_no_response, notify, indicate

Booleovi koji odabiru podržane GATT operacije.

initial

Opcionalna početna vrijednost (bytes).

capture

Ako je True, zapisane vrijednosti se stavljaju u red (do 10 u dubinu) tako da se brza uzastopna pisanja ne gube. Svaki poziv written tada vraća torku (connection, data).

uuid

UUID karakteristike.

flags

Bitovna maska zastavica GATT svojstava izgrađena iz konstruktora.

descriptors

Popis objekata Descriptor vezanih za ovu karakteristiku.

read() bytes

Čita trenutnu vrijednost iz lokalne GATT baze podataka.

write(data: bytes, send_update: bool = False) None

Ažurira vrijednost u lokalnoj GATT bazi podataka.

data

Bajtovi nove vrijednosti.

send_update

Ako je True, također obavještava/indicira svaku pretplaćenu vezu.

notify(connection: DeviceConnection, data: bytes | None = None) None

Šalje GATT Notify prema connection.

connection

Ciljana klijentska veza.

data

Korisni podaci za slanje. Ako je None, šalje se trenutna lokalna vrijednost.

indicate(connection: DeviceConnection, data: bytes | None = None, timeout_ms: int = 1000) Awaitable[None]

Asinkrono. Šalje GATT Indicate prema connection i čeka potvrdu klijenta. Podiže GattError pri statusu različitom od nule.

connection

Ciljana klijentska veza.

data

Korisni podaci za indiciranje, ili None za slanje lokalne vrijednosti.

timeout_ms

Maksimalno vrijeme čekanja na potvrdu.

written(timeout_ms: int | None = None) Awaitable[DeviceConnection | tuple[DeviceConnection, bytes]]

Asinkrono. Čeka udaljeno pisanje. Vraća DeviceConnection koji piše, ili (connection, data) ako je karakteristika kreirana s capture=True.

timeout_ms

Maksimalno vrijeme čekanja. None čeka zauvijek.

on_read(connection: DeviceConnection) int

Nadjačiva kuka pozvana sinkrono kada je primljeno udaljeno čitanje. Vratite 0 za dopuštanje čitanja ili ATT kod pogreške različit od nule za njegovo odbijanje. Zadana implementacija vraća 0.

class aioble.BufferedCharacteristic(service: Service, uuid: bluetooth.UUID, max_len: int = 20, append: bool = False, **kwargs)

Characteristic čiji se pozadinski GATT međuspremnik može konfigurirati. Korisno za primanje vrijednosti većih od zadane veličine atributa, ili za stavljanje uzastopnih pisanja u red.

max_len

Veličina međuspremnika, u bajtovima.

append

Ako je True, uzastopna pisanja se dodaju u međuspremnik umjesto da ga prepisuju.

Ostali argumenti se prosljeđuju u Characteristic.

class aioble.Descriptor(characteristic: Characteristic, uuid: bluetooth.UUID, read: bool = False, write: bool = False, initial: bytes | None = None)

Lokalni GATT deskriptor. Konstruiranje jednog automatski ga dodaje u characteristic. Nasljeđuje read, write i written iz Characteristic.

characteristic

Vlasnička Characteristic.

uuid

UUID deskriptora.

read, write

Booleovi koji odabiru podržane GATT operacije.

initial

Opcionalna početna vrijednost (bytes).

class aioble.ClientService

Udaljena GATT usluga otkrivena na ravnopravnom uređaju. Vraćena od DeviceConnection.service() ili iterirana iz DeviceConnection.services().

Nemojte konstruirati izravno.

connection

Vlasnička DeviceConnection.

uuid

UUID udaljene usluge.

characteristic(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientCharacteristic | None]

Asinkrono. Otkriva jednu karakteristiku prema UUID-u, ili None ako nije pronađena.

characteristics(uuid: bluetooth.UUID | None = None, timeout_ms: int = 2000) ClientDiscover

Vraća asinkroni iterator objekata ClientCharacteristic. Koristite s async for i izvedite petlju do kraja.

uuid

Opcionalni UUID filtar.

timeout_ms

Vremensko ograničenje po otkrivanju.

class aioble.ClientCharacteristic

Udaljena GATT karakteristika otkrivena na ravnopravnom uređaju. Vraćena od ClientService.characteristic() ili iterirana iz ClientService.characteristics().

Nemojte konstruirati izravno.

service

Vlasnička ClientService.

uuid

UUID karakteristike.

properties

Bitovna maska podržanih GATT operacija kako ih prijavljuje ravnopravni uređaj.

read(timeout_ms: int = 1000) Awaitable[bytes]

Asinkrono. Izdaje GATT Read i vraća vrijednost. Podiže GattError pri statusu različitom od nule.

timeout_ms

Vremensko ograničenje čitanja.

write(data: bytes, response: bool | None = None, timeout_ms: int = 1000) Awaitable[None]

Asinkrono. Izdaje GATT Write.

data

Vrijednost za pisanje.

response

True za zahtijevanje odgovora na pisanje (i podizanje GattError pri neuspjehu). False za pisanje-bez-odgovora. None (zadano) automatski bira na temelju onoga što ravnopravni uređaj oglašava.

timeout_ms

Vremensko ograničenje pisanja (relevantno samo ako je response True).

notified(timeout_ms: int | None = None) Awaitable[bytes]

Asinkrono. Čeka sljedeću obavijest na ovoj karakteristici i vraća njezine korisne podatke. Vraća odmah ako je obavijest već u redu čekanja.

timeout_ms

Maksimalno vrijeme čekanja. None čeka zauvijek.

indicated(timeout_ms: int | None = None) Awaitable[bytes]

Asinkrono. Čeka sljedeću indikaciju na ovoj karakteristici i vraća njezine korisne podatke.

timeout_ms

Maksimalno vrijeme čekanja.

subscribe(notify: bool = True, indicate: bool = False) Awaitable[None]

Asinkrono. Piše Client Characteristic Configuration Descriptor (CCCD) za pretplatu (ili odjavu) na obavijesti i/ili indikacije.

notify

Omogućuje obavijesti.

indicate

Omogućuje indikacije.

descriptor(uuid: bluetooth.UUID, timeout_ms: int = 2000) Awaitable[ClientDescriptor | None]

Asinkrono. Otkriva jedan deskriptor prema UUID-u, ili None ako nije pronađen.

descriptors(timeout_ms: int = 2000) ClientDiscover

Vraća asinkroni iterator objekata ClientDescriptor. Koristite s async for i izvedite petlju do kraja.

class aioble.ClientDescriptor

Udaljeni GATT deskriptor otkriven na ravnopravnom uređaju. Nasljeđuje read i write iz ClientCharacteristic.

Nemojte konstruirati izravno.

characteristic

Vlasnička ClientCharacteristic.

uuid

UUID deskriptora.

class aioble.L2CAPChannel

Aktivan L2CAP kanal orijentiran na vezu. Vraćen od DeviceConnection.l2cap_accept() ili DeviceConnection.l2cap_connect(). Podržava upotrebu kao async with upravitelj konteksta koji se automatski isključuje pri izlasku.

Nemojte konstruirati izravno.

our_mtu

Maksimalna veličina, u bajtovima, koju nam ravnopravni uređaj može poslati u jednom SDU-u.

peer_mtu

Maksimalna veličina, u bajtovima, koju mi možemo poslati ravnopravnom uređaju u jednom SDU-u.

available() bool

Sinkrono vraća True ako su međuspremljeni primljeni podaci spremni (tj. recvinto neće blokirati).

recvinto(buf: bytearray, timeout_ms: int | None = None) Awaitable[int]

Asinkrono. Prima u buf, vraćajući broj pročitanih bajtova. Čeka nove podatke ako je kanal prazan.

buf

Unaprijed alocirani međuspremnik za popunjavanje.

timeout_ms

Maksimalno vrijeme čekanja. None čeka zauvijek.

send(buf: bytes, timeout_ms: int | None = None, chunk_size: int | None = None) Awaitable[None]

Asinkrono. Šalje buf na kanalu, fragmentirajući veće korisne podatke u dijelove veličine MTU-a. Čeka kredite za kontrolu toka po potrebi.

buf

Objekt sličan bajtovima za slanje.

timeout_ms

Maksimalno vrijeme čekanja po dijelu.

chunk_size

Opcionalno nadjačavanje veličine dijela po pozivu. Ograničeno na min(our_mtu * 2, peer_mtu).

flush(timeout_ms: int | None = None) Awaitable[None]

Asinkrono. Čeka dok kontroler ne isprazni svako zaglavljeno send.

timeout_ms

Maksimalno vrijeme čekanja.

disconnect(timeout_ms: int = 1000) Awaitable[None]

Asinkrono. Isključuje kanal i čeka prekid (IRQ) za isključivanje.

timeout_ms

Maksimalno vrijeme čekanja.

disconnected(timeout_ms: int = 1000) Awaitable[None]

Asinkrono. Čeka dok bilo koja strana ne isključi kanal.

timeout_ms

Maksimalno vrijeme čekanja.