10.4. 让多个观看者共享一个捕获循环¶
让每个连接的客户端各自独立调用 csi0.snapshot() 是一种浪费,而且一旦同时开启两路流,情况会更糟:传感器以自己的速率交付帧,每一次重复的捕获都会拖慢所有人。正确的做法是用一个捕获协程,把“最新的帧”发布到一个共享槽位,再配上从该槽位读取的、每个客户端各自的迭代器。
10.4.1. 捕获任务¶
一个后台协程以传感器交付帧的最快速度抓取帧,将每一帧 JPEG 压缩到一个共享的 bytes 中,并触发一个事件,从而唤醒任何正在等待的客户端:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
set() / clear() 这一对是 脉冲 模式。set() 会一次性解除所有当前正在该事件上等待的协程的阻塞;clear() 则立即重置该事件,使下一次 wait() 再次阻塞。在有多个消费者(一个观看者、另一个观看者,以及任何其他需要对新帧做出反应的协程)的情况下,没有任何单个消费者负责重置该事件,也没有谁会窃取属于别人的唤醒。
备注
JPEG 外层的 bytes(...) 包裹在这里至关重要。bytearray() 返回的是指向摄像头图像缓冲区的一个视图;紧接着的下一次 snapshot() 调用会就地用下一帧重写该缓冲区。latest_jpeg 的存活时间长于局部的 img,所以如果不做复制,每个读取方都会在每次捕获时看到槽位在自己脚下发生变化。
10.4.2. 每个客户端各自的迭代器从槽位读取¶
MJPEG 流处理函数不再自己调用 csi0.snapshot()。取而代之的是,每个 FrameStream 实例在共享事件上等待,并从共享的 bytes 中读取:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
快照路由也随之改变:它不再触发一次捕获,而是返回 latest_jpeg 当前所持有的内容:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
(body, status) 元组是 microdot 用来设置 HTTP 状态码、而无需构造 microdot.Response 的简写。503 表示 我在这里但还没准备好——这正是“稍后再来问”的恰当状态码。
10.4.3. 让捕获与服务器并行运行¶
现在 main 有了两个顶层协程:捕获循环和 HTTP 服务器。asyncio.gather() 会同时运行它们,并且如果其中一个崩溃,另一个会被取消:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
现在,无论连接了多少观看者,传感器每个周期只读取一帧。第一个访问 /stream.jpg 的浏览器能看到帧;第二个、第三个、第十个也都能——它们全都共享同一份捕获,而摄像头在其他路由上依然保持响应灵敏。