10.3. Живая трансляция – один зритель¶
Браузеры могут отображать многочастные потоки Motion JPEG (MJPEG) прямо внутри тега <img>. Передайте браузеру один HTTP-ответ, который никогда не завершается, записывайте JPEG-кадры, разделённые многочастной границей, и браузер будет отображать каждый кадр по мере его поступления.
Канал устроен просто: один заголовок ответа, Content-Type: multipart/x-mixed-replace; boundary=frame, затем строка --frame, Content-Type: image/jpeg, пустая строка, байты JPEG, \r\n и повтор. Браузер закрывает соединение, когда <img> удаляется или вкладка закрывается.
10.3.1. Захват без блокировки¶
Использовавшийся до сих пор блокирующий csi0.snapshot() останавливает весь цикл событий, пока датчик не выдаст кадр. Это было нормально, когда один запрос вызывал один снимок и больше ничего не работало. Как только поток открыт, сервер должен продолжать обрабатывать другие запросы, пока захватывается следующий кадр – вызов захвата должен уступать управление циклу событий, пока он ожидает датчик.
Этот шаблон – тонкая обёртка AsyncCSI, которая опрашивает csi.CSI.snapshot() в неблокирующем режиме и усыпляет корутину между опросами. В главе об asyncio этот шаблон был разобран в AsyncCSI; пока что встройте его прямо в скрипт:
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Все остальные методы CSI (reset(), pixformat(), framesize(), gain_db(), …) перенаправляются через __getattr__; только snapshot() заменяется ожидаемой версией, которая позволяет циклу событий планировать другие корутины между опросами.
Замените голый csi.CSI() из маршрута снимка на AsyncCSI():
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. Тела потоков – это итераторы на основе классов¶
Тело потокового ответа – это просто объект, который microdot перебирает с помощью async for, отправляя каждый выданный фрагмент в сокет. В CPython это обычно функция асинхронного генератора – async def с yield. MicroPython этого не поддерживает:
Примечание
asyncio в MicroPython не поддерживает функции асинхронных генераторов (async def name(): ... yield ...). Тела потоковых ответов должны быть асинхронными итераторами на основе классов, где __aiter__ возвращает self, а __anext__ определён как async def.
Для потока MJPEG это означает класс, чей __anext__ ожидает один кадр и возвращает его, обрамлённый в многочастную обёртку:
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
Экземпляр создаётся заново для каждого запроса, поэтому каждый подключённый клиент получает собственный итератор. Когда браузер отключается, microdot перестаёт ожидать __anext__, и итератор удаляется сборщиком мусора.
Примечание
Обёртка bytes(...) вокруг JPEG – это защитная мера. bytearray() возвращает представление буфера изображения камеры, а следующий вызов snapshot() перезаписывает этот буфер на месте. Обёртывание в bytes копирует JPEG наружу, чтобы фрагмент, который microdot записывает в данный момент, оставался стабильным, даже если сброс данных записывающей стороны не завершился к моменту повторного запуска __anext__.
10.3.3. Запуск сервера внутри asyncio¶
Предыдущий вызов app.run(host=..., port=...) является блокирующим. Обработчику MJPEG нужно разделять цикл с опросами снимков AsyncCSI, поэтому замените app.run на start_server() внутри asyncio.run():
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
Обёртка asyncio.run() позволяет серверу быть одной из нескольких задач – корутина main тогда становится естественным местом для запуска захвата, обнаружения движения и всего остального, что должно разделять цикл с HTTP-сервером.
10.3.4. Один зритель за раз¶
Каждый подключённый клиент запускает собственный итератор FrameStream, что означает, что каждый клиент вызывает собственный csi0.snapshot(). Два браузера означают два чтения датчика на интервал кадра, три – три, и так далее. Датчик на самом деле не может выдавать кадры быстрее своей собственной частоты кадров, поэтому запросы выстраиваются в очередь друг за другом, и потоки у всех замедляются.
Решение – единый общий цикл захвата, публикующий один кадр для многих читателей.