11.10. Agir como central

O outro lado da conversa é o central – o dispositivo que procura periféricos a anunciar, escolhe um com quem falar, abre uma ligação, percorre a base de dados GATT remota e lê ou subscreve características nela. Uma câmara que recolhe leituras de um sensor vestível, ouve um beacon ou fala com um microcontrolador complementar é um central.

O padrão central em aioble passa por quatro fases: scan, ligar, descobrir, operar.

11.10.1. Scanning

aioble.scan() devolve um gestor de contexto assíncrono que funciona também como iterador assíncrono sobre dispositivos descobertos. A utilização típica é fazer scan até aparecer um dispositivo de interesse e depois sair da iteração:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 limita o tempo de duração do scan; duration_ms=0 faz scan indefinidamente (até o gestor de contexto sair). active=True pede respostas de scan, o que duplica o tamanho da carga por dispositivo ao custo de uma pequena transmissão adicional de ambos os lados. Os restantes argumentos de palavra-chave interval_us / window_us ajustam o ciclo de trabalho rádio do próprio scanner e raramente são alterados relativamente aos valores predefinidos.

Cada aioble.ScanResult expõe o endereço do dispositivo, o último RSSI, os bytes brutos de publicidade e resposta de scan, e auxiliares que analisam os campos padrão:

  • result.device – um aioble.Device pronto para invocar connect().

  • result.rssi – indicador de intensidade do sinal recebido em dBm, útil para lógica de «escolher o mais próximo».

  • result.name() – a cadeia de caracteres do nome local, ou None se não for anunciado.

  • result.services() – um gerador de bluetooth.UUID para cada serviço que o dispositivo anuncia.

  • result.manufacturer() – um gerador de tuplos (company_id, data) para os campos específicos do fabricante.

  • result.connectable – se o anúncio mais recente era do tipo conectável.

O mesmo ScanResult é gerado novamente quando chegam novos dados de publicidade para o mesmo dispositivo, pelo que um ouvinte passivo que apenas pretende monitorizar dispositivos indefinidamente pode executar o iterador assíncrono para sempre e despachar cada evento.

11.10.2. Ligar

Assim que um dispositivo alvo é identificado, abrir uma ligação é um único await

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect() aceita timeout_ms (quanto tempo esperar pela ligação; predefinição 10 s), e min_conn_interval_us / max_conn_interval_us (o intervalo de ligação solicitado de Ligações).

11.10.2.1. Reconectar a um par conhecido sem fazer scan

Quando já existe uma ligação com um par, o endereço já é conhecido e outra ronda de scan-e-escolha é tempo de rádio desperdiçado. Construa um aioble.Device diretamente com o endereço guardado e salte diretamente para connect()

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

O primeiro argumento é um dos aioble.ADDR_PUBLIC (endereço de fábrica do controlador) ou aioble.ADDR_RANDOM (endereço privado estático ou resolvível gerado); o segundo é um valor bytes de seis bytes ou uma cadeia hexadecimal separada por dois pontos. Os atributos addr_type e addr de qualquer Device (p. ex., um obtido anteriormente de um ScanResult) podem ser persistidos e passados aqui.

O aioble.DeviceConnection devolvido é aquilo em que assenta o resto do trabalho do central. async with garante que a ligação é fechada quando o bloco termina – em caso de sucesso, cancelamento, ou qualquer exceção incluindo aioble.DeviceDisconnectedError quando o par se desliga.

Se o central precisar de um valor de característica maior do que o MTU predefinido de 23 bytes permite, este é o local para o negociar:

await connection.exchange_mtu(512)

(exchange_mtu() devolve o MTU efetivamente negociado, que é o mínimo entre o valor solicitado e o que o par suporta.)

11.10.3. Descoberta

A descoberta percorre a base de dados GATT remota para encontrar serviços e características pelos seus UUIDs. Existem dois tipos: dirigida (conhece o UUID e quer uma coisa específica) e exaustiva (quer tudo).

