11.10. Agir como central¶
O outro lado da conversa é o central – o dispositivo que procura periféricos a anunciar, escolhe um com quem falar, abre uma ligação, percorre a base de dados GATT remota e lê ou subscreve características nela. Uma câmara que recolhe leituras de um sensor vestível, ouve um beacon ou fala com um microcontrolador complementar é um central.
O padrão central em aioble passa por quatro fases: scan, ligar, descobrir, operar.
11.10.1. Scanning¶
aioble.scan() devolve um gestor de contexto assíncrono que funciona também como iterador assíncrono sobre dispositivos descobertos. A utilização típica é fazer scan até aparecer um dispositivo de interesse e depois sair da iteração:
import aioble
import asyncio
import bluetooth
HR_SERVICE = bluetooth.UUID(0x180D)
async def find_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
return result.device
return None
duration_ms=5000 limita o tempo de duração do scan; duration_ms=0 faz scan indefinidamente (até o gestor de contexto sair). active=True pede respostas de scan, o que duplica o tamanho da carga por dispositivo ao custo de uma pequena transmissão adicional de ambos os lados. Os restantes argumentos de palavra-chave interval_us / window_us ajustam o ciclo de trabalho rádio do próprio scanner e raramente são alterados relativamente aos valores predefinidos.
Cada aioble.ScanResult expõe o endereço do dispositivo, o último RSSI, os bytes brutos de publicidade e resposta de scan, e auxiliares que analisam os campos padrão:
result.device– umaioble.Devicepronto para invocarconnect().result.rssi– indicador de intensidade do sinal recebido em dBm, útil para lógica de «escolher o mais próximo».result.name()– a cadeia de caracteres do nome local, ouNonese não for anunciado.result.services()– um gerador debluetooth.UUIDpara cada serviço que o dispositivo anuncia.result.manufacturer()– um gerador de tuplos(company_id, data)para os campos específicos do fabricante.result.connectable– se o anúncio mais recente era do tipo conectável.
O mesmo ScanResult é gerado novamente quando chegam novos dados de publicidade para o mesmo dispositivo, pelo que um ouvinte passivo que apenas pretende monitorizar dispositivos indefinidamente pode executar o iterador assíncrono para sempre e despachar cada evento.
11.10.2. Ligar¶
Assim que um dispositivo alvo é identificado, abrir uma ligação é um único await
async def talk_to(device):
connection = await device.connect() # 10 s timeout
async with connection:
# ... do GATT work ...
pass
aioble.Device.connect() aceita timeout_ms (quanto tempo esperar pela ligação; predefinição 10 s), e min_conn_interval_us / max_conn_interval_us (o intervalo de ligação solicitado de Ligações).
11.10.2.1. Reconectar a um par conhecido sem fazer scan¶
Quando já existe uma ligação com um par, o endereço já é conhecido e outra ronda de scan-e-escolha é tempo de rádio desperdiçado. Construa um aioble.Device diretamente com o endereço guardado e salte diretamente para connect()
import aioble
KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
"aa:bb:cc:dd:ee:ff")
async def talk_to_kitchen():
async with await KITCHEN_CAM.connect() as connection:
# ... GATT work ...
pass
O primeiro argumento é um dos aioble.ADDR_PUBLIC (endereço de fábrica do controlador) ou aioble.ADDR_RANDOM (endereço privado estático ou resolvível gerado); o segundo é um valor bytes de seis bytes ou uma cadeia hexadecimal separada por dois pontos. Os atributos addr_type e addr de qualquer Device (p. ex., um obtido anteriormente de um ScanResult) podem ser persistidos e passados aqui.
O aioble.DeviceConnection devolvido é aquilo em que assenta o resto do trabalho do central. async with garante que a ligação é fechada quando o bloco termina – em caso de sucesso, cancelamento, ou qualquer exceção incluindo aioble.DeviceDisconnectedError quando o par se desliga.
Se o central precisar de um valor de característica maior do que o MTU predefinido de 23 bytes permite, este é o local para o negociar:
await connection.exchange_mtu(512)
(exchange_mtu() devolve o MTU efetivamente negociado, que é o mínimo entre o valor solicitado e o que o par suporta.)
11.10.3. Descoberta¶
A descoberta percorre a base de dados GATT remota para encontrar serviços e características pelos seus UUIDs. Existem dois tipos: dirigida (conhece o UUID e quer uma coisa específica) e exaustiva (quer tudo).
Dirigida – o caso comum:
service = await connection.service(HR_SERVICE)
if service is None:
return # no such service
char = await service.characteristic(HR_MEASUREMENT)
if char is None:
return # no such characteristic
aioble.DeviceConnection.service() e aioble.ClientService.characteristic() aceitam cada um um bluetooth.UUID e devolvem o objeto correspondente (ou None). Ambos têm uma palavra-chave timeout_ms por descoberta com valor predefinido de 2 s.
Exaustiva:
async for service in connection.services():
print("service:", service.uuid)
async for char in service.characteristics():
print(" characteristic:", char.uuid, "properties:", hex(char.properties))
É isto que fazem as aplicações genéricas de exploração Bluetooth – útil para desenvolvimento, menos para código de produção que já sabe quais UUIDs espera.
11.10.3.1. Inspecionar o que uma característica suporta¶
A descoberta devolve o bitmask de propriedades GATT que o par anunciou para cada característica em properties. Os bits são os definidos pelo GATT – leitura (0x02), escrita sem resposta (0x04), escrita (0x08), notificar (0x10), indicar (0x20), entre outros. Inspecionar o bitmask antes de emitir uma operação permite que um cliente genérico se adapte a características cujas capacidades não conhece antecipadamente:
_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)
char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
await char.subscribe(notify=True)
value = await char.notified()
elif char.properties & _PROP_READ:
value = await char.read()
else:
value = None # nothing the client can do
O código de produção que já conhece o perfil GATT do par habitualmente não precisa disto – os UUIDs foram documentados antecipadamente. Clientes genéricos/exploratórios (uma página de definições que percorre um dispositivo desconhecido, um anfitrião de plugins) apoiam-se nisto.
11.10.4. Operar¶
Assim que o central detém uma ClientCharacteristic, cada operação GATT é uma chamada de corrotina:
Leitura. Emitir uma leitura GATT e obter o valor de volta:
value = await char.read() print("value:", value)
Leituras longas (valores maiores que o MTU) são tratadas de forma transparente.
Escrita. Enviar um novo valor para o servidor:
await char.write(b"\\x01")response=Trueaguarda uma resposta de escrita e lançaaioble.GattErrorse o servidor rejeitar a escrita.response=Falseé escrita sem resposta: disparar e esquecer.response=None(o valor predefinido) escolhe automaticamente com base no que o par anunciou.Subscrever. Ativar notificações ou indicações escrevendo no CCCD da característica:
await char.subscribe(notify=True)Após este retorno, o central pode aguardar por envios recebidos.
Notificado / indicado. Aguardar pelo próximo envio do servidor:
while True: data = await char.notified() print("push:", data)
timeout_ms=None(o valor predefinido) aguarda indefinidamente; passe um inteiro em milissegundos para desistir após algum tempo.
Juntar as quatro partes dá o programa central canónico «ligar, subscrever, transmitir»:
async def stream_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
device = result.device
break
else:
return
async with await device.connect() as connection:
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
while connection.is_connected():
data = await char.notified()
print("hr push:", data)
asyncio.run(stream_heart_rate())
O conjunto tem cerca de uma dúzia de linhas e cobre o fluxo desde «sem Bluetooth ativo» até «transmissão de dados em tempo real». O iterador de scan corresponde ao padrão broadcaster/observer, connect abre a ligação GAP, service / characteristic percorre a árvore GATT, subscribe escreve no CCCD, e notified aguarda por envios.
11.10.5. Desligamentos e reconexão¶
Qualquer coisa que aconteça à ligação rádio surge na corrotina que estava a aguardar nela. aioble.DeviceDisconnectedError é o sinal de que o par se desligou ou o tempo de supervisão expirou; a exceção termina qualquer chamada read(), write(), ou notified() que estivesse em curso, e qualquer bloco async with connection termina de forma limpa.
Um central que deve reconectar em caso de perda envolve o trabalho no seu próprio ciclo externo:
async def keep_streaming():
while True:
try:
await stream_heart_rate()
except aioble.DeviceDisconnectedError:
print("disconnected, retrying...")
await asyncio.sleep(2)
11.10.5.1. Delimitar uma sequência com timeout()¶
Quando várias operações GATT seguidas devem completar dentro de um único orçamento de tempo – não cada uma individualmente com o seu próprio timeout_ms – use aioble.DeviceConnection.timeout() para as envolver. O gestor de contexto devolvido cancela o seu corpo se o orçamento expirar (lançando asyncio.TimeoutError) ou se o par se desligar (lançando aioble.DeviceDisconnectedError):
async with await device.connect() as connection:
try:
with connection.timeout(2000): # 2 s for the whole block
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
except asyncio.TimeoutError:
print("discovery + subscribe took too long")
Esta é a alternativa mais limpa a envolver cada chamada individualmente em asyncio.wait_for() e evita sucessos espúrios onde cada chamada cumpre o seu próprio prazo mas a sequência como um todo ultrapassa o limite. Passar timeout_ms=None para timeout() desativa o prazo e deixa apenas o guarda de desligamento ativo.