8.14. AsyncCSI

Un script OpenMV Cam typique se termine par while True: img = csi0.snapshot() – une boucle de capture bloquante qui n’a pas du tout besoin d’asyncio. Dès que l’application doit faire autre chose en parallèle des captures – écouter un bouton, envoyer des données à un pair, exécuter une tâche d’arrière-plan – l’appel bloquant devient gênant. Pendant que snapshot attend la prochaine trame, la boucle d’événements ne tourne pas, de sorte que toutes les autres coroutines du programme sont figées jusqu’à l’arrivée de la trame.

Cette page construit un petit wrapper autour de CSI qui transforme snapshot en une coroutine compatible avec await. Le résultat est un remplacement transparent qui permet à une boucle de capture de coexister avec le reste d’un programme asyncio.

8.14.1. Les éléments

Une partie de l’API CSI fait l’essentiel du travail – snapshot() dans son mode non bloquant. L’appel de snapshot(blocking=False) renvoie soit la prochaine trame (si une est prête), soit None (sinon). Le premier appel non bloquant démarre également la capture DMA de la caméra si elle n’était pas déjà en cours, de sorte que le wrapper n’a rien de particulier à faire pour s’initialiser.

L’autre partie est asyncio.sleep_ms(). Le wrapper interroge les captures non bloquantes dans une boucle, en redonnant la main à la boucle d’événements avec await asyncio.sleep_ms(0) entre les vérifications, afin que toutes les autres coroutines prêtes aient une chance de s’exécuter avant la prochaine interrogation.

8.14.2. Le wrapper

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Le constructeur enveloppe une instance CSI. __getattr__ transfère chaque attribut que le wrapper ne définit pas lui-même – reset, pixformat, framesize, tous les réglages du capteur – vers le CSI sous-jacent, de sorte que le wrapper paraît identique à l’objet non enveloppé à l’exception de la seule méthode qui compte.

async def snapshot est l’élément nouveau. Il appelle snapshot(blocking=False) ; si l’appel renvoie une image, la coroutine la renvoie. Sinon, elle redonne la main à la boucle d’événements avec await asyncio.sleep_ms(0) afin que les autres coroutines aient une chance de s’exécuter, puis recommence et réessaie. La première itération démarre la DMA ; les itérations suivantes récupèrent les trames à mesure qu’elles deviennent disponibles.

8.14.3. Une boucle de capture en bonne compagnie

Avec le wrapper en place, une boucle de capture s’intègre dans un programme asyncio plus vaste de la même manière que n’importe quelle autre coroutine. L’exemple ci-dessous exécute trois coroutines simultanément : la boucle de capture, un clignotant de LED et un battement de cœur qui affiche hello une fois par seconde

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

Les trois coroutines progressent sur la même boucle d’événements. Pendant que capture_loop redonne la main entre les interrogations de captures non bloquantes, blinker bascule la LED et hello s’affiche. Pendant que blinker et hello sont en sommeil, capture_loop interroge la caméra. L’intervalle d’interrogation est court – un seul tic de boucle d’événements – de sorte qu’il ajoute une latence négligeable au moment où l’application voit une nouvelle trame.

La boucle de capture ne bloque pas la boucle d’événements. Ajouter davantage de travail concurrent – un client UART, par exemple – n’est qu’un appel create_task() supplémentaire à l’intérieur de main.

Note

Le réglage des framebuffers reste important dans cette configuration. Le mode à tampon unique fait que snapshot(blocking=False) renvoie None jusqu’à ce que la trame suivante soit capturée ; le double ou triple tampon lisse cela de sorte que le wrapper trouve généralement une trame en mémoire tampon disponible dès la première interrogation après le traitement de la trame précédente.