10.4. Спільний цикл захоплення для кількох глядачів¶
Кожен підключений клієнт, який незалежно викликає csi0.snapshot(), є марнотратством, а коли одночасно відкрито два потоки, ситуація погіршується: датчик доставляє кадри з власною швидкістю, і кожне дубльоване захоплення сповільнює всіх. Правильний підхід — одна корутина захоплення, яка публікує «останній кадр» у спільний слот, а також ітератори для кожного клієнта, які читають із цього слота.
10.4.1. Завдання захоплення¶
Фонова корутина захоплює кадри так швидко, як їх доставляє датчик, стискає кожен у JPEG у спільний bytes і генерує подію, щоб усі очікуючі клієнти прокинулись:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
Пара set() / clear() — це шаблон імпульсу. set() одночасно розблоковує всі корутини, що наразі очікують на подію; clear() негайно скидає подію, щоб наступний wait() знову заблокувався. За наявності кількох споживачів (один глядач, інший глядач, будь-яка інша корутина, яка повинна реагувати на новий кадр), жоден окремий споживач не відповідає за скидання події, і ніхто не «краде» пробудження в іншого.
Примітка
Обгортка bytes(...) навколо JPEG тут є принципово важливою. bytearray() повертає вигляд у кадровий буфер камери; наступний виклик snapshot() одразу перезаписує цей буфер наступним кадром. latest_jpeg живе довше, ніж локальна img, тому без копіювання кожен читач бачив би, як слот змінюється під ним при кожному захопленні.
10.4.2. Ітератори для кожного клієнта читають із слота¶
Обробник потоку MJPEG більше не викликає csi0.snapshot() самостійно. Натомість кожен екземпляр FrameStream очікує на спільну подію і читає зі спільних байтів:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
Маршрут snapshot теж змінюється: він більше не ініціює захоплення, а повертає те, що зараз зберігається в latest_jpeg:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
Кортеж (body, status) — це скорочений синтаксис microdot для встановлення HTTP-коду статусу без конструювання microdot.Response. 503 означає я тут, але не готовий — правильний код для «спробуйте ще раз за мить».
10.4.3. Запуск захоплення разом із сервером¶
main тепер має дві верхньорівневі корутини: цикл захоплення та HTTP-сервер. asyncio.gather() запускає їх обидві, і якщо одна з них завершиться з помилкою, інша скасовується:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
Тепер датчик зчитує один кадр за цикл незалежно від кількості підключених глядачів. Перший браузер, що підключається до /stream.jpg, бачить кадри; так само другий, третій, десятий — усі вони використовують одне й те саме захоплення, і камера залишається так само чуйною на інших маршрутах.