8.14. AsyncCSI

סקריפט טיפוסי של OpenMV Cam מסתיים ב-while True: img = csi0.snapshot() – לולאת snapshot חוסמת שאינה זקוקה ל-asyncio כלל. ברגע שעל היישום לבצע משהו נוסף לצד הצילומים – להאזין ללחצן, לשלוח נתונים לעמית, להריץ משימת רקע – הקריאה החוסמת עומדת בדרך. בעוד snapshot ממתינה לפריים הבא, לולאת האירועים אינה רצה, ולכן כל קורוטינה אחרת בתוכנית קופאת עד שהפריים מגיע.

עמוד זה בונה עוטף קטן סביב CSI שהופך את snapshot לקורוטינה תואמת await. התוצאה היא תחליף מוכן לשימוש המאפשר ללולאת צילום להתקיים יחד עם שאר תוכנית asyncio.

8.14.1. הרכיבים

רכיב אחד מתוך ה-API של CSI מבצע את רוב העבודה – snapshot() במצבה הלא-חוסם. קריאה ל-snapshot(blocking=False) מחזירה את הפריים הבא (אם פריים מוכן) או None (אם לא). הקריאה הלא-חוסמת הראשונה גם מתחילה את צילום ה-DMA של המצלמה אם הוא לא רץ כבר, כך שהעוטף אינו צריך לעשות דבר מיוחד כדי לאתחל.

הרכיב השני הוא asyncio.sleep_ms(). העוטף סוקר snapshots לא-חוסמות בלולאה, ומוותר ללולאת האירועים באמצעות await asyncio.sleep_ms(0) בין הבדיקות, כך שכל קורוטינה מוכנה אחרת מקבלת הזדמנות לרוץ לפני הסקירה הבאה.

8.14.2. העוטף

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

הבנאי עוטף מופע של CSI. __getattr__ מעביר כל תכונה שהעוטף אינו מגדיר בעצמו – reset, pixformat, framesize, כל כפתורי החיישן – ל-CSI הבסיסי, כך שהעוטף נראה זהה לאובייקט הלא-עטוף למעט המתודה האחת שחשובה.

async def snapshot הוא הרכיב החדש. הוא קורא ל-snapshot(blocking=False); אם הקריאה מחזירה תמונה, הקורוטינה מחזירה אותה. אחרת היא מוותרת בחזרה ללולאת האירועים באמצעות await asyncio.sleep_ms(0) כדי שקורוטינות אחרות יקבלו הזדמנות לרוץ, ואז חוזרת ללולאה ומנסה שוב. האיטרציה הראשונה מתחילה את ה-DMA; איטרציות עוקבות אוספות פריימים כשהם נעשים זמינים.

8.14.3. לולאת snapshot עם חברה

כשהעוטף במקומו, לולאת snapshot משתלבת בתוך תוכנית asyncio גדולה יותר באותו אופן שבו כל קורוטינה אחרת משתלבת. הדוגמה שלהלן מריצה שלוש קורוטינות במקביל: לולאת הצילום, מהבהב LED, ופעימת לב שמדפיסה hello פעם בשנייה:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

כל שלוש הקורוטינות מתקדמות על אותה לולאת אירועים. בעוד capture_loop מוותרת בין סקירות snapshot לא-חוסמות, blinker מחליפה את מצב ה-LED ו-hello מדפיסה. בעוד blinker ו-hello ישנות, capture_loop סוקרת את המצלמה. מרווח הסקירה קצר – טיק יחיד של לולאת האירועים – כך שהוא מוסיף השהיה זניחה למועד שבו היישום רואה פריים חדש.

לולאת הצילום אינה חוסמת את לולאת האירועים. הוספת עבודה מקבילית נוספת – לקוח UART, למשל – היא פשוט קריאת create_task() נוספת בתוך main.

הערה

הגדרת חוצצי הפריימים עדיין חשובה בצורה הזו. מצב חוצץ יחיד גורם ל-snapshot(blocking=False) להחזיר None עד שהפריים הבא מצולם; חציצה כפולה או משולשת מחליקה זאת כך שהעוטף בדרך כלל מוצא פריים מוחצן ממתין בסקירה הראשונה לאחר שהפריים הקודם עובד.