8.14. AsyncCSI¶
Um script típico da OpenMV Cam termina em while True: img = csi0.snapshot() – um laço de snapshot bloqueante que não precisa de asyncio nenhum. No momento em que a aplicação precisa fazer outra coisa paralelamente às capturas – escutar um botão, enviar dados para um par, executar uma tarefa em segundo plano – a chamada bloqueante atrapalha. Enquanto snapshot aguarda o próximo quadro, o laço de eventos não está em execução, de modo que todas as outras corrotinas do programa ficam congeladas até o quadro chegar.
Esta página constrói um pequeno wrapper em torno de CSI que transforma snapshot em uma corrotina compatível com await. O resultado é um substituto direto que permite que um laço de captura coexista com o restante de um programa asyncio.
8.14.1. As peças¶
Uma parte da API de CSI faz a maior parte do trabalho – snapshot() em seu modo não bloqueante. Chamar snapshot(blocking=False) retorna o próximo quadro (se houver um pronto) ou None (caso contrário). A primeira chamada não bloqueante também inicia a captura por DMA da câmera, caso ainda não esteja em execução, de modo que o wrapper não precisa fazer nada de especial para iniciar.
A outra parte é asyncio.sleep_ms(). O wrapper consulta snapshots não bloqueantes em um laço, cedendo o controle ao laço de eventos com await asyncio.sleep_ms(0) entre as verificações, para que todas as outras corrotinas prontas tenham a chance de executar antes da próxima consulta.
8.14.2. O wrapper¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
O construtor envolve uma instância de CSI. __getattr__ encaminha cada atributo que o próprio wrapper não define – reset, pixformat, framesize, todos os controles do sensor – para o CSI subjacente, de modo que o wrapper parece idêntico ao objeto original, exceto pelo único método que importa.
async def snapshot é a parte nova. Ele chama snapshot(blocking=False); se a chamada retornar uma imagem, a corrotina a retorna. Caso contrário, ela cede o controle de volta ao laço de eventos com await asyncio.sleep_ms(0) para que outras corrotinas tenham a chance de executar e, em seguida, repete o laço e tenta novamente. A primeira iteração inicia o DMA; as iterações seguintes captam os quadros à medida que ficam disponíveis.
8.14.3. Um laço de snapshot acompanhado¶
Com o wrapper no lugar, um laço de snapshot se encaixa em um programa asyncio maior da mesma forma que qualquer outra corrotina. O exemplo abaixo executa três corrotinas simultaneamente: o laço de captura, um piscador de LED e um heartbeat que imprime hello uma vez por segundo:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
Todas as três corrotinas avançam no mesmo laço de eventos. Enquanto capture_loop cede o controle entre as consultas de snapshot não bloqueantes, blinker alterna o LED e hello imprime. Enquanto blinker e hello estão dormindo, capture_loop consulta a câmera. O intervalo de consulta é curto – um único tick do laço de eventos – de modo que adiciona uma latência desprezível ao momento em que a aplicação vê um novo quadro.
O laço de captura não bloqueia o laço de eventos. Adicionar mais trabalho simultâneo – um cliente UART, por exemplo – é apenas mais uma chamada de create_task() dentro de main.
Nota
A configuração de framebuffers ainda importa neste formato. O modo de buffer único faz com que snapshot(blocking=False) retorne None até o próximo quadro ser capturado; o buffer duplo ou triplo suaviza isso, de modo que o wrapper geralmente encontra um quadro em buffer aguardando na primeira consulta após o quadro anterior ter sido processado.