10.4. Chia sẻ một vòng lặp capture cho nhiều người xem¶
Mỗi client kết nối gọi csi0.snapshot() độc lập là lãng phí, và khi có hai luồng mở cùng lúc thì còn tệ hơn: cảm biến cung cấp khung hình theo tốc độ của chính nó, và mỗi lần capture trùng lặp làm chậm tất cả mọi người. Cách tiếp cận đúng là một coroutine capture duy nhất xuất bản "khung hình mới nhất" vào một slot dùng chung, cộng với các iterator trên mỗi client đọc từ slot đó.
10.4.1. Capture task¶
Một coroutine nền lấy khung hình nhanh nhất khi cảm biến cung cấp, nén JPEG từng cái vào một bytes dùng chung, và phát tín hiệu sự kiện để bất kỳ client nào đang chờ đều thức dậy:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
Cặp set() / clear() là pattern pulse. set() giải phóng mọi coroutine hiện đang chờ trên sự kiện cùng lúc; clear() ngay lập tức reset sự kiện để lần wait() tiếp theo sẽ bị chặn lại. Với nhiều consumer (một người xem, một người xem khác, bất kỳ coroutine nào khác cần phản ứng với khung hình mới), không có consumer đơn lẻ nào chịu trách nhiệm reset sự kiện, và không ai chiếm mất tín hiệu thức dậy của người khác.
Ghi chú
Việc bọc bytes(...) quanh JPEG là thiết yếu ở đây. bytearray() trả về một view vào bộ đệm ảnh của camera; lần gọi snapshot() ngay tiếp theo sẽ ghi đè bộ đệm đó tại chỗ với khung hình tiếp theo. latest_jpeg tồn tại lâu hơn biến cục bộ img, vì vậy nếu không sao chép thì mọi reader sẽ thấy slot thay đổi dưới chân mình sau mỗi lần capture.
10.4.2. Iterator trên mỗi client đọc từ slot¶
Handler luồng MJPEG không còn tự gọi csi0.snapshot(). Thay vào đó, mỗi instance FrameStream chờ trên sự kiện dùng chung và đọc từ bytes dùng chung:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
Route snapshot cũng thay đổi: nó không còn kích hoạt capture nữa, nó trả về bất cứ thứ gì latest_jpeg hiện đang giữ:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
Tuple (body, status) là cú pháp tắt của microdot để đặt HTTP status code mà không cần xây dựng một microdot.Response. 503 có nghĩa là Tôi đang ở đây nhưng chưa sẵn sàng -- mã đúng cho trường hợp "hỏi lại sau một lúc".
10.4.3. Chạy capture song song với server¶
main giờ có hai coroutine cấp cao nhất: vòng lặp capture và HTTP server. asyncio.gather() chạy cả hai, và nếu một cái bị crash thì cái kia sẽ bị hủy:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
Giờ đây cảm biến đọc một khung hình mỗi chu kỳ bất kể có bao nhiêu người xem đang kết nối. Trình duyệt đầu tiên vào /stream.jpg thấy khung hình; trình duyệt thứ hai, thứ ba, thứ mười cũng vậy -- tất cả đều chia sẻ cùng một capture, và cam vẫn phản hồi tốt trên các route khác của nó.