10.4. 1つのキャプチャループを複数の視聴者で共有する

接続された各クライアントが個別に csi0.snapshot() を呼ぶのは無駄であり、2つのストリームが同時に開かれると事態は悪化します。センサーは自身のレートでフレームを返すので、重複したキャプチャはそれぞれ全員を遅くします。正しいアプローチは、「最新のフレーム」を共有スロットに発行する1つのキャプチャコルーチンと、そのスロットから読み取るクライアントごとのイテレーターです。

1つのキャプチャタスクが、単一の latest_jpeg スロットに JPEGバイトを書き込みます。3つのストリームクライアント イテレーターがそのスロットから読み取り、それぞれ共有の new_frame イベントを待ちます。

10.4.1. キャプチャタスク

バックグラウンドのコルーチンが、センサーが返すのと同じ速さでフレームを取得し、それぞれを共有の bytes にJPEG圧縮し、イベントをパルスして待機中のクライアントを起こします。

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

set() / clear() のペアが パルス パターンです。set() はそのイベントを現在待っているすべてのコルーチンを一度にブロック解除します。clear() は直ちにイベントをリセットし、次の wait() が再びブロックするようにします。複数のコンシューマー(視聴者、別の視聴者、新しいフレームに反応する必要のある他のコルーチン)がいても、特定のコンシューマーがイベントのリセットに責任を負うことはなく、誰かが他の誰かから起床を横取りすることもありません。

注釈

JPEGの周りの bytes(...) ラップはここでは不可欠です。bytearray() はカメラの画像バッファへのビューを返します。直後の snapshot() 呼び出しが、そのバッファを次のフレームでその場で書き換えます。latest_jpeg はローカルの img より長く存続するので、コピーしなければすべての読み手がキャプチャのたびにスロットが足元でずれるのを目にすることになります。

10.4.2. クライアントごとのイテレーターがスロットから読み取る

MJPEGストリームハンドラーは自身で csi0.snapshot() を呼ぶのをやめます。代わりに、各 FrameStream インスタンスは共有イベントを待ち、共有のbytesから読み取ります。

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

スナップショットルートも変わります。もはやキャプチャをトリガーせず、latest_jpeg が現在保持しているものを返します。

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

(body, status) タプルは、microdot.Response を構築せずにHTTPステータスコードを設定するためのmicrodotの省略記法です。503は ここにいるがまだ準備ができていない という意味で、「少し後にもう一度尋ねてほしい」に対する正しいコードです。

10.4.3. サーバーと並行してキャプチャを実行する

main には2つのトップレベルコルーチンができました。キャプチャループとHTTPサーバーです。asyncio.gather() が両方を実行し、どちらかがクラッシュするともう一方はキャンセルされます。

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

これで、接続されている視聴者の数に関係なく、センサーは1サイクルにつき1フレームを読み取ります。最初に /stream.jpg を開いたブラウザはフレームを見られます。2番目、3番目、10番目も同様です。すべてが同じキャプチャを共有し、カメラは他のルートでも応答性を保ち続けます。