8.14. AsyncCSI

Um script típico da OpenMV Cam termina com while True: img = csi0.snapshot() – um ciclo de captura bloqueante que não precisa de asyncio de todo. No momento em que a aplicação tem de fazer outra coisa a par das capturas – escutar um botão, enviar dados a um par, executar uma tarefa em segundo plano – a chamada bloqueante atrapalha. Enquanto snapshot aguarda pelo próximo fotograma, o ciclo de eventos não está a executar, pelo que qualquer outra corrotina no programa fica suspensa até o fotograma chegar.

Esta página constrói um pequeno invólucro em torno de CSI que converte snapshot numa corrotina compatível com await. O resultado é um substituto direto que permite que um ciclo de captura coexista com o resto de um programa asyncio.

8.14.1. As peças

Uma parte da API CSI faz a maior parte do trabalho – snapshot() no seu modo não bloqueante. Chamar snapshot(blocking=False) devolve o próximo fotograma (se estiver pronto) ou None (se não estiver). A primeira chamada não bloqueante também inicia a captura DMA da câmara se já não estivesse a correr, pelo que o invólucro não precisa de fazer nada de especial para arrancar.

A outra peça é asyncio.sleep_ms(). O invólucro faz polling de snapshots não bloqueantes num ciclo, cedendo ao ciclo de eventos com await asyncio.sleep_ms(0) entre verificações para que cada outra corrotina pronta tenha a oportunidade de executar antes do próximo polling.

8.14.2. O invólucro

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

O construtor envolve uma instância de CSI. __getattr__ reencaminha cada atributo que o invólucro não define por si mesmo – reset, pixformat, framesize, todos os controlos do sensor – para o CSI subjacente, de modo que o invólucro parece idêntico ao objeto não envolvido, exceto pelo único método que importa.

async def snapshot é a nova peça. Chama snapshot(blocking=False); se a chamada devolver uma imagem, a corrotina devolve-a. Caso contrário, cede ao ciclo de eventos com await asyncio.sleep_ms(0) para que outras corrotinas tenham a oportunidade de executar, depois volta ao ciclo e tenta novamente. A primeira iteração inicia o DMA; as iterações seguintes recolhem fotogramas à medida que ficam disponíveis.

8.14.3. Um ciclo de captura com companhia

Com o invólucro em vigor, um ciclo de captura encaixa num programa asyncio maior da mesma forma que qualquer outra corrotina. O exemplo abaixo executa três corrotinas em simultâneo: o ciclo de captura, um intermitente LED, e um heartbeat que imprime hello uma vez por segundo:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

As três corrotinas progridem no mesmo ciclo de eventos. Enquanto capture_loop cede entre os pollings de snapshot não bloqueantes, blinker alterna o LED e hello imprime. Enquanto blinker e hello estão a dormir, capture_loop faz polling à câmara. O intervalo de polling é curto – um único tick do ciclo de eventos – pelo que adiciona latência negligenciável ao momento em que a aplicação vê um novo fotograma.

O ciclo de captura não bloqueia o ciclo de eventos. Adicionar mais trabalho em simultâneo – um cliente UART, por exemplo – é apenas mais uma chamada a create_task() dentro de main.

Nota

A definição de framebuffers ainda é relevante nesta forma. O modo de buffer único faz com que snapshot(blocking=False) devolva None até que o próximo fotograma seja capturado; o buffer duplo ou triplo suaviza isso para que o invólucro normalmente encontre um fotograma em buffer à espera na primeira verificação depois de o fotograma anterior ter sido processado.