8.14. AsyncCSI¶
典型的なOpenMV Camのスクリプトは while True: img = csi0.snapshot() で終わります -- これは asyncio をまったく必要としないブロッキング型のスナップショットループです。しかし、アプリケーションがキャプチャと並行して 何か別のこと をしなければならなくなった瞬間 -- ボタンの入力を待つ、相手にデータを送る、バックグラウンドタスクを実行するなど -- このブロッキング呼び出しが邪魔になります。snapshot が次のフレームを待っている間はイベントループが動かないため、プログラム内の他のすべてのコルーチンはフレームが届くまで凍結されてしまいます。
このページでは、CSI のまわりに小さなラッパーを構築し、snapshot を await に対応したコルーチンに変換します。その結果として得られるのは、キャプチャループを asyncio プログラムの他の部分と共存させられる、そのまま差し替え可能な代替品です。
8.14.1. 構成要素¶
CSI API のある一部分が作業の大半をこなします -- 非ブロッキングモードの snapshot() です。snapshot(blocking=False) を呼び出すと、(準備ができていれば)次のフレームを返すか、(できていなければ)None を返します。最初の非ブロッキング呼び出しは、カメラのDMAキャプチャがまだ動作していなければそれを 開始 もするため、ラッパー側で起動のために特別なことをする必要はありません。
もう一つの構成要素は asyncio.sleep_ms() です。ラッパーはループ内で非ブロッキングのスナップショットをポーリングし、チェックの合間に await asyncio.sleep_ms(0) でイベントループに制御を譲るので、次のポーリングの前に他の実行可能なすべてのコルーチンが動作する機会を得られます。
8.14.2. ラッパー¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
コンストラクタは CSI インスタンスをラップします。__getattr__ は、ラッパー自身が定義していないすべての属性 -- reset、pixformat、framesize、そして センサーの調整項目 のすべて -- を内部の CSI に転送するため、ラッパーは唯一重要なメソッドを除いて、ラップされていないオブジェクトとまったく同じように見えます。
async def snapshot が新しい部分です。これは snapshot(blocking=False) を呼び出し、その呼び出しが画像を返せばコルーチンはそれを返します。そうでなければ await asyncio.sleep_ms(0) でイベントループに制御を戻し、他のコルーチンが動作する機会を与えてから、ループの先頭に戻って再試行します。最初の反復でDMAが開始され、それ以降の反復ではフレームが利用可能になるたびにそれを取得します。
8.14.3. 仲間と一緒のスナップショットループ¶
ラッパーを用意すれば、スナップショットループは他のどのコルーチンとも同じ要領で、より大きな asyncio プログラムに組み込めます。以下の例では、3つのコルーチンを同時に実行します。キャプチャループ、LEDの点滅、そして1秒に一度 hello を出力するハートビートです:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
3つのコルーチンはすべて同じイベントループ上で進行します。capture_loop が非ブロッキングのスナップショットポーリングの合間に制御を譲っている間に、blinker はLEDを切り替え、hello は出力します。blinker と hello がスリープしている間に、capture_loop はカメラをポーリングします。ポーリング間隔は短く -- イベントループの1ティック分 -- なので、アプリケーションが新しいフレームを認識するまでに加わるレイテンシは無視できる程度です。
キャプチャループはイベントループを ブロックしません。同時実行する処理を追加すること -- たとえばUARTクライアント -- は、main の中でもう一つ create_task() を呼び出すだけです。
注釈
フレームバッファ の設定はこの形でも依然として重要です。シングルバッファモードでは、snapshot(blocking=False) は次のフレームがキャプチャされるまで None を返します。ダブルバッファリングやトリプルバッファリングはこれを滑らかにし、前のフレームの処理が終わった後の最初のポーリングで、ラッパーが通常はバッファ済みのフレームが待機しているのを見つけられるようにします。