Dirigida – o caso comum:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service() e aioble.ClientService.characteristic() aceitam cada um um bluetooth.UUID e devolvem o objeto correspondente (ou None). Ambos têm uma palavra-chave timeout_ms por descoberta com valor predefinido de 2 s.

Exaustiva:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

É isto que fazem as aplicações genéricas de exploração Bluetooth – útil para desenvolvimento, menos para código de produção que já sabe quais UUIDs espera.

11.10.3.1. Inspecionar o que uma característica suporta

A descoberta devolve o bitmask de propriedades GATT que o par anunciou para cada característica em properties. Os bits são os definidos pelo GATT – leitura (0x02), escrita sem resposta (0x04), escrita (0x08), notificar (0x10), indicar (0x20), entre outros. Inspecionar o bitmask antes de emitir uma operação permite que um cliente genérico se adapte a características cujas capacidades não conhece antecipadamente:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

O código de produção que já conhece o perfil GATT do par habitualmente não precisa disto – os UUIDs foram documentados antecipadamente. Clientes genéricos/exploratórios (uma página de definições que percorre um dispositivo desconhecido, um anfitrião de plugins) apoiam-se nisto.

11.10.4. Operar

Assim que o central detém uma ClientCharacteristic, cada operação GATT é uma chamada de corrotina:

  • Leitura. Emitir uma leitura GATT e obter o valor de volta:

    value = await char.read()
    print("value:", value)
    

    Leituras longas (valores maiores que o MTU) são tratadas de forma transparente.

  • Escrita. Enviar um novo valor para o servidor:

    await char.write(b"\\x01")
    

    response=True aguarda uma resposta de escrita e lança aioble.GattError se o servidor rejeitar a escrita. response=False é escrita sem resposta: disparar e esquecer. response=None (o valor predefinido) escolhe automaticamente com base no que o par anunciou.

  • Subscrever. Ativar notificações ou indicações escrevendo no CCCD da característica:

    await char.subscribe(notify=True)
    

    Após este retorno, o central pode aguardar por envios recebidos.

  • Notificado / indicado. Aguardar pelo próximo envio do servidor:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None (o valor predefinido) aguarda indefinidamente; passe um inteiro em milissegundos para desistir após algum tempo.

Juntar as quatro partes dá o programa central canónico «ligar, subscrever, transmitir»:

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

O conjunto tem cerca de uma dúzia de linhas e cobre o fluxo desde «sem Bluetooth ativo» até «transmissão de dados em tempo real». O iterador de scan corresponde ao padrão broadcaster/observer, connect abre a ligação GAP, service / characteristic percorre a árvore GATT, subscribe escreve no CCCD, e notified aguarda por envios.

11.10.5. Desligamentos e reconexão

Qualquer coisa que aconteça à ligação rádio surge na corrotina que estava a aguardar nela. aioble.DeviceDisconnectedError é o sinal de que o par se desligou ou o tempo de supervisão expirou; a exceção termina qualquer chamada read(), write(), ou notified() que estivesse em curso, e qualquer bloco async with connection termina de forma limpa.

Um central que deve reconectar em caso de perda envolve o trabalho no seu próprio ciclo externo:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. Delimitar uma sequência com timeout()

Quando várias operações GATT seguidas devem completar dentro de um único orçamento de tempo – não cada uma individualmente com o seu próprio timeout_ms – use aioble.DeviceConnection.timeout() para as envolver. O gestor de contexto devolvido cancela o seu corpo se o orçamento expirar (lançando asyncio.TimeoutError) ou se o par se desligar (lançando aioble.DeviceDisconnectedError):

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

Esta é a alternativa mais limpa a envolver cada chamada individualmente em asyncio.wait_for() e evita sucessos espúrios onde cada chamada cumpre o seu próprio prazo mas a sequência como um todo ultrapassa o limite. Passar timeout_ms=None para timeout() desativa o prazo e deixa apenas o guarda de desligamento ativo.