8.14. AsyncCSI¶
Типовий скрипт OpenMV Cam завершується while True: img = csi0.snapshot() – блокуючим циклом знімків, якому asyncio взагалі не потрібен. Як тільки програмі потрібно робити щось ще поряд із захопленням – слухати кнопку, надсилати дані однорангу, виконувати фонову задачу – блокуючий виклик стає на заваді. Поки snapshot чекає наступного кадру, цикл подій не працює, тому кожна інша корутина в програмі заморожена до появи кадру.
Ця сторінка будує невеликий обгортковий клас навколо CSI, який перетворює snapshot на корутину, сумісну з await. Результат – це замінник, який дозволяє циклу захоплення співіснувати з рештою програми asyncio.
8.14.1. Складові частини¶
Одна частина API CSI виконує більшість роботи – snapshot() у неблокуючому режимі. Виклик snapshot(blocking=False) або повертає наступний кадр (якщо він готовий), або None (якщо ні). Перший неблокуючий виклик також запускає DMA-захоплення камери, якщо воно ще не виконувалося, тому оболонці не потрібно нічого особливого робити для ініціалізації.
Інша частина – asyncio.sleep_ms(). Оболонка здійснює опитування неблокуючих знімків у циклі, поступаючись циклу подій через await asyncio.sleep_ms(0) між перевірками, щоб кожна інша готова корутина мала можливість виконатися перед наступним опитуванням.
8.14.2. Оболонка¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Конструктор обгортає екземпляр CSI. __getattr__ перенаправляє кожен атрибут, який оболонка не визначає сама – reset, pixformat, framesize, всі параметри датчика – до базового CSI, тому оболонка виглядає ідентично незагорнутому об’єкту, за винятком одного важливого методу.
async def snapshot – це нова частина. Вона викликає snapshot(blocking=False); якщо виклик повертає зображення, корутина повертає його. Інакше вона поступається циклу подій через await asyncio.sleep_ms(0), щоб інші корутини могли виконатися, а потім повертається до циклу і пробує знову. Перша ітерація запускає DMA; наступні ітерації підхоплюють кадри в міру їх надходження.
8.14.3. Цикл знімків з компанією¶
З оболонкою на місці цикл знімків вписується в більшу програму asyncio так само, як і будь-яка інша корутина. Наведений нижче приклад запускає три корутини одночасно: цикл захоплення, мигавку LED та серцебиття, яке друкує hello раз на секунду:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
Усі три корутини виконуються на одному циклі подій. Поки capture_loop поступається між неблокуючими опитуваннями знімків, blinker перемикає LED і hello друкує. Поки blinker і hello сплять, capture_loop опитує камеру. Інтервал опитування короткий – один тік циклу подій – тому він додає незначну затримку до моменту, коли програма бачить новий кадр.
Цикл захоплення не блокує цикл подій. Додавання більшої кількості паралельної роботи – наприклад, UART-клієнта – це просто ще один виклик create_task() всередині main.
Примітка
Налаштування кадрових буферів як і раніше важливе у цій конфігурації. Режим одного буфера змушує snapshot(blocking=False) повертати None до захоплення наступного кадру; подвійне або потрійне буферування згладжує це, тому оболонка зазвичай знаходить буферизований кадр, готовий на першому опитуванні після обробки попереднього кадру.