11.9. Agir comme périphérique¶
Le schéma BLE côté caméra le plus courant consiste à agir comme périphérique – publier une petite base de données GATT, annoncer son existence, accepter une connexion depuis un téléphone ou un appareil compagnon, et transmettre des valeurs à quiconque se trouve à l’autre extrémité.
11.9.1. Construction de la base de données GATT¶
La première chose qu’un périphérique fait au démarrage – avant même d’allumer la radio – est de construire la base de données qu’il compte exposer, de créer des objets pour chaque service et caractéristique, puis d’enregistrer l’ensemble
import aioble
import bluetooth
ENV_SERVICE = bluetooth.UUID(0x181A) # Environmental Sensing
TEMP_UUID = bluetooth.UUID(0x2A6E) # Temperature
HUMID_UUID = bluetooth.UUID(0x2A6F) # Humidity
env = aioble.Service(ENV_SERVICE)
temp_char = aioble.Characteristic(
env, TEMP_UUID,
read=True, notify=True, initial=b"\\x00\\x00",
)
humid_char = aioble.Characteristic(
env, HUMID_UUID,
read=True, notify=True, initial=b"\\x00\\x00",
)
aioble.register_services(env)
Chaque aioble.Characteristic est rattachée à son service simplement en la construisant avec le service comme premier argument. Les arguments nommés booléens (read, write, write_no_response, notify, indicate) sélectionnent les opérations GATT que le client sera autorisé à effectuer ; passer False (la valeur par défaut) signifie que le bit de propriété n’est pas activé.
aioble.register_services() valide l’arbre assemblé dans le serveur GATT. Elle doit être appelée une fois, avant qu’un aioble.advertise() ne démarre ; l’appeler de nouveau remplace la base de données précédente.
11.9.2. Annonce (advertising)¶
Une fois la base de données en place, l’annonce se résume à un appel de coroutine qui attend une connexion
async def serve_one():
connection = await aioble.advertise(
interval_us=250000,
name="openmv-env",
services=[ENV_SERVICE],
appearance=0x0540, # Generic Sensor
)
Les arguments nommés correspondent directement aux champs de la charge utile d’annonce. name est le champ de nom local ; services est la liste des UUID de services que l’appareil héberge (un scanner côté téléphone peut filtrer selon ceux-ci) ; appearance est un indice issu des valeurs d’apparence standard sur 16 bits qui permet au central d’afficher une icône pertinente. Les données spécifiques au fabricant sont transmises via manufacturer=(company_id, data_bytes).
Quelques mots-clés moins courants couvrent le reste de l’espace des indicateurs d’annonce :
connectable=False– mode diffusion seule (aucune connexion n’est jamais acceptée). Le bon choix pour des charges utiles de type balise.limited_disc=True– utilise l’indicateur limited discoverable au lieu de general discoverable ; certains systèmes d’exploitation traitent les deux différemment dans leur interface d’appairage.adv_data/resp_data– octets bruts si l’application a besoin d’un contrôle total sur la disposition.timeout_ms– abandonner après une durée fixe. Par défaut, l’annonce se poursuit indéfiniment.
Lorsqu’un central se connecte, aioble.advertise() renvoie la aioble.DeviceConnection résultante. Le périphérique cesse d’annoncer à ce moment-là.
11.9.3. Servir un seul client¶
La boucle principale d’un périphérique ressemble généralement à ceci :
async def serve():
while True:
connection = await aioble.advertise(
interval_us=250000,
name="openmv-env",
services=[ENV_SERVICE],
)
print("connected:", connection.device.addr_hex())
async with connection:
await connection.disconnected()
print("disconnected; advertising again")
asyncio.run(serve())
async with connection rend le nettoyage de déconnexion automatique. disconnected() est une coroutine qui se suspend jusqu’à ce que l’une des deux extrémités mette fin à la connexion – un moyen propre de maintenir le périphérique en service jusqu’à ce que le central s’en aille, puis de revenir en boucle à l’annonce pour le tour suivant.
11.9.4. Mise à jour d’une caractéristique¶
Le périphérique met à jour la base de données GATT locale avec aioble.Characteristic.write()
temp_char.write(b"\\x9a\\x09") # 24.58 deg C as sint16, 0.01 units
Cela modifie la valeur que renverrait la prochaine opération read de tout client. En soi, cela ne pousse pas la nouvelle valeur – un client abonné ne verra rien tant qu’il n’interroge pas ou que le périphérique n’envoie pas de notification explicite.
Le côté push se résume à un seul mot-clé sur le même appel
temp_char.write(temp_bytes, send_update=True)
send_update=True notifie (ou indique) chaque client qui s’est abonné à cette caractéristique. La plupart du code de type capteur réside dans une tâche par connexion qui boucle en lisant le capteur et en écrivant la valeur avec send_update=True toutes les secondes environ
async def stream_temperature(connection):
while connection.is_connected():
temp_char.write(encode_temperature(read_sensor()), send_update=True)
await asyncio.sleep(1)
async def serve():
while True:
connection = await aioble.advertise(
interval_us=250000,
name="openmv-env",
services=[ENV_SERVICE],
)
async with connection:
asyncio.create_task(stream_temperature(connection))
await connection.disconnected()
Si vous préférez diriger une notification vers un client spécifique plutôt que vers l’ensemble des abonnés (par exemple une réponse privée à la connexion en réaction à la commande de ce client), aioble.Characteristic.notify() et indicate() acceptent un argument DeviceConnection et une charge utile facultative.
11.9.5. Réception des écritures¶
L’autre direction – un client qui écrit dans une caractéristique – devient disponible lorsque la caractéristique est construite avec write=True ou write_no_response=True. Le périphérique attend la prochaine écriture avec aioble.Characteristic.written()
cmd_char = aioble.Characteristic(env, CMD_UUID, write=True, capture=True)
async def handle_commands():
while True:
connection, data = await cmd_char.written()
print("command from", connection.device.addr_hex(), "=", data)
Sans capture=True, written() renvoie uniquement la connexion qui écrit ; la nouvelle valeur réside dans le tampon de stockage de la caractéristique et l’application la récupère avec read(). Si une deuxième écriture arrive avant que l’application n’ait lu la première, la deuxième valeur écrase la première dans le tampon et la valeur d’origine est perdue – written() réveille tout de même l’application, mais une seule fois par « il y a du nouveau », et non une fois par écriture.
Le mot-clé capture=True corrige cela. Chaque écriture entrante est ajoutée à une file d’attente à l’échelle du module, et written() renvoie un tuple (connection, data) pour chaque écriture individuelle – la boucle de l’application voit chacune d’elles exactement une fois, dans l’ordre d’arrivée. Deux conséquences pratiques :
La file d’attente est bornée et est partagée entre toutes les caractéristiques de l’appareil pour lesquelles la capture est activée. De courtes rafales d’écritures consécutives sont tolérées ; un débordement soutenu (des écritures arrivant plus vite que l’application ne les vide) supprime silencieusement les entrées en file d’attente les plus anciennes, et un trafic en rafale sur une caractéristique peut évincer des entrées en attente d’une autre.
Choisissez
capture=Truepour les écritures de type commande où chaque valeur compte. Laissez-le désactivé pour les caractéristiques de type état où seule la dernière valeur présente un intérêt.
Si une lecture du client doit être satisfaite par du code exécuté à la demande plutôt que par une valeur statique, redéfinissez on_read(). La méthode est appelée de façon synchrone lorsqu’une lecture arrive ; renvoyez 0 pour autoriser la lecture (la valeur courante issue de write() sera envoyée), ou un code d’erreur ATT non nul pour la rejeter
import time
_ATT_ERR_READ_NOT_PERMITTED = const(0x02)
_MIN_READ_INTERVAL_MS = const(1000) # at most once per second
class TempChar(aioble.Characteristic):
_last_read_ms = 0
def on_read(self, connection):
now = time.ticks_ms()
if time.ticks_diff(now, self._last_read_ms) < _MIN_READ_INTERVAL_MS:
return _ATT_ERR_READ_NOT_PERMITTED
self._last_read_ms = now
self.write(encode_temperature(read_sensor()))
return 0
temp_char = TempChar(env, TEMP_UUID, read=True)
La fonction de rappel échantillonne le capteur et met à jour la valeur de la caractéristique juste avant que la pile GATT ne serve la lecture, de sorte que le client voie toujours des données fraîches. La limitation de débit empêche un client de solliciter le capteur plus vite qu’il ne peut être échantillonné – toute lecture pendant le temps de recharge d’une seconde est renvoyée comme une erreur ATT Read Not Permitted plutôt qu’une valeur périmée.
11.9.5.1. Tampons de stockage plus grands – BufferedCharacteristic¶
Le tampon de stockage d’une Characteristic ordinaire fait 20 octets de large – la limite pratique avec la MTU par défaut de 23 octets. Un client qui écrit plus que cela dans une caractéristique ordinaire voit sa valeur tronquée. Pour des valeurs entrantes plus grandes ou pour mettre en file d’attente des écritures consécutives que la boucle de l’application rattrapera plus tard, déclarez la caractéristique comme BufferedCharacteristic et choisissez la taille du tampon dès le départ
blob = aioble.BufferedCharacteristic(
service, BLOB_UUID,
max_len=512, append=True,
write=True, capture=True,
)
async def receive_blob():
while True:
connection, chunk = await blob.written()
handle_chunk(connection, chunk)
Deux réglages la distinguent d’une Characteristic ordinaire :
max_lenest la taille du tampon de stockage en octets. Choisissez-la pour correspondre à la plus grande écriture unique que le client est censé effectuer (après négociation de la MTU).append=Truefait en sorte que les écritures séquentielles s’ajoutent dans le tampon au lieu de l’écraser – utile pour recevoir une valeur qui arrive en plusieurs écritures (fragments de mise à jour du micrologiciel, lignes de journal). Avecappend=False, le tampon se comporte comme une caractéristique normale, simplement plus large.
Tous les autres indicateurs du constructeur (read, write, notify, indicate, capture, initial) sont transmis tels quels à la caractéristique sous-jacente.
11.9.6. Services standard et UUID attribués par le SIG¶
S’en tenir aux UUID des numéros attribués (0x180F pour Battery Service, 0x181A pour Environmental Sensing, 0x180D pour Heart Rate, et ainsi de suite) signifie que le menu Bluetooth générique d’un téléphone ou toute application de scanner tierce peut identifier la fonction de l’appareil sans aucun code client personnalisé. La disposition des octets à l’intérieur de chaque caractéristique standard est elle aussi fixée par la spécification – Battery Level (0x2A19) est un octet unique de 0 à 100 ; Temperature (0x2A6E) est un sint16 en little-endian en unités de 0,01 °C. Pour les applications qui ne correspondent pas à un service standard, générez un UUID de 128 bits une fois et utilisez-le pour les services et les caractéristiques de l’appareil.
Un périphérique qui ne publie que des UUID personnalisés reste tout à fait valable – il lui faut simplement une application cliente personnalisée qui connaît ces UUID.
Note
Les valeurs BLE sont little-endian partout – la spécification GATT, chaque caractéristique standard, chaque champ d’annonce. Les entiers multi-octets circulent sur le fil octet de poids faible en premier. Le préfixe < dans les chaînes de format de struct est ce qu’il vous faut pour l’encodage/décodage ("<h", "<H", "<I", …) ; utiliser l’ordre des octets natif par défaut sur un MCU little-endian fonctionne par hasard pour l’instant, mais expliciter < est l’habitude la plus sûre.
11.9.7. La radio derrière tout cela¶
La radio est active dès l’instant où la première coroutine aioble y touche. Tant qu’un central n’est pas connecté, le périphérique passe son temps à alterner entre de brèves rafales d’annonce et la veille ; après une connexion, il suit l’intervalle de connexion négocié. Le périphérique paie un léger coût d’énergie par annonce, de sorte que le choix de interval_us sur aioble.advertise() est le réglage le plus direct dont dispose un périphérique pour arbitrer entre la latence de découverte et l’autonomie de la batterie.