8.14. AsyncCSI

Typický skript pro OpenMV Cam končí konstrukcí while True: img = csi0.snapshot() – blokující smyčkou snímání, která vůbec nepotřebuje asyncio. Ve chvíli, kdy aplikace musí dělat něco jiného souběžně se snímáním – naslouchat tlačítku, posílat data protistraně, spouštět úlohu na pozadí – začne blokující volání překážet. Zatímco snapshot čeká na další snímek, smyčka událostí neběží, takže každá další korutina v programu je zmrazená, dokud snímek nedorazí.

Tato stránka vytváří kolem CSI malý wrapper, který z snapshot udělá korutinu vhodnou pro await. Výsledkem je přímá náhrada, která umožňuje, aby smyčka snímání koexistovala se zbytkem programu využívajícího asyncio.

8.14.1. Stavební prvky

Většinu práce odvede jediná část API třídy CSI – metoda snapshot() ve svém neblokujícím režimu. Volání snapshot(blocking=False) buď vrátí další snímek (pokud je připravený), nebo None (pokud není). První neblokující volání také spustí DMA snímání kamery, pokud ještě neběželo, takže wrapper nemusí dělat nic zvláštního pro inicializaci.

Druhou částí je asyncio.sleep_ms(). Wrapper ve smyčce dotazuje neblokující snímky a mezi kontrolami předává řízení smyčce událostí pomocí await asyncio.sleep_ms(0), takže každá další připravená korutina dostane šanci běžet před dalším dotazem.

8.14.2. Wrapper

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Konstruktor obaluje instanci CSI. __getattr__ přesměrovává každý atribut, který wrapper sám nedefinuje – reset, pixformat, framesize, všechny ovládací prvky senzoru – na podkladovou CSI, takže wrapper vypadá identicky jako neobalený objekt s výjimkou té jedné metody, na které záleží.

async def snapshot je nový prvek. Volá snapshot(blocking=False); pokud volání vrátí obraz, korutina jej vrátí. Jinak předá řízení zpět smyčce událostí pomocí await asyncio.sleep_ms(0), aby ostatní korutiny dostaly šanci běžet, poté se vrátí do smyčky a zkusí to znovu. První iterace spustí DMA; následující iterace přebírají snímky, jakmile jsou k dispozici.

8.14.3. Smyčka snímání ve společnosti

S nasazeným wrapperem zapadne smyčka snímání do většího programu využívajícího asyncio stejně jako jakákoli jiná korutina. Příklad níže spouští souběžně tři korutiny: smyčku snímání, blikání LED a heartbeat, který jednou za sekundu vypíše hello

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

Všechny tři korutiny postupují na téže smyčce událostí. Zatímco capture_loop předává řízení mezi neblokujícími dotazy na snímek, blinker přepíná LED a hello vypisuje. Zatímco blinker a hello spí, capture_loop dotazuje kameru. Interval dotazování je krátký – jediný tik smyčky událostí – takže přidává zanedbatelnou latenci k okamžiku, kdy aplikace uvidí nový snímek.

Smyčka snímání neblokuje smyčku událostí. Přidání další souběžné práce – například klienta UART – je jen dalším voláním create_task() uvnitř main.

Poznámka

Nastavení framebuffers má v této podobě stále význam. V režimu s jedním bufferem vrací snapshot(blocking=False) None, dokud není zachycen další snímek; dvojité nebo trojité bufferování to vyhlazuje, takže wrapper obvykle najde připravený snímek v bufferu hned při prvním dotazu po zpracování předchozího snímku.