11.9. Atuando como um periférico

O padrão BLE mais comum do lado da câmera é atuar como um periférico – publicar um pequeno banco de dados GATT, anunciar sua existência, aceitar uma conexão de um celular ou de um dispositivo complementar e transmitir valores para quem estiver do outro lado.

11.9.1. Construindo o banco de dados GATT

A primeira coisa que um periférico faz na inicialização – antes mesmo de ligar o rádio – é construir o banco de dados que pretende expor, criar objetos para cada serviço e característica e, em seguida, registrar tudo:

import aioble
import bluetooth

ENV_SERVICE = bluetooth.UUID(0x181A)              # Environmental Sensing
TEMP_UUID = bluetooth.UUID(0x2A6E)                # Temperature
HUMID_UUID = bluetooth.UUID(0x2A6F)               # Humidity

env = aioble.Service(ENV_SERVICE)
temp_char = aioble.Characteristic(
    env, TEMP_UUID,
    read=True, notify=True, initial=b"\\x00\\x00",
)
humid_char = aioble.Characteristic(
    env, HUMID_UUID,
    read=True, notify=True, initial=b"\\x00\\x00",
)

aioble.register_services(env)

Cada aioble.Characteristic é vinculada ao seu serviço simplesmente construindo-a com o serviço como primeiro argumento. Os argumentos nomeados booleanos (read, write, write_no_response, notify, indicate) selecionam quais operações GATT o cliente poderá realizar; passar False (o padrão) significa que o bit da propriedade não é definido.

aioble.register_services() confirma a árvore montada no servidor GATT. Ela deve ser chamada uma vez, antes que qualquer aioble.advertise() comece; chamá-la novamente substitui o banco de dados anterior.

11.9.2. Anunciando

Uma vez que o banco de dados esteja pronto, anunciar é uma única chamada de corrotina que aguarda por uma conexão:

async def serve_one():
    connection = await aioble.advertise(
        interval_us=250000,
        name="openmv-env",
        services=[ENV_SERVICE],
        appearance=0x0540,           # Generic Sensor
    )

Os argumentos nomeados mapeiam diretamente para os campos da carga útil de anúncio. name é o campo de nome local; services é a lista de UUIDs de serviços que o dispositivo hospeda (um scanner no lado do celular pode filtrar por eles); appearance é uma dica, a partir dos valores padrão de aparência de 16 bits, que permite ao central exibir um ícone sensato. Dados específicos do fabricante entram por meio de manufacturer=(company_id, data_bytes).

Um punhado de argumentos nomeados menos comuns cobre o restante do espaço de flags de anúncio:

  • connectable=False – modo somente de transmissão (nenhuma conexão é jamais aceita). A escolha certa para cargas úteis no estilo beacon.

  • limited_disc=True – usa a flag limited discoverable em vez de general discoverable; alguns sistemas operacionais tratam as duas de forma diferente em sua interface de pareamento.

  • adv_data / resp_data – bytes brutos, caso a aplicação precise de controle total sobre o layout.

  • timeout_ms – desiste após um tempo fixo. O padrão é anunciar para sempre.

Quando um central se conecta, aioble.advertise() retorna a aioble.DeviceConnection resultante. O periférico para de anunciar neste momento.

11.9.3. Atendendo um cliente

O laço principal de um periférico normalmente se parece com isto:

async def serve():
    while True:
        connection = await aioble.advertise(
            interval_us=250000,
            name="openmv-env",
            services=[ENV_SERVICE],
        )
        print("connected:", connection.device.addr_hex())
        async with connection:
            await connection.disconnected()
        print("disconnected; advertising again")

asyncio.run(serve())

async with connection torna a limpeza de desconexão automática. disconnected() é uma corrotina que se suspende até que um dos lados encerre a conexão – uma maneira limpa de manter o periférico atendendo até que o central vá embora, e então voltar ao laço para anunciar na próxima rodada.

11.9.4. Atualizando uma característica

