10.3. 实时流——单个观看者¶
浏览器可以直接在一个 <img> 标签内渲染多部分的 Motion JPEG(MJPEG)流。只需向浏览器返回一个永不结束的 HTTP 响应,写入以多部分边界分隔的多张 JPEG,浏览器就会随着每一帧的到达逐一显示出来。
线路上的内容很简单:一个响应头 Content-Type: multipart/x-mixed-replace; boundary=frame,然后是一行 --frame、Content-Type: image/jpeg、一个空行、JPEG 字节、\r\n,如此反复。当 <img> 被移除或标签页被关闭时,浏览器会关闭连接。
10.3.1. 无阻塞地捕获¶
目前为止所用的阻塞式 csi0.snapshot() 会让整个事件循环停滞,直到传感器交付一帧。当一个请求只触发一次快照、且没有其他东西在运行时,这没有问题。可一旦有一个流处于开启状态,服务器就必须在捕获下一帧的同时继续处理其他请求——这个捕获调用需要在等待传感器期间向事件循环 让出 控制权。
这个模式是一个轻量的 AsyncCSI 包装器,它以非阻塞模式轮询 csi.CSI.snapshot(),并在两次轮询之间让协程休眠。asyncio 章节在 AsyncCSI 中详细讲解过这个模式;这里暂且把它内联到脚本中:
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
其他所有 CSI 方法(reset()、pixformat()、framesize()、gain_db() 等)都通过 __getattr__ 转发;只有 snapshot() 被替换成一个可 await 的版本,从而让事件循环能在两次轮询之间调度其他协程。
把快照路由中裸用的 csi.CSI() 替换成一个 AsyncCSI():
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. 流式响应体是基于类的迭代器¶
流式响应体只是一个 microdot 用 async for 迭代的对象,它会把每个产出的数据块写入套接字。在 CPython 上,这通常是一个 异步生成器函数——带 yield 的 async def。但 MicroPython 不支持这种写法:
备注
MicroPython 的 asyncio 不支持异步生成器函数(async def name(): ... yield ...)。流式响应体必须是 基于类的 异步迭代器,其 __aiter__ 返回 self,并将 __anext__ 定义为 async def。
对于一个 MJPEG 流来说,这意味着需要一个类,其 __anext__ 会 await 一帧,并将其包裹在多部分封装中返回:
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
每个请求都会创建一个全新的实例,因此每个连接的客户端都拥有自己的迭代器。当浏览器断开连接时,microdot 停止 await __anext__,该迭代器随即被垃圾回收。
备注
JPEG 外层的 bytes(...) 包裹是一种防御性写法。bytearray() 返回的是指向摄像头图像缓冲区的一个视图,而下一次 snapshot() 调用会就地重写该缓冲区。用 bytes 包裹会把 JPEG 复制出来,这样即使在 __anext__ 再次运行时写入方的刷新尚未完成,microdot 正在写入的那个数据块也能保持稳定。
10.3.3. 在 asyncio 中运行服务器¶
前面的 app.run(host=..., port=...) 调用是阻塞的。MJPEG 处理函数需要与 AsyncCSI 的快照轮询共享事件循环,因此要把 app.run 替换成置于 asyncio.run() 中的 start_server():
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
asyncio.run() 包装让服务器成为多个任务中的一个——这样 main 协程便自然成了启动捕获、运动检测以及任何其他必须与 HTTP 服务器共享事件循环的任务的合适位置。
10.3.4. 一次只能一个观看者¶
每个连接的客户端都会运行它自己的 FrameStream 迭代器,这意味着每个客户端都会触发它自己的 csi0.snapshot() 调用。两个浏览器就意味着每个帧间隔有两次传感器读取,三个就是三次,以此类推。传感器实际上无法以快于自身帧率的速度交付帧,因此这些请求会相互排队,导致每个人的流都变慢。
解决办法是用一个共享的捕获循环,把一帧发布给多个读取方。