10.4. Sharing one capture loop across viewers¶
Each connected client calling csi0.snapshot() independently is
wasteful, and once two streams are open at once it gets worse: the
sensor delivers frames at its own rate, and every duplicated capture
slows everybody down. The right approach is one capture coroutine
that publishes “the latest frame” to a shared slot, plus per-client
iterators that read from the slot.
10.4.1. The capture task¶
A background coroutine grabs frames as fast as the sensor delivers
them, JPEG-compresses each one into a shared bytes, and pulses an
event so any waiting client wakes up:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
The set() / clear() pair is the pulse pattern. set()
unblocks every coroutine currently waiting on the event in one go;
clear() immediately resets the event so the next wait()
blocks again. With multiple consumers (a viewer, another viewer, any
other coroutine that needs to react to a new frame), no single
consumer is responsible for resetting the event, and nobody steals a
wake-up from anybody else.
Note
The bytes(...) wrap around the JPEG is load-bearing here.
bytearray() returns a view into the camera’s
image buffer; the very next snapshot() call
rewrites that buffer in place with the next frame.
latest_jpeg outlives the local img, so without the copy
every reader would see the slot shift under them on every
capture.
10.4.2. Per-client iterators read from the slot¶
The MJPEG stream handler stops calling csi0.snapshot() itself.
Instead, each FrameStream instance waits on the shared event and
reads from the shared bytes:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
The snapshot route changes too: it no longer triggers a capture, it
returns whatever latest_jpeg currently holds:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
The (body, status) tuple is microdot’s shorthand for setting an
HTTP status code without constructing a microdot.Response.
503 says I’m here but not ready – the right code for “ask again in
a moment.”
10.4.3. Running capture alongside the server¶
main now has two top-level coroutines: the capture loop and the
HTTP server. asyncio.gather() runs them both, and if either
crashes the other is cancelled:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
Now the sensor reads one frame per cycle no matter how many viewers
are connected. The first browser to /stream.jpg sees frames; so
does the second, the third, the tenth – they all share the same
capture, and the cam stays as responsive on its other routes.