10.3. 即時串流——單一觀看者¶
瀏覽器可以直接在 <img> 標籤內算繪多部分的 Motion JPEG (MJPEG) 串流。給瀏覽器一個永遠不會結束的 HTTP 回應,寫出以 multipart 邊界分隔的 JPEG,瀏覽器就會在每張影格到達時逐一顯示。
線路上的傳輸很直接:一個回應標頭 Content-Type: multipart/x-mixed-replace; boundary=frame,接著是一行 --frame、Content-Type: image/jpeg、一個空行、JPEG 位元組、\r\n,然後重複。當 <img> 被移除或分頁被關閉時,瀏覽器會關閉連線。
10.3.1. 不阻塞地擷取¶
目前為止所用的阻塞式 csi0.snapshot() 會卡住整個事件迴圈,直到感測器送出一張影格為止。當一個請求觸發一次快照、且沒有其他東西在執行時,這沒有問題。但一旦串流開啟,伺服器就必須在擷取下一張影格的同時繼續處理其他請求——擷取呼叫需要在等待感測器時 讓出 給事件迴圈。
這個模式是一個輕薄的 AsyncCSI 包裝層,它以非阻塞模式輪詢 csi.CSI.snapshot(),並在輪詢之間讓協程睡眠。asyncio 章節已在 AsyncCSI 中走過這個模式;目前先把它內嵌進指令碼:
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
其他每個 CSI 方法(reset()、pixformat()、framesize()、gain_db()、……)都透過 __getattr__ 轉發;只有 snapshot() 被替換成一個可 await 的版本,讓事件迴圈能在輪詢之間排程其他協程。
把快照路由中那個裸的 csi.CSI() 換成 AsyncCSI():
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. 串流主體是以類別為基礎的迭代器¶
串流回應主體只是一個 microdot 用 async for 迭代的物件,把每個 yield 出來的區塊寫到 socket 上。在 CPython 上,這通常是一個 async 產生器函式——帶有 yield 的 async def。MicroPython 不支援這種寫法:
備註
MicroPython 的 asyncio 不支援 async 產生器函式(async def name(): ... yield ...)。串流回應主體必須是 以類別為基礎的 async 迭代器,其 __aiter__ 回傳 self,而 __anext__ 定義為 async def。
對於 MJPEG 串流而言,這代表一個類別,其 __anext__ 會 await 一張影格,並把它包進 multipart 包裝後回傳:
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
這個實例每個請求都是全新的,所以每個已連線的用戶端都會得到自己的迭代器。當瀏覽器斷線時,microdot 會停止 await __anext__,該迭代器便會被垃圾回收。
備註
圍繞 JPEG 的 bytes(...) 包裝是防禦性的。bytearray() 回傳的是相機影像緩衝區的一個檢視,而下一次 snapshot() 呼叫會就地改寫該緩衝區。用 bytes 包裝會把 JPEG 複製出來,這樣即使在 __anext__ 再次執行時寫入端的 flush 尚未完成,microdot 正在寫入的那個區塊也能保持穩定。
10.3.3. 在 asyncio 內執行伺服器¶
先前的 app.run(host=..., port=...) 呼叫是阻塞的。MJPEG 處理常式需要與 AsyncCSI 的快照輪詢共用事件迴圈,所以把 app.run 換成 asyncio.run() 內部的 start_server():
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
asyncio.run() 包裝層讓伺服器成為眾多工作之一——main 協程於是成為衍生擷取、動作偵測,以及任何其他必須與 HTTP 伺服器共用事件迴圈之物的自然位置。
10.3.4. 一次一個觀看者¶
每個已連線的用戶端都會執行自己的 FrameStream 迭代器,這代表每個用戶端都會觸發自己的 csi0.snapshot() 呼叫。兩個瀏覽器代表每個影格間隔有兩次感測器讀取,三個就是三次,依此類推。感測器實際上無法比它自己的影格率更快地送出影格,所以這些請求會彼此排隊,導致每個人的串流都變慢。
解決辦法是用單一的共用擷取迴圈,把一張影格發布給許多讀取者。