10.4. 在多個觀看者間共用單一擷取迴圈

每個已連線的用戶端各自獨立呼叫 csi0.snapshot() 是浪費的,而且一旦同時開啟兩個串流,情況會更糟:感測器以它自己的速率送出影格,每一次重複的擷取都會拖慢所有人。正確的做法是用一個擷取協程,把「最新的影格」發布到一個共用插槽,再加上各用戶端各自的迭代器去從該插槽讀取。

一個擷取工作把 JPEG 位元組寫到單一的 latest_jpeg 插槽;三個串流用戶端的迭代器從該插槽讀取,並 各自等待共用的 new_frame 事件。

10.4.1. 擷取工作

一個背景協程以感測器送出影格的最快速度抓取影格,把每一張 JPEG 壓縮進一個共用的 bytes,並觸發一個事件,好讓任何正在等待的用戶端醒來:

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

set() / clear() 這一對是 脈衝 模式。set() 會一口氣解除所有目前正在等待該事件的協程的阻塞;clear() 則立即重置該事件,讓下一次 wait() 再次阻塞。在有多個消費者(一個觀看者、另一個觀看者,以及任何其他需要對新影格做出反應的協程)的情況下,沒有任何單一消費者要負責重置該事件,也沒有人會偷走別人的喚醒。

備註

圍繞 JPEG 的 bytes(...) 包裝在此是關鍵。bytearray() 回傳的是相機影像緩衝區的一個檢視;緊接著的下一次 snapshot() 呼叫會就地用下一張影格改寫該緩衝區。latest_jpeg 的存活時間比區域變數 img 更長,所以若沒有這份複製,每個讀取者都會在每次擷取時看到插槽在腳下被換掉。

10.4.2. 各用戶端的迭代器從插槽讀取

MJPEG 串流處理常式不再自己呼叫 csi0.snapshot()。取而代之,每個 FrameStream 實例都等待共用事件,並從共用的 bytes 讀取:

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

快照路由也跟著改變:它不再觸發擷取,而是回傳 latest_jpeg 目前所持有的任何內容:

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

(body, status) 元組是 microdot 在不建構 microdot.Response 的情況下設定 HTTP 狀態碼的簡寫。503 表示 我在這裡,但還沒準備好——這正是「請稍後再問一次」的恰當狀態碼。

10.4.3. 讓擷取與伺服器並行執行

main 現在有兩個頂層協程:擷取迴圈與 HTTP 伺服器。asyncio.gather() 會同時執行兩者,且如果其中一個當掉,另一個會被取消:

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

現在,無論連線了多少個觀看者,感測器每個週期都只讀取一張影格。第一個連到 /stream.jpg 的瀏覽器會看到影格;第二個、第三個、第十個也一樣——它們全都共用同一份擷取,而相機在其他路由上也維持同樣的回應能力。