8.14. AsyncCSI

Un script tipic pentru OpenMV Cam se termină cu while True: img = csi0.snapshot() – o buclă de instantanee blocantă care nu are deloc nevoie de asyncio. Din momentul în care aplicația trebuie să facă altceva în paralel cu capturile – să asculte un buton, să trimită date către un partener, să ruleze o sarcină de fundal – apelul blocant devine un obstacol. Cât timp snapshot așteaptă următorul cadru, bucla de evenimente nu rulează, așa că orice altă corutină din program este înghețată până când cadrul sosește.

Această pagină construiește un mic strat de încapsulare în jurul CSI care transformă snapshot într-o corutină compatibilă cu await. Rezultatul este un înlocuitor direct care permite unei bucle de captură să coexiste cu restul unui program asyncio.

8.14.1. Componentele

O singură parte a API-ului CSI face cea mai mare parte a treabă – snapshot() în modul său non-blocant. Apelarea snapshot(blocking=False) fie returnează următorul cadru (dacă unul este gata), fie None (dacă nu). Primul apel non-blocant pornește totodată captura DMA a camerei, dacă aceasta nu rula deja, astfel încât stratul de încapsulare nu trebuie să facă nimic special pentru inițializare.

Cealaltă parte este asyncio.sleep_ms(). Stratul de încapsulare interoghează instantanee non-blocante într-o buclă, cedând controlul buclei de evenimente prin await asyncio.sleep_ms(0) între verificări, astfel încât orice altă corutină pregătită să aibă șansa de a rula înainte de următoarea interogare.

8.14.2. Stratul de încapsulare

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Constructorul încapsulează o instanță CSI. __getattr__ redirecționează fiecare atribut pe care stratul de încapsulare nu îl definește el însuși – reset, pixformat, framesize, toate comenzile senzorului – către instanța CSI subiacentă, astfel încât stratul de încapsulare arată identic cu obiectul neîncapsulat, cu excepția singurei metode care contează.

async def snapshot este partea nouă. Apelează snapshot(blocking=False); dacă apelul returnează o imagine, corutina o returnează. În caz contrar, cedează controlul buclei de evenimente prin await asyncio.sleep_ms(0), astfel încât alte corutine să aibă șansa de a rula, apoi reia bucla și încearcă din nou. Prima iterație pornește DMA-ul; iterațiile ulterioare preiau cadrele pe măsură ce devin disponibile.

8.14.3. O buclă de instantanee cu companie

Cu stratul de încapsulare la locul lui, o buclă de instantanee se integrează într-un program asyncio mai amplu la fel ca orice altă corutină. Exemplul de mai jos rulează trei corutine concurent: bucla de captură, un intermitent pentru LED și un semnal de prezență care afișează hello o dată pe secundă:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

Toate cele trei corutine progresează pe aceeași buclă de evenimente. Cât timp capture_loop cedează controlul între interogările de instantanee non-blocante, blinker comută LED-ul, iar hello afișează. Cât timp blinker și hello dorm, capture_loop interoghează camera. Intervalul de interogare este scurt – un singur tic al buclei de evenimente – așa că adaugă o latență neglijabilă momentului în care aplicația vede un cadru nou.

Bucla de captură nu blochează bucla de evenimente. Adăugarea de mai multă muncă concurentă – un client UART, de exemplu – este doar un alt apel create_task() în interiorul main.

Notă

Setarea framebuffers rămâne importantă și în această formă. Modul cu un singur tampon face ca snapshot(blocking=False) să returneze None până la capturarea următorului cadru; tamponarea dublă sau triplă netezește acest aspect, astfel încât stratul de încapsulare găsește de obicei un cadru tamponat în așteptare la prima interogare după ce cadrul anterior a fost procesat.