10.3. ライブストリーミング -- 1人の視聴者¶
ブラウザはマルチパートの Motion JPEG(MJPEG)ストリームを <img> タグ内で直接レンダリングできます。終了することのないHTTPレスポンスを1つブラウザに渡し、マルチパート境界で区切られたJPEGを書き込むと、ブラウザは各フレームを到着するたびに表示します。
通信は単純です。1つのレスポンスヘッダー Content-Type: multipart/x-mixed-replace; boundary=frame の後、--frame 行、Content-Type: image/jpeg、空行、JPEGバイト、\r\n と続き、これを繰り返します。<img> が削除されるかタブが閉じられると、ブラウザは接続を閉じます。
10.3.1. ブロックせずにキャプチャする¶
これまで使っていたブロッキングの csi0.snapshot() は、センサーがフレームを返すまでイベントループ全体を停止させます。1つのリクエストが1つのスナップショットを発火し、他に何も実行されていないときはそれで問題ありませんでした。しかしストリームが開かれると、サーバーは次のフレームをキャプチャしている間も他のリクエストを処理し続ける必要があります。つまりキャプチャ呼び出しは、センサーを待っている間、イベントループに 制御を譲る 必要があります。
このパターンは、csi.CSI.snapshot() を非ブロッキングモードでポーリングし、ポーリングの合間にコルーチンをスリープさせる薄い AsyncCSI ラッパーです。asyncioの章では、AsyncCSI でこのパターンを説明しました。今のところはスクリプトにインラインで記述します。
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
他のすべてのCSIメソッド(reset()、pixformat()、framesize()、gain_db()、...)は __getattr__ 経由で転送されます。snapshot() だけが、ポーリングの合間にイベントループが他のコルーチンをスケジュールできるようにするawait可能なバージョンに置き換えられます。
スナップショットルートのむき出しの csi.CSI() を AsyncCSI() に置き換えます。
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. ストリーミングボディはクラスベースのイテレーターです¶
ストリーミングレスポンスボディは、microdotが async for で反復処理し、yieldされた各チャンクをソケットに送るだけのオブジェクトです。CPythonでは通常これは 非同期ジェネレーター関数(yield を伴う async def)です。MicroPythonはこれをサポートしていません。
注釈
MicroPythonの asyncio は非同期ジェネレーター関数(async def name(): ... yield ...)をサポートしていません。ストリーミングレスポンスボディは、__aiter__ が self を返し、__anext__ が async def として定義された クラスベース の非同期イテレーターでなければなりません。
MJPEGストリームの場合、これは __anext__ が1フレームをawaitし、それをマルチパートラッパーで包んで返すクラスを意味します。
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
インスタンスはリクエストごとに新しく生成されるので、接続された各クライアントは独自のイテレーターを得ます。ブラウザが切断すると、microdotは __anext__ のawaitをやめ、イテレーターはガベージコレクションされます。
注釈
JPEGの周りの bytes(...) ラップは防御的なものです。bytearray() はカメラの画像バッファへのビューを返し、次の snapshot() 呼び出しがそのバッファをその場で書き換えます。bytes で包むことでJPEGをコピーして取り出すので、microdotが書き込み途中のチャンクは、次に __anext__ が実行されるまでにライターのフラッシュが完了していなくても安定したままになります。
10.3.3. asyncioの中でサーバーを実行する¶
以前の app.run(host=..., port=...) 呼び出しはブロッキングです。MJPEGハンドラーはAsyncCSIのスナップショットポーリングとループを共有する必要があるので、app.run を asyncio.run() の中の start_server() に置き換えます。
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
asyncio.run() ラッパーにより、サーバーは複数のタスクの1つになります。main コルーチンはその後、キャプチャ、モーション検出、その他HTTPサーバーとループを共有する必要があるものを起動する自然な場所になります。
10.3.4. 一度に1人の視聴者¶
接続された各クライアントはそれぞれ独自の FrameStream イテレーターを実行するので、各クライアントはそれぞれ独自の csi0.snapshot() 呼び出しをトリガーします。ブラウザが2つあればフレーム間隔ごとに2回のセンサー読み取りが、3つあれば3回が発生し、以降同様です。センサーは実際には自身のフレームレートより速くフレームを返せないので、リクエストは互いの後ろにキューイングされ、全員のストリームが遅くなります。
解決策は、1つのフレームを多数の読み手に発行する単一の共有キャプチャループです。