8.14. AsyncCSI

일반적인 OpenMV Cam 스크립트는 while True: img = csi0.snapshot() 로 끝납니다 – 이는 asyncio가 전혀 필요 없는 블로킹 스냅샷 루프입니다. 애플리케이션이 캡처와 함께 다른 무언가를 해야 하는 순간 – 버튼 입력을 듣거나, 피어에게 데이터를 보내거나, 백그라운드 작업을 실행하는 등 – 블로킹 호출이 방해가 됩니다. snapshot 이 다음 프레임을 기다리는 동안 이벤트 루프는 실행되지 않으므로, 프레임이 도착할 때까지 프로그램의 다른 모든 코루틴이 멈춰 있게 됩니다.

이 페이지에서는 CSI 를 감싸는 작은 래퍼를 만들어 snapshotawait 친화적인 코루틴으로 바꿉니다. 그 결과로 캡처 루프가 나머지 asyncio 프로그램과 공존할 수 있게 해주는 즉시 교체 가능한 대체물이 만들어집니다.

8.14.1. 구성 요소

CSI API의 한 부분이 대부분의 작업을 담당합니다 – 바로 비블로킹 모드의 snapshot() 입니다. snapshot(blocking=False) 을 호출하면 다음 프레임이 준비되어 있으면 그것을 반환하고, 그렇지 않으면 None 을 반환합니다. 첫 번째 비블로킹 호출은 또한 카메라의 DMA 캡처가 아직 실행 중이 아니라면 그것을 시작하므로, 래퍼가 부트스트랩을 위해 특별한 일을 할 필요가 없습니다.

다른 한 부분은 asyncio.sleep_ms() 입니다. 래퍼는 루프 안에서 비블로킹 스냅샷을 폴링하며, 검사 사이에 await asyncio.sleep_ms(0) 으로 이벤트 루프에 제어권을 양보하여 다음 폴링 전에 준비된 다른 모든 코루틴이 실행될 기회를 얻도록 합니다.

8.14.2. 래퍼

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

생성자는 CSI 인스턴스를 감쌉니다. __getattr__ 은 래퍼 자체가 정의하지 않은 모든 속성 – reset, pixformat, framesize, 그리고 모든 센서 조절값 – 을 기반이 되는 CSI 로 전달하므로, 래퍼는 중요한 그 하나의 메서드를 제외하고는 감싸지 않은 객체와 동일하게 보입니다.

async def snapshot 이 새로운 부분입니다. 이는 snapshot(blocking=False) 을 호출하며, 호출이 이미지를 반환하면 코루틴은 그것을 반환합니다. 그렇지 않으면 await asyncio.sleep_ms(0) 으로 이벤트 루프에 제어권을 양보하여 다른 코루틴이 실행될 기회를 얻도록 한 다음, 다시 루프를 돌아 재시도합니다. 첫 번째 반복은 DMA를 시작하고, 이후 반복은 프레임이 사용 가능해지는 대로 가져옵니다.

8.14.3. 동료가 있는 스냅샷 루프

래퍼가 자리를 잡으면, 스냅샷 루프는 다른 어떤 코루틴과도 같은 방식으로 더 큰 asyncio 프로그램에 들어맞습니다. 아래 예제는 세 개의 코루틴을 동시에 실행합니다: 캡처 루프, LED 점멸기, 그리고 1초에 한 번씩 hello 를 출력하는 하트비트입니다:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

세 코루틴 모두 동일한 이벤트 루프에서 진행됩니다. capture_loop 이 비블로킹 스냅샷 폴링 사이에서 제어권을 양보하는 동안, blinker 는 LED를 토글하고 hello 는 출력합니다. blinkerhello 가 잠들어 있는 동안, capture_loop 은 카메라를 폴링합니다. 폴링 간격은 짧습니다 – 단일 이벤트 루프 틱이므로 – 애플리케이션이 새 프레임을 보게 되는 시점에 미치는 지연 시간은 무시할 만큼 적습니다.

캡처 루프는 이벤트 루프를 블로킹하지 않습니다. 더 많은 동시 작업을 추가하는 것 – 예를 들어 UART 클라이언트 – 은 main 안에서 또 하나의 create_task() 호출일 뿐입니다.

참고

프레임버퍼 설정은 이러한 형태에서도 여전히 중요합니다. 단일 버퍼 모드는 다음 프레임이 캡처될 때까지 snapshot(blocking=False)None 을 반환하게 만듭니다. 더블 또는 트리플 버퍼링은 이를 매끄럽게 해주어, 이전 프레임이 처리된 후 첫 폴링에서 래퍼가 대개 버퍼링된 프레임을 발견하도록 합니다.