O periférico atualiza o banco de dados GATT local com aioble.Characteristic.write()

temp_char.write(b"\\x9a\\x09")              # 24.58 deg C as sint16, 0.01 units

Isso altera o valor que a próxima read de qualquer cliente retornaria. Por si só, isso não envia o novo valor – um cliente inscrito não verá nada até que ele faça uma consulta (poll) ou que o periférico envie uma notificação explícita.

O lado de envio é um único argumento nomeado na mesma chamada:

temp_char.write(temp_bytes, send_update=True)

send_update=True notifica (ou indica) cada cliente que se inscreveu nessa característica. A maioria do código no estilo de sensor reside em uma tarefa por conexão que faz um laço lendo o sensor e escrevendo o valor com send_update=True a cada segundo, mais ou menos:

async def stream_temperature(connection):
    while connection.is_connected():
        temp_char.write(encode_temperature(read_sensor()), send_update=True)
        await asyncio.sleep(1)

async def serve():
    while True:
        connection = await aioble.advertise(
            interval_us=250000,
            name="openmv-env",
            services=[ENV_SERVICE],
        )
        async with connection:
            asyncio.create_task(stream_temperature(connection))
            await connection.disconnected()

Se você preferir direcionar uma notificação a um cliente específico em vez de todo o conjunto de inscritos (digamos, uma resposta privada da conexão ao comando daquele cliente), aioble.Characteristic.notify() e indicate() aceitam um argumento DeviceConnection e uma carga útil opcional.

11.9.5. Recebendo escritas

A outra direção – um cliente escrevendo em uma característica – torna-se disponível quando a característica é construída com write=True ou write_no_response=True. O periférico aguarda a próxima escrita com aioble.Characteristic.written()

cmd_char = aioble.Characteristic(env, CMD_UUID, write=True, capture=True)

async def handle_commands():
    while True:
        connection, data = await cmd_char.written()
        print("command from", connection.device.addr_hex(), "=", data)

Sem capture=True, written() retorna apenas a conexão que escreveu; o novo valor reside no buffer de apoio da característica e a aplicação o obtém com read(). Se uma segunda escrita chegar antes que a aplicação tenha lido a primeira, o segundo valor sobrescreve o primeiro no buffer e o valor original é perdido – written() ainda acorda a aplicação, mas apenas uma vez por “há algo novo”, e não uma vez por escrita.

O argumento nomeado capture=True corrige isso. Cada escrita recebida é adicionada a uma fila de escopo de módulo, e written() retorna uma tupla (connection, data) para cada escrita individual – o laço da aplicação vê cada uma exatamente uma vez, na ordem de chegada. Duas consequências práticas:

  • A fila é limitada e é compartilhada entre todas as características com captura habilitada no dispositivo. Pequenas rajadas de escritas consecutivas são toleradas; um excesso sustentado (escritas chegando mais rápido do que a aplicação consegue esvaziá-las) descarta silenciosamente as entradas mais antigas enfileiradas, e um tráfego em rajadas em uma característica pode despejar entradas pendentes de outra.

  • Escolha capture=True para escritas no estilo de comando, em que cada valor importa. Deixe-o desativado para características no estilo de estado, em que apenas o valor mais recente é de interesse.

Se uma leitura do cliente deve ser respondida por código executado sob demanda em vez de um valor estático, sobrescreva on_read(). O método é chamado de forma síncrona quando uma leitura chega; retorne 0 para permitir a leitura (o valor atual de write() será enviado), ou um código de erro ATT diferente de zero para rejeitá-la:

import time

_ATT_ERR_READ_NOT_PERMITTED = const(0x02)
_MIN_READ_INTERVAL_MS = const(1000)            # at most once per second

class TempChar(aioble.Characteristic):
    _last_read_ms = 0

    def on_read(self, connection):
        now = time.ticks_ms()
        if time.ticks_diff(now, self._last_read_ms) < _MIN_READ_INTERVAL_MS:
            return _ATT_ERR_READ_NOT_PERMITTED
        self._last_read_ms = now
        self.write(encode_temperature(read_sensor()))
        return 0

