10.4. Совместное использование одного цикла захвата несколькими зрителями¶
Когда каждый подключённый клиент независимо вызывает csi0.snapshot(), это расточительно, а как только одновременно открыты два потока, становится хуже: датчик выдаёт кадры со своей собственной частотой, и каждый дублирующий захват замедляет всех. Правильный подход – одна корутина захвата, которая публикует «последний кадр» в общий слот, плюс отдельные для каждого клиента итераторы, читающие из этого слота.
10.4.1. Задача захвата¶
Фоновая корутина захватывает кадры так быстро, как их выдаёт датчик, сжимает каждый в JPEG в общий bytes и подаёт импульс события, чтобы любой ожидающий клиент проснулся:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
Пара set() / clear() – это шаблон импульса. set() разблокирует все корутины, ожидающие события в данный момент, разом; clear() немедленно сбрасывает событие, чтобы следующий wait() снова заблокировался. При нескольких потребителях (зритель, ещё один зритель, любая другая корутина, которой нужно реагировать на новый кадр) ни один потребитель не отвечает за сброс события, и никто не крадёт пробуждение у другого.
Примечание
Обёртка bytes(...) вокруг JPEG здесь несёт важную нагрузку. bytearray() возвращает представление буфера изображения камеры; самый следующий вызов snapshot() перезаписывает этот буфер на месте следующим кадром. latest_jpeg живёт дольше локального img, поэтому без копии каждый читатель видел бы, как слот смещается под ним при каждом захвате.
10.4.2. Итераторы каждого клиента читают из слота¶
Обработчик потока MJPEG больше не вызывает csi0.snapshot() сам. Вместо этого каждый экземпляр FrameStream ожидает общее событие и читает из общих байтов:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
Маршрут снимка тоже меняется: он больше не запускает захват, а возвращает то, что в данный момент содержит latest_jpeg:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
Кортеж (body, status) – это сокращённая запись в microdot для установки кода состояния HTTP без создания microdot.Response. 503 означает я здесь, но не готов – правильный код для «спросите ещё раз через мгновение».
10.4.3. Запуск захвата вместе с сервером¶
Теперь у main две корутины верхнего уровня: цикл захвата и HTTP-сервер. asyncio.gather() запускает обе, и если одна из них падает, другая отменяется:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
Теперь датчик читает один кадр за цикл независимо от того, сколько зрителей подключено. Первый браузер на /stream.jpg видит кадры; так же видят и второй, и третий, и десятый – все они используют один и тот же захват, а камера остаётся столь же отзывчивой на других своих маршрутах.