10.4. 让多个观看者共享一个捕获循环

让每个连接的客户端各自独立调用 csi0.snapshot() 是一种浪费,而且一旦同时开启两路流,情况会更糟:传感器以自己的速率交付帧,每一次重复的捕获都会拖慢所有人。正确的做法是用一个捕获协程,把“最新的帧”发布到一个共享槽位,再配上从该槽位读取的、每个客户端各自的迭代器。

一个捕获任务把 JPEG 字节写入单一的 latest_jpeg 槽位;三个流客户端迭代器从该槽位读取,并 各自在共享的 new_frame 事件上等待。

10.4.1. 捕获任务

一个后台协程以传感器交付帧的最快速度抓取帧,将每一帧 JPEG 压缩到一个共享的 bytes 中,并触发一个事件,从而唤醒任何正在等待的客户端:

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

set() / clear() 这一对是 脉冲 模式。set() 会一次性解除所有当前正在该事件上等待的协程的阻塞;clear() 则立即重置该事件,使下一次 wait() 再次阻塞。在有多个消费者(一个观看者、另一个观看者,以及任何其他需要对新帧做出反应的协程)的情况下,没有任何单个消费者负责重置该事件,也没有谁会窃取属于别人的唤醒。

备注

JPEG 外层的 bytes(...) 包裹在这里至关重要。bytearray() 返回的是指向摄像头图像缓冲区的一个视图;紧接着的下一次 snapshot() 调用会就地用下一帧重写该缓冲区。latest_jpeg 的存活时间长于局部的 img,所以如果不做复制,每个读取方都会在每次捕获时看到槽位在自己脚下发生变化。

10.4.2. 每个客户端各自的迭代器从槽位读取

MJPEG 流处理函数不再自己调用 csi0.snapshot()。取而代之的是,每个 FrameStream 实例在共享事件上等待,并从共享的 bytes 中读取:

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

快照路由也随之改变:它不再触发一次捕获,而是返回 latest_jpeg 当前所持有的内容:

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

(body, status) 元组是 microdot 用来设置 HTTP 状态码、而无需构造 microdot.Response 的简写。503 表示 我在这里但还没准备好——这正是“稍后再来问”的恰当状态码。

10.4.3. 让捕获与服务器并行运行

现在 main 有了两个顶层协程:捕获循环和 HTTP 服务器。asyncio.gather() 会同时运行它们,并且如果其中一个崩溃,另一个会被取消:

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

现在,无论连接了多少观看者,传感器每个周期只读取一帧。第一个访问 /stream.jpg 的浏览器能看到帧;第二个、第三个、第十个也都能——它们全都共享同一份捕获,而摄像头在其他路由上依然保持响应灵敏。