8.14. AsyncCSI

典型的 OpenMV Cam 指令碼最後都會以 while True: img = csi0.snapshot() 結尾 -- 這是一個會阻塞的快照迴圈,完全不需要 asyncio。但是當應用程式必須在拍攝的同時做其他事情 -- 例如監聽按鈕、傳送資料給對端、執行背景工作 -- 阻塞式呼叫就會造成阻礙。當 snapshot 正在等待下一個影格時,事件迴圈並未在執行,因此程式中其他每個協程都會凍結,直到影格抵達為止。

本頁將圍繞 CSI 建立一個小型包裝器,把 snapshot 轉換成可搭配 await 使用的協程。其成果是一個可直接替換的版本,讓拍攝迴圈能與 asyncio 程式的其餘部分共存。

8.14.1. 組成元件

CSI API 中有一個元件完成了大部分工作 -- 即非阻塞模式下的 snapshot()。呼叫 snapshot(blocking=False) 會回傳下一個影格(若已就緒)或回傳 None(若尚未就緒)。第一次非阻塞呼叫同時也會啟動相機的 DMA 拍攝(若尚未在執行),因此包裝器不需要做任何特殊處理來進行初始化。

另一個元件是 asyncio.sleep_ms()。包裝器會在迴圈中輪詢非阻塞快照,並在每次檢查之間以 await asyncio.sleep_ms(0) 讓出控制權給事件迴圈,讓其他每個就緒的協程都有機會在下一次輪詢前執行。

8.14.2. 包裝器

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

建構函式包裝一個 CSI 實例。__getattr__ 會把包裝器本身未定義的每個屬性 -- resetpixformatframesize,以及所有的 感測器調整項 -- 轉發給底層的 CSI,因此除了那個關鍵方法之外,包裝器看起來與未包裝的物件完全相同。

async def snapshot 就是新增的部分。它會呼叫 snapshot(blocking=False);若呼叫回傳影像,協程就會回傳該影像。否則它會以 await asyncio.sleep_ms(0) 讓出控制權給事件迴圈,讓其他協程有機會執行,接著再回到迴圈重新嘗試。第一次迭代會啟動 DMA;後續的迭代則會在影格就緒時取得它們。

8.14.3. 有同伴的快照迴圈

有了這個包裝器之後,快照迴圈就能像其他任何協程一樣融入更大的 asyncio 程式中。下面的範例同時執行三個協程:拍攝迴圈、一個 LED 閃爍器,以及一個每秒印出 hello 的心跳訊號:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

三個協程都在同一個事件迴圈上推進。當 capture_loop 在非阻塞快照輪詢之間讓出控制權時,blinker 會切換 LED 而 hello 會印出訊息。當 blinkerhello 在睡眠時,capture_loop 則輪詢相機。輪詢間隔很短 -- 只有單一個事件迴圈的時脈週期 -- 因此它對應用程式看到新影格的時機所增加的延遲可以忽略不計。

拍攝迴圈不會阻塞事件迴圈。要加入更多並行工作 -- 例如一個 UART 用戶端 -- 只需在 main 中再加一個 create_task() 呼叫即可。

備註

在這種形式下,影格緩衝區 設定仍然重要。單緩衝模式會讓 snapshot(blocking=False) 持續回傳 None 直到下一個影格被拍攝為止;雙重或三重緩衝則可平滑化這個過程,使包裝器在處理完前一個影格後的第一次輪詢時,通常就能找到一個等待中的已緩衝影格。