11.10. Agir en tant que central¶
L’autre côté de la conversation est le central – l’appareil qui recherche les périphériques en diffusion, en choisit un avec qui dialoguer, ouvre une connexion, parcourt la base de données GATT distante et lit ou s’abonne à des caractéristiques sur celle-ci. Une caméra qui collecte des mesures auprès d’un capteur porté sur soi, qui écoute une balise ou qui dialogue avec un microcontrôleur compagnon est un central.
Le motif du central dans aioble se déroule en quatre étapes : scanner, connecter, découvrir, opérer.
11.10.1. Scan¶
aioble.scan() renvoie un gestionnaire de contexte asynchrone qui sert également d’itérateur asynchrone sur les appareils découverts. L’usage typique consiste à scanner jusqu’à ce qu’un appareil d’intérêt apparaisse, puis à sortir de l’itération
import aioble
import asyncio
import bluetooth
HR_SERVICE = bluetooth.UUID(0x180D)
async def find_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
return result.device
return None
duration_ms=5000 plafonne la durée du scan ; duration_ms=0 scanne indéfiniment (jusqu’à la sortie du gestionnaire de contexte). active=True demande les réponses de scan, ce qui double la taille de la charge utile par appareil au prix d’une petite transmission supplémentaire des deux côtés. Les arguments nommés restants interval_us / window_us règlent le cycle de service radio du scanner lui-même et sont rarement modifiés par rapport aux valeurs par défaut.
Chaque aioble.ScanResult expose l’adresse de l’appareil, le dernier RSSI, les octets bruts de diffusion et de réponse de scan, ainsi que des utilitaires qui analysent les champs standard :
result.device– unaioble.Deviceprêt pour un appel àconnect().result.rssi– l’indicateur de puissance du signal reçu en dBm, utile pour une logique de type « choisir le plus proche ».result.name()– la chaîne de nom local, ouNonesi non diffusée.result.services()– un générateur debluetooth.UUIDpour chaque service que l’appareil diffuse.result.manufacturer()– un générateur de tuples(company_id, data)pour les champs spécifiques au fabricant.result.connectable– indique si la diffusion la plus récente était connectable.
Le même ScanResult est de nouveau émis à mesure que de nouvelles données de diffusion arrivent pour le même appareil, de sorte qu’un écouteur passif qui souhaite simplement suivre des appareils indéfiniment peut exécuter l’itérateur asynchrone à l’infini et traiter chaque événement.
11.10.2. Connexion¶
Une fois un appareil cible identifié, l’ouverture d’une connexion tient en un seul await
async def talk_to(device):
connection = await device.connect() # 10 s timeout
async with connection:
# ... do GATT work ...
pass
aioble.Device.connect() accepte timeout_ms (combien de temps attendre que la connexion s’établisse ; 10 s par défaut), ainsi que min_conn_interval_us / max_conn_interval_us (la plage d’intervalle de connexion demandée, voir Connexions).
11.10.2.1. Se reconnecter à un pair connu sans scanner¶
Une fois qu’une liaison existe avec un pair, l’adresse est déjà connue et un nouveau cycle de scan et de sélection constitue du temps radio gaspillé. Construisez un aioble.Device directement avec l’adresse enregistrée et passez directement à connect()
import aioble
KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
"aa:bb:cc:dd:ee:ff")
async def talk_to_kitchen():
async with await KITCHEN_CAM.connect() as connection:
# ... GATT work ...
pass
Le premier argument est l’un de aioble.ADDR_PUBLIC (l’adresse d’usine d’un contrôleur) ou aioble.ADDR_RANDOM (une adresse privée statique ou résoluble générée) ; le second est soit une valeur bytes de six octets, soit une chaîne hexadécimale séparée par des deux-points. Les attributs addr_type et addr de tout Device (par exemple un obtenu précédemment d’un ScanResult) peuvent être conservés et réinjectés ici.
Le aioble.DeviceConnection renvoyé est ce sur quoi repose le reste du travail du central. async with garantit que la connexion est fermée à la sortie du bloc – en cas de succès, d’annulation ou de toute exception, y compris aioble.DeviceDisconnectedError si le pair disparaît.
Si le central a besoin d’une valeur de caractéristique plus grande que ce que le MTU par défaut de 23 octets autorise, c’est ici qu’il faut la négocier
await connection.exchange_mtu(512)
(exchange_mtu() renvoie le MTU effectivement négocié, qui est le minimum entre la valeur demandée et ce que le pair prend en charge.)
11.10.3. Découverte¶
La découverte parcourt la base de données GATT distante pour trouver les services et les caractéristiques par leurs UUID. Il existe deux variantes : ciblée (vous connaissez l’UUID et voulez une chose précise) et exhaustive (vous voulez tout).
Ciblée – le cas courant
service = await connection.service(HR_SERVICE)
if service is None:
return # no such service
char = await service.characteristic(HR_MEASUREMENT)
if char is None:
return # no such characteristic
aioble.DeviceConnection.service() et aioble.ClientService.characteristic() prennent chacune un bluetooth.UUID et renvoient l’objet correspondant (ou None). Toutes deux disposent d’un argument nommé timeout_ms par découverte dont la valeur par défaut est 2 s.
Exhaustive
async for service in connection.services():
print("service:", service.uuid)
async for char in service.characteristics():
print(" characteristic:", char.uuid, "properties:", hex(char.properties))
C’est ce que font les applications génériques d’exploration Bluetooth – utile pour le développement, moins pour du code de production qui sait quels UUID il attend.
11.10.3.1. Inspecter ce qu’une caractéristique prend en charge¶
La découverte renvoie le masque binaire de propriétés GATT que le pair a annoncé pour chaque caractéristique sous properties. Les bits sont ceux définis par GATT – read (0x02), write-without-response (0x04), write (0x08), notify (0x10), indicate (0x20), et les autres. Inspecter le masque binaire avant d’émettre une opération permet à un client générique de s’adapter à des caractéristiques dont il ne connaît pas les capacités à l’avance
_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)
char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
await char.subscribe(notify=True)
value = await char.notified()
elif char.properties & _PROP_READ:
value = await char.read()
else:
value = None # nothing the client can do
Le code de production qui connaît déjà le profil GATT du pair n’en a généralement pas besoin – les UUID étaient documentés dès le départ. Les clients génériques / exploratoires (une page de réglages qui parcourt un appareil inconnu, un hôte de greffons) s’appuient dessus.
11.10.4. Opérer¶
Une fois que le central détient une ClientCharacteristic, chaque opération GATT est un seul appel de coroutine :
Lecture. Émettez une lecture GATT et récupérez la valeur
value = await char.read() print("value:", value)
Les lectures longues (valeurs plus grandes que le MTU) sont gérées de manière transparente.
Écriture. Envoyez une nouvelle valeur au serveur
await char.write(b"\\x01")response=Trueattend une réponse d’écriture et lèveaioble.GattErrorsi le serveur rejette l’écriture.response=Falseest l’écriture sans réponse : envoi sans suivi.response=None(la valeur par défaut) choisit automatiquement en fonction de ce que le pair a annoncé.Abonnement. Activez les notifications ou les indications en écrivant dans le CCCD de la caractéristique
await char.subscribe(notify=True)Une fois ceci terminé, le central peut attendre les envois entrants.
Notifié / indiqué. Attendez le prochain envoi du serveur
while True: data = await char.notified() print("push:", data)
timeout_ms=None(la valeur par défaut) attend indéfiniment ; passez un entier en millisecondes pour abandonner au bout d’un certain temps.
Réunir les quatre donne le programme central canonique « connecter, s’abonner, diffuser »
async def stream_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
device = result.device
break
else:
return
async with await device.connect() as connection:
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
while connection.is_connected():
data = await char.notified()
print("hr push:", data)
asyncio.run(stream_heart_rate())
L’ensemble tient en une douzaine de lignes et couvre le flux allant de « aucun Bluetooth en cours d’exécution » à « données en direct diffusées ». L’itérateur de scan correspond au motif diffuseur/observateur, connect ouvre la connexion GAP, service / characteristic parcourt l’arbre GATT, subscribe écrit le CCCD, et notified attend les envois.
11.10.5. Déconnexions et reconnexion¶
Tout ce qui arrive à la liaison radio remonte dans la coroutine qui l’attendait. aioble.DeviceDisconnectedError est le signal que le pair a disparu ou que le délai de supervision s’est écoulé ; l’exception met fin à tout appel à read(), write() ou notified() en cours, et tout bloc async with connection se termine proprement.
Un central qui doit se reconnecter en cas de perte encapsule le travail dans sa propre boucle externe
async def keep_streaming():
while True:
try:
await stream_heart_rate()
except aioble.DeviceDisconnectedError:
print("disconnected, retrying...")
await asyncio.sleep(2)
11.10.5.1. Encadrer une séquence avec timeout()¶
Lorsque plusieurs opérations GATT consécutives doivent toutes se terminer dans un même budget – et non chacune individuellement avec son propre timeout_ms – utilisez aioble.DeviceConnection.timeout() pour les encadrer. Le gestionnaire de contexte renvoyé annule son corps si le budget s’écoule (en levant asyncio.TimeoutError) ou si le pair se déconnecte (en levant aioble.DeviceDisconnectedError)
async with await device.connect() as connection:
try:
with connection.timeout(2000): # 2 s for the whole block
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
except asyncio.TimeoutError:
print("discovery + subscribe took too long")
C’est l’alternative plus propre à l’encapsulation de chaque appel individuellement dans asyncio.wait_for(), et cela évite les succès trompeurs où chaque appel respecte sa propre échéance mais où la séquence dans son ensemble dépasse le temps imparti. Passer timeout_ms=None à timeout() désactive l’échéance et ne laisse active que la protection contre la déconnexion.