8.14. AsyncCSI¶
Um script típico da OpenMV Cam termina com while True: img = csi0.snapshot() – um ciclo de captura bloqueante que não precisa de asyncio de todo. No momento em que a aplicação tem de fazer outra coisa a par das capturas – escutar um botão, enviar dados a um par, executar uma tarefa em segundo plano – a chamada bloqueante atrapalha. Enquanto snapshot aguarda pelo próximo fotograma, o ciclo de eventos não está a executar, pelo que qualquer outra corrotina no programa fica suspensa até o fotograma chegar.
Esta página constrói um pequeno invólucro em torno de CSI que converte snapshot numa corrotina compatível com await. O resultado é um substituto direto que permite que um ciclo de captura coexista com o resto de um programa asyncio.
8.14.1. As peças¶
Uma parte da API CSI faz a maior parte do trabalho – snapshot() no seu modo não bloqueante. Chamar snapshot(blocking=False) devolve o próximo fotograma (se estiver pronto) ou None (se não estiver). A primeira chamada não bloqueante também inicia a captura DMA da câmara se já não estivesse a correr, pelo que o invólucro não precisa de fazer nada de especial para arrancar.
A outra peça é asyncio.sleep_ms(). O invólucro faz polling de snapshots não bloqueantes num ciclo, cedendo ao ciclo de eventos com await asyncio.sleep_ms(0) entre verificações para que cada outra corrotina pronta tenha a oportunidade de executar antes do próximo polling.
8.14.2. O invólucro¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
O construtor envolve uma instância de CSI. __getattr__ reencaminha cada atributo que o invólucro não define por si mesmo – reset, pixformat, framesize, todos os controlos do sensor – para o CSI subjacente, de modo que o invólucro parece idêntico ao objeto não envolvido, exceto pelo único método que importa.
async def snapshot é a nova peça. Chama snapshot(blocking=False); se a chamada devolver uma imagem, a corrotina devolve-a. Caso contrário, cede ao ciclo de eventos com await asyncio.sleep_ms(0) para que outras corrotinas tenham a oportunidade de executar, depois volta ao ciclo e tenta novamente. A primeira iteração inicia o DMA; as iterações seguintes recolhem fotogramas à medida que ficam disponíveis.
8.14.3. Um ciclo de captura com companhia¶
Com o invólucro em vigor, um ciclo de captura encaixa num programa asyncio maior da mesma forma que qualquer outra corrotina. O exemplo abaixo executa três corrotinas em simultâneo: o ciclo de captura, um intermitente LED, e um heartbeat que imprime hello uma vez por segundo:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
As três corrotinas progridem no mesmo ciclo de eventos. Enquanto capture_loop cede entre os pollings de snapshot não bloqueantes, blinker alterna o LED e hello imprime. Enquanto blinker e hello estão a dormir, capture_loop faz polling à câmara. O intervalo de polling é curto – um único tick do ciclo de eventos – pelo que adiciona latência negligenciável ao momento em que a aplicação vê um novo fotograma.
O ciclo de captura não bloqueia o ciclo de eventos. Adicionar mais trabalho em simultâneo – um cliente UART, por exemplo – é apenas mais uma chamada a create_task() dentro de main.
Nota
A definição de framebuffers ainda é relevante nesta forma. O modo de buffer único faz com que snapshot(blocking=False) devolva None até que o próximo fotograma seja capturado; o buffer duplo ou triplo suaviza isso para que o invólucro normalmente encontre um fotograma em buffer à espera na primeira verificação depois de o fotograma anterior ter sido processado.