temp_char = TempChar(env, TEMP_UUID, read=True)

O callback amostra o sensor e atualiza o valor da característica logo antes de a pilha GATT atender a leitura, de modo que o cliente sempre veja dados atualizados. O limite de taxa impede que um cliente sobrecarregue o sensor mais rápido do que ele pode ser amostrado – qualquer leitura dentro do período de espera de um segundo é rejeitada como um erro ATT Read Not Permitted em vez de um valor desatualizado.

11.9.5.1. Buffers de apoio maiores – BufferedCharacteristic

O buffer de apoio de uma Characteristic comum tem 20 bytes de largura – o limite prático no MTU padrão de 23 bytes. Um cliente que escreve mais do que isso em uma característica comum tem seu valor truncado. Para valores recebidos maiores ou para enfileirar escritas consecutivas que o laço da aplicação alcançará mais tarde, declare a característica como BufferedCharacteristic e escolha o tamanho do buffer logo de início:

blob = aioble.BufferedCharacteristic(
    service, BLOB_UUID,
    max_len=512, append=True,
    write=True, capture=True,
)

async def receive_blob():
    while True:
        connection, chunk = await blob.written()
        handle_chunk(connection, chunk)

Dois ajustes a distinguem de uma Characteristic simples:

  • max_len é o tamanho do buffer de apoio em bytes. Escolha-o para corresponder à maior escrita única que se espera que o cliente faça (após a negociação de MTU).

  • append=True faz com que escritas sequenciais acrescentem ao buffer em vez de sobrescrevê-lo – útil para receber um valor que chega ao longo de várias escritas (blocos de atualização de firmware, linhas de log). Com append=False o buffer se comporta como uma característica normal, apenas mais larga.

Todas as outras flags do construtor (read, write, notify, indicate, capture, initial) são repassadas inalteradas para a característica subjacente.

11.9.6. Serviços padrão e os UUIDs atribuídos pelo SIG

Ater-se aos UUIDs de números atribuídos (0x180F para o Battery Service, 0x181A para Environmental Sensing, 0x180D para Heart Rate, e assim por diante) significa que o menu Bluetooth genérico de um celular ou qualquer aplicativo de scanner de terceiros pode identificar a finalidade do dispositivo sem qualquer código de cliente personalizado. O layout de bytes dentro de cada característica padrão também é fixado pela especificação – Battery Level (0x2A19) é um único byte de 0..100; Temperature (0x2A6E) é sint16 little-endian em unidades de 0,01 graus C. Para aplicações que não se encaixam em um serviço padrão, gere um UUID de 128 bits uma vez e use-o em todos os serviços e características do dispositivo.

Um periférico que publica apenas UUIDs personalizados ainda está bem – ele só precisa de um aplicativo cliente personalizado que conheça esses UUIDs.

Nota

Os valores BLE são little-endian em todos os lugares – a especificação GATT, cada característica padrão, cada campo de anúncio. Inteiros de múltiplos bytes vão para o fio com o byte menos significativo primeiro. O prefixo < nas strings de formato de struct é o que você quer para codificar/decodificar ("<h", "<H", "<I", …); usar a ordem de bytes nativa padrão em um MCU little-endian acaba funcionando por enquanto, mas escrever explicitamente < é o hábito seguro.

11.9.7. O rádio por trás de tudo

O rádio é ligado no momento em que a primeira corrotina aioble o toca. Até que um central esteja conectado, o periférico passa o tempo alternando entre breves rajadas de anúncio e suspensão; após uma conexão, ele segue o intervalo de conexão negociado. O periférico paga um pequeno custo de energia por anúncio, então a escolha de interval_us em aioble.advertise() é o ajuste mais direto que um periférico tem para equilibrar a latência de descoberta contra a vida útil da bateria.