8.14. AsyncCSI

Een typisch OpenMV Cam-script eindigt in while True: img = csi0.snapshot() – een blokkerende snapshot-lus die helemaal geen asyncio nodig heeft. Op het moment dat de applicatie iets anders moet doen naast het vastleggen van beelden – luisteren naar een knop, gegevens naar een peer sturen, een achtergrondtaak uitvoeren – staat de blokkerende aanroep in de weg. Terwijl snapshot wacht op het volgende frame draait de event loop niet, dus elke andere coroutine in het programma is bevroren totdat het frame binnenkomt.

Deze pagina bouwt een kleine wrapper rond CSI die snapshot omzet in een await-vriendelijke coroutine. Het resultaat is een drop-in-vervanging waarmee een captureluss kan samenleven met de rest van een asyncio-programma.

8.14.1. De onderdelen

Eén onderdeel van de CSI-API doet het meeste werk – snapshot() in zijn niet-blokkerende modus. Het aanroepen van snapshot(blocking=False) retourneert ofwel het volgende frame (als er een klaar is) of None (zo niet). De eerste niet-blokkerende aanroep start ook de DMA-capture van de camera als deze nog niet liep, dus de wrapper hoeft niets bijzonders te doen om op te starten.

Het andere onderdeel is asyncio.sleep_ms(). De wrapper pollt niet-blokkerende snapshots in een lus en geeft de controle terug aan de event loop met await asyncio.sleep_ms(0) tussen de controles, zodat elke andere klaarstaande coroutine een kans krijgt om te draaien vóór de volgende poll.

8.14.2. De wrapper

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

De constructor wikkelt een CSI-instantie in. __getattr__ stuurt elk attribuut dat de wrapper zelf niet definieert – reset, pixformat, framesize, alle sensorinstellingen – door naar de onderliggende CSI, zodat de wrapper er identiek uitziet als het niet-ingewikkelde object, op de ene methode na die ertoe doet.

async def snapshot is het nieuwe onderdeel. Het roept snapshot(blocking=False) aan; als de aanroep een afbeelding retourneert, retourneert de coroutine deze. Anders geeft het de controle terug aan de event loop met await asyncio.sleep_ms(0) zodat andere coroutines een kans krijgen om te draaien, en gaat dan terug naar het begin van de lus om het opnieuw te proberen. De eerste iteratie start de DMA; volgende iteraties pakken frames op zodra ze beschikbaar komen.

8.14.3. Een snapshot-lus met gezelschap

Met de wrapper op zijn plaats past een snapshot-lus in een groter asyncio-programma op dezelfde manier als elke andere coroutine. Het onderstaande voorbeeld draait drie coroutines gelijktijdig: de captureluss, een LED-knipperaar en een heartbeat die één keer per seconde hello afdrukt:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

Alle drie de coroutines maken voortgang op dezelfde event loop. Terwijl capture_loop de controle teruggeeft tussen niet-blokkerende snapshot-polls, schakelt blinker de LED en drukt hello af. Terwijl blinker en hello slapen, pollt capture_loop de camera. Het pollinterval is kort – één enkele event-loop-tick – dus het voegt een verwaarloosbare latentie toe aan het moment waarop de applicatie een nieuw frame ziet.

De captureluss blokkeert de event loop niet. Meer gelijktijdig werk toevoegen – bijvoorbeeld een UART-client – is gewoon nog een create_task()-aanroep binnen main.

Notitie

De instelling framebuffers is in deze vorm nog steeds van belang. In de single-buffermodus retourneert snapshot(blocking=False) None totdat het volgende frame is vastgelegd; dubbele of driedubbele buffering vergemakkelijkt dit, zodat de wrapper meestal een gebufferd frame klaar vindt staan bij de eerste poll nadat het vorige frame is verwerkt.