8.14. AsyncCSI¶
典型的 OpenMV Cam 指令碼最後都會以 while True: img = csi0.snapshot() 結尾 -- 這是一個會阻塞的快照迴圈,完全不需要 asyncio。但是當應用程式必須在拍攝的同時做其他事情 -- 例如監聽按鈕、傳送資料給對端、執行背景工作 -- 阻塞式呼叫就會造成阻礙。當 snapshot 正在等待下一個影格時,事件迴圈並未在執行,因此程式中其他每個協程都會凍結,直到影格抵達為止。
本頁將圍繞 CSI 建立一個小型包裝器,把 snapshot 轉換成可搭配 await 使用的協程。其成果是一個可直接替換的版本,讓拍攝迴圈能與 asyncio 程式的其餘部分共存。
8.14.1. 組成元件¶
CSI API 中有一個元件完成了大部分工作 -- 即非阻塞模式下的 snapshot()。呼叫 snapshot(blocking=False) 會回傳下一個影格(若已就緒)或回傳 None(若尚未就緒)。第一次非阻塞呼叫同時也會啟動相機的 DMA 拍攝(若尚未在執行),因此包裝器不需要做任何特殊處理來進行初始化。
另一個元件是 asyncio.sleep_ms()。包裝器會在迴圈中輪詢非阻塞快照,並在每次檢查之間以 await asyncio.sleep_ms(0) 讓出控制權給事件迴圈,讓其他每個就緒的協程都有機會在下一次輪詢前執行。
8.14.2. 包裝器¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
建構函式包裝一個 CSI 實例。__getattr__ 會把包裝器本身未定義的每個屬性 -- reset、pixformat、framesize,以及所有的 感測器調整項 -- 轉發給底層的 CSI,因此除了那個關鍵方法之外,包裝器看起來與未包裝的物件完全相同。
async def snapshot 就是新增的部分。它會呼叫 snapshot(blocking=False);若呼叫回傳影像,協程就會回傳該影像。否則它會以 await asyncio.sleep_ms(0) 讓出控制權給事件迴圈,讓其他協程有機會執行,接著再回到迴圈重新嘗試。第一次迭代會啟動 DMA;後續的迭代則會在影格就緒時取得它們。
8.14.3. 有同伴的快照迴圈¶
有了這個包裝器之後,快照迴圈就能像其他任何協程一樣融入更大的 asyncio 程式中。下面的範例同時執行三個協程:拍攝迴圈、一個 LED 閃爍器,以及一個每秒印出 hello 的心跳訊號:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
三個協程都在同一個事件迴圈上推進。當 capture_loop 在非阻塞快照輪詢之間讓出控制權時,blinker 會切換 LED 而 hello 會印出訊息。當 blinker 與 hello 在睡眠時,capture_loop 則輪詢相機。輪詢間隔很短 -- 只有單一個事件迴圈的時脈週期 -- 因此它對應用程式看到新影格的時機所增加的延遲可以忽略不計。
拍攝迴圈不會阻塞事件迴圈。要加入更多並行工作 -- 例如一個 UART 用戶端 -- 只需在 main 中再加一個 create_task() 呼叫即可。
備註
在這種形式下,影格緩衝區 設定仍然重要。單緩衝模式會讓 snapshot(blocking=False) 持續回傳 None 直到下一個影格被拍攝為止;雙重或三重緩衝則可平滑化這個過程,使包裝器在處理完前一個影格後的第一次輪詢時,通常就能找到一個等待中的已緩衝影格。