10.3. 即時串流——單一觀看者

瀏覽器可以直接在 <img> 標籤內算繪多部分的 Motion JPEG (MJPEG) 串流。給瀏覽器一個永遠不會結束的 HTTP 回應,寫出以 multipart 邊界分隔的 JPEG,瀏覽器就會在每張影格到達時逐一顯示。

瀏覽器送出 GET /stream.jpg;相機以 Content-Type multipart/x-mixed-replace 回應,並在瀏覽器斷線之前 為每張影格寫出一個以 JPEG 為主體的部分。

線路上的傳輸很直接:一個回應標頭 Content-Type: multipart/x-mixed-replace; boundary=frame,接著是一行 --frameContent-Type: image/jpeg、一個空行、JPEG 位元組、\r\n,然後重複。當 <img> 被移除或分頁被關閉時,瀏覽器會關閉連線。

10.3.1. 不阻塞地擷取

目前為止所用的阻塞式 csi0.snapshot() 會卡住整個事件迴圈,直到感測器送出一張影格為止。當一個請求觸發一次快照、且沒有其他東西在執行時,這沒有問題。但一旦串流開啟,伺服器就必須在擷取下一張影格的同時繼續處理其他請求——擷取呼叫需要在等待感測器時 讓出 給事件迴圈。

這個模式是一個輕薄的 AsyncCSI 包裝層,它以非阻塞模式輪詢 csi.CSI.snapshot(),並在輪詢之間讓協程睡眠。asyncio 章節已在 AsyncCSI 中走過這個模式;目前先把它內嵌進指令碼:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

其他每個 CSI 方法(reset()pixformat()framesize()gain_db()、……)都透過 __getattr__ 轉發;只有 snapshot() 被替換成一個可 await 的版本,讓事件迴圈能在輪詢之間排程其他協程。

把快照路由中那個裸的 csi.CSI() 換成 AsyncCSI()

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. 串流主體是以類別為基礎的迭代器

串流回應主體只是一個 microdot 用 async for 迭代的物件,把每個 yield 出來的區塊寫到 socket 上。在 CPython 上,這通常是一個 async 產生器函式——帶有 yieldasync def。MicroPython 不支援這種寫法:

備註

MicroPython 的 asyncio 不支援 async 產生器函式(async def name(): ... yield ...)。串流回應主體必須是 以類別為基礎的 async 迭代器,其 __aiter__ 回傳 self,而 __anext__ 定義為 async def

對於 MJPEG 串流而言,這代表一個類別,其 __anext__ 會 await 一張影格,並把它包進 multipart 包裝後回傳:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

這個實例每個請求都是全新的,所以每個已連線的用戶端都會得到自己的迭代器。當瀏覽器斷線時,microdot 會停止 await __anext__,該迭代器便會被垃圾回收。

備註

圍繞 JPEG 的 bytes(...) 包裝是防禦性的。bytearray() 回傳的是相機影像緩衝區的一個檢視,而下一次 snapshot() 呼叫會就地改寫該緩衝區。用 bytes 包裝會把 JPEG 複製出來,這樣即使在 __anext__ 再次執行時寫入端的 flush 尚未完成,microdot 正在寫入的那個區塊也能保持穩定。

10.3.3. 在 asyncio 內執行伺服器

先前的 app.run(host=..., port=...) 呼叫是阻塞的。MJPEG 處理常式需要與 AsyncCSI 的快照輪詢共用事件迴圈,所以把 app.run 換成 asyncio.run() 內部的 start_server()

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

asyncio.run() 包裝層讓伺服器成為眾多工作之一——main 協程於是成為衍生擷取、動作偵測,以及任何其他必須與 HTTP 伺服器共用事件迴圈之物的自然位置。

10.3.4. 一次一個觀看者

每個已連線的用戶端都會執行自己的 FrameStream 迭代器,這代表每個用戶端都會觸發自己的 csi0.snapshot() 呼叫。兩個瀏覽器代表每個影格間隔有兩次感測器讀取,三個就是三次,依此類推。感測器實際上無法比它自己的影格率更快地送出影格,所以這些請求會彼此排隊,導致每個人的串流都變慢。

解決辦法是用單一的共用擷取迴圈,把一張影格發布給許多讀取者。