8.14. AsyncCSI

Типовий скрипт OpenMV Cam завершується while True: img = csi0.snapshot() – блокуючим циклом знімків, якому asyncio взагалі не потрібен. Як тільки програмі потрібно робити щось ще поряд із захопленням – слухати кнопку, надсилати дані однорангу, виконувати фонову задачу – блокуючий виклик стає на заваді. Поки snapshot чекає наступного кадру, цикл подій не працює, тому кожна інша корутина в програмі заморожена до появи кадру.

Ця сторінка будує невеликий обгортковий клас навколо CSI, який перетворює snapshot на корутину, сумісну з await. Результат – це замінник, який дозволяє циклу захоплення співіснувати з рештою програми asyncio.

8.14.1. Складові частини

Одна частина API CSI виконує більшість роботи – snapshot() у неблокуючому режимі. Виклик snapshot(blocking=False) або повертає наступний кадр (якщо він готовий), або None (якщо ні). Перший неблокуючий виклик також запускає DMA-захоплення камери, якщо воно ще не виконувалося, тому оболонці не потрібно нічого особливого робити для ініціалізації.

Інша частина – asyncio.sleep_ms(). Оболонка здійснює опитування неблокуючих знімків у циклі, поступаючись циклу подій через await asyncio.sleep_ms(0) між перевірками, щоб кожна інша готова корутина мала можливість виконатися перед наступним опитуванням.

8.14.2. Оболонка

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Конструктор обгортає екземпляр CSI. __getattr__ перенаправляє кожен атрибут, який оболонка не визначає сама – reset, pixformat, framesize, всі параметри датчика – до базового CSI, тому оболонка виглядає ідентично незагорнутому об’єкту, за винятком одного важливого методу.

async def snapshot – це нова частина. Вона викликає snapshot(blocking=False); якщо виклик повертає зображення, корутина повертає його. Інакше вона поступається циклу подій через await asyncio.sleep_ms(0), щоб інші корутини могли виконатися, а потім повертається до циклу і пробує знову. Перша ітерація запускає DMA; наступні ітерації підхоплюють кадри в міру їх надходження.

8.14.3. Цикл знімків з компанією

З оболонкою на місці цикл знімків вписується в більшу програму asyncio так само, як і будь-яка інша корутина. Наведений нижче приклад запускає три корутини одночасно: цикл захоплення, мигавку LED та серцебиття, яке друкує hello раз на секунду:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

Усі три корутини виконуються на одному циклі подій. Поки capture_loop поступається між неблокуючими опитуваннями знімків, blinker перемикає LED і hello друкує. Поки blinker і hello сплять, capture_loop опитує камеру. Інтервал опитування короткий – один тік циклу подій – тому він додає незначну затримку до моменту, коли програма бачить новий кадр.

Цикл захоплення не блокує цикл подій. Додавання більшої кількості паралельної роботи – наприклад, UART-клієнта – це просто ще один виклик create_task() всередині main.

Примітка

Налаштування кадрових буферів як і раніше важливе у цій конфігурації. Режим одного буфера змушує snapshot(blocking=False) повертати None до захоплення наступного кадру; подвійне або потрійне буферування згладжує це, тому оболонка зазвичай знаходить буферизований кадр, готовий на першому опитуванні після обробки попереднього кадру.