10.4. 在多個觀看者間共用單一擷取迴圈¶
每個已連線的用戶端各自獨立呼叫 csi0.snapshot() 是浪費的,而且一旦同時開啟兩個串流,情況會更糟:感測器以它自己的速率送出影格,每一次重複的擷取都會拖慢所有人。正確的做法是用一個擷取協程,把「最新的影格」發布到一個共用插槽,再加上各用戶端各自的迭代器去從該插槽讀取。
10.4.1. 擷取工作¶
一個背景協程以感測器送出影格的最快速度抓取影格,把每一張 JPEG 壓縮進一個共用的 bytes,並觸發一個事件,好讓任何正在等待的用戶端醒來:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
set() / clear() 這一對是 脈衝 模式。set() 會一口氣解除所有目前正在等待該事件的協程的阻塞;clear() 則立即重置該事件,讓下一次 wait() 再次阻塞。在有多個消費者(一個觀看者、另一個觀看者,以及任何其他需要對新影格做出反應的協程)的情況下,沒有任何單一消費者要負責重置該事件,也沒有人會偷走別人的喚醒。
備註
圍繞 JPEG 的 bytes(...) 包裝在此是關鍵。bytearray() 回傳的是相機影像緩衝區的一個檢視;緊接著的下一次 snapshot() 呼叫會就地用下一張影格改寫該緩衝區。latest_jpeg 的存活時間比區域變數 img 更長,所以若沒有這份複製,每個讀取者都會在每次擷取時看到插槽在腳下被換掉。
10.4.2. 各用戶端的迭代器從插槽讀取¶
MJPEG 串流處理常式不再自己呼叫 csi0.snapshot()。取而代之,每個 FrameStream 實例都等待共用事件,並從共用的 bytes 讀取:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
快照路由也跟著改變:它不再觸發擷取,而是回傳 latest_jpeg 目前所持有的任何內容:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
(body, status) 元組是 microdot 在不建構 microdot.Response 的情況下設定 HTTP 狀態碼的簡寫。503 表示 我在這裡,但還沒準備好——這正是「請稍後再問一次」的恰當狀態碼。
10.4.3. 讓擷取與伺服器並行執行¶
main 現在有兩個頂層協程:擷取迴圈與 HTTP 伺服器。asyncio.gather() 會同時執行兩者,且如果其中一個當掉,另一個會被取消:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
現在,無論連線了多少個觀看者,感測器每個週期都只讀取一張影格。第一個連到 /stream.jpg 的瀏覽器會看到影格;第二個、第三個、第十個也一樣——它們全都共用同一份擷取,而相機在其他路由上也維持同樣的回應能力。