8.14. AsyncCSI

สคริปต์ OpenMV Cam ทั่วไปจบด้วย while True: img = csi0.snapshot() ซึ่งเป็นลูปสแนปช็อตที่บล็อกและไม่ต้องการ asyncio เลย ทันทีที่แอปพลิเคชันต้องทำ สิ่งอื่น ควบคู่ไปกับการจับภาพ ไม่ว่าจะเป็นการฟังปุ่ม การส่งข้อมูลไปยังอุปกรณ์คู่ หรือการรันงานพื้นหลัง การเรียกที่บล็อกก็ขัดขวาง ขณะที่ snapshot รอเฟรมถัดไป event loop ไม่ได้ทำงาน ดังนั้นคอรูทีนอื่นๆ ทุกตัวในโปรแกรมจะถูกหยุดจนกว่าเฟรมจะมาถึง

หน้านี้สร้างตัวครอบขนาดเล็กรอบ CSI ที่เปลี่ยน snapshot ให้เป็นคอรูทีนที่รองรับ await ผลลัพธ์คือการแทนที่แบบ drop-in ที่ช่วยให้ลูปจับภาพอยู่ร่วมกับโปรแกรม asyncio ที่เหลือได้

8.14.1. ส่วนประกอบ

ส่วนหนึ่งของ API CSI ทำงานส่วนใหญ่ คือ snapshot() ในโหมดไม่บล็อก การเรียก snapshot(blocking=False) จะคืนเฟรมถัดไป (ถ้ามีพร้อม) หรือ None (ถ้าไม่มี) การเรียกแบบไม่บล็อกครั้งแรกยัง เริ่มต้น การจับภาพ DMA ของกล้องหากยังไม่ได้ทำงาน ดังนั้นตัวครอบไม่จำเป็นต้องทำอะไรพิเศษในการบูตสแตรป

ส่วนอื่นคือ asyncio.sleep_ms() ตัวครอบสำรวจ snapshot แบบไม่บล็อกในลูป โดยยอมให้ event loop ทำงานด้วย await asyncio.sleep_ms(0) ระหว่างการตรวจสอบ เพื่อให้คอรูทีนที่พร้อมอื่นๆ มีโอกาสทำงานก่อนการสำรวจครั้งถัดไป

8.14.2. ตัวครอบ

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

ตัวสร้างครอบอินสแตนซ์ CSI __getattr__ ส่งต่อทุกแอตทริบิวต์ที่ตัวครอบไม่ได้กำหนดเอง ไม่ว่าจะเป็น reset, pixformat, framesize และปุ่มปรับ sensor knobs ทั้งหมด ไปยัง CSI ที่อยู่ภายใน ดังนั้นตัวครอบจึงดูเหมือนกันกับอ็อบเจกต์ที่ไม่ได้ครอบยกเว้นเมธอดเดียวที่สำคัญ

async def snapshot คือส่วนใหม่ มันเรียก snapshot(blocking=False) หากการเรียกคืนภาพ คอรูทีนจะคืนค่ามัน มิฉะนั้นมันจะยอมกลับไปยัง event loop ด้วย await asyncio.sleep_ms(0) เพื่อให้คอรูทีนอื่นมีโอกาสทำงาน แล้ววนกลับและลองอีกครั้ง การวนซ้ำครั้งแรกเริ่ม DMA การวนซ้ำถัดไปรับเฟรมเมื่อพร้อม

8.14.3. ลูปสแนปช็อตพร้อมเพื่อน

เมื่อตัวครอบอยู่ในตำแหน่ง ลูปสแนปช็อตจะพอดีกับโปรแกรม asyncio ที่ใหญ่กว่าในแบบเดียวกับคอรูทีนอื่นๆ ตัวอย่างด้านล่างรันสามคอรูทีนพร้อมกัน: ลูปจับภาพ ตัวกะพริบ LED และ heartbeat ที่พิมพ์ hello ทุกวินาที:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

คอรูทีนทั้งสามก้าวหน้าบน event loop เดียวกัน ขณะที่ capture_loop กำลังยอมระหว่างการสำรวจสแนปช็อตแบบไม่บล็อก blinker สลับ LED และ hello พิมพ์ ขณะที่ blinker และ hello กำลังนอน capture_loop สำรวจกล้อง ช่วงการสำรวจสั้นมาก ซึ่งก็คือ event-loop tick เดียว ดังนั้นจึงเพิ่มเวลาแฝงน้อยมากเมื่อแอปพลิเคชันเห็นเฟรมใหม่

ลูปจับภาพ ไม่ได้ บล็อก event loop การเพิ่มงานพร้อมกันอื่นๆ เช่น UART client เป็นเพียงการเรียก create_task() อีกครั้งภายใน main

Note

การตั้งค่า framebuffers ยังคงมีความสำคัญในรูปแบบนี้ โหมด single-buffer ทำให้ snapshot(blocking=False) คืนค่า None จนกว่าจะจับเฟรมถัดไป การบัฟเฟอร์คู่หรือสามชั้นทำให้ราบเรียบขึ้นเพื่อให้ตัวครอบมักพบเฟรมที่บัฟเฟอร์ไว้รออยู่ในการสำรวจครั้งแรกหลังจากเฟรมก่อนหน้าได้รับการประมวลผล