10.3. Live-стримінг — один глядач

Браузери можуть відтворювати потоки Motion JPEG (MJPEG) безпосередньо всередині тегу <img>. Надайте браузеру одну HTTP-відповідь, яка ніколи не завершується, записуйте JPEG-файли, розділені межею multipart, і браузер відображатиме кожен кадр у міру його надходження.

The browser sends GET /stream.jpg; the cam responds with Content-Type multipart/x-mixed-replace and writes one JPEG-bodied part per frame until the browser disconnects.

Протокол передачі простий: один заголовок відповіді Content-Type: multipart/x-mixed-replace; boundary=frame, потім рядок --frame, Content-Type: image/jpeg, порожній рядок, байти JPEG, \r\n, і повтор. Браузер закриває з’єднання, коли тег <img> видаляється або вкладка закривається.

10.3.1. Захоплення без блокування

Блокуючий csi0.snapshot(), який використовувався раніше, зупиняє весь цикл подій, доки датчик не доставить кадр. Це було прийнятно, коли один запит запускав один знімок і більше нічого не виконувалось. Після відкриття потоку сервер повинен продовжувати обробляти інші запити, поки захоплюється наступний кадр — виклик захоплення повинен передавати керування циклу подій під час очікування датчика.

Шаблон — це тонка обгортка AsyncCSI, яка опитує csi.CSI.snapshot() у неблокуючому режимі і призупиняє корутину між опитуваннями. Розділ про asyncio описав цей шаблон у AsyncCSI; вбудуйте його в скрипт поки що:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Усі інші методи CSI (reset(), pixformat(), framesize(), gain_db(), …) перенаправляються через __getattr__; лише snapshot() замінюється очікуваною версією, яка дозволяє циклу подій планувати інші корутини між опитуваннями.

Замініть простий csi.CSI() зі snapshot-маршруту на AsyncCSI():

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. Тіла потокових відповідей — це класові ітератори

Тіло потокової відповіді — це просто об’єкт, по якому microdot ітерується за допомогою async for, надсилаючи кожен отриманий фрагмент у сокет. У CPython це зазвичай функція асинхронного генератораasync def з yield. MicroPython це не підтримує:

Примітка

asyncio MicroPython не підтримує функції асинхронних генераторів (async def name(): ... yield ...). Тіла потокових відповідей мають бути класовими асинхронними ітераторами з __aiter__, що повертає self, та __anext__, визначеним як async def.

Для MJPEG-потоку це означає клас, чий __anext__ очікує один кадр і повертає його, обрамленим у multipart-обгортку:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

Екземпляр є новим для кожного запиту, тому кожен підключений клієнт отримує власний ітератор. Коли браузер відключається, microdot перестає очікувати __anext__ і ітератор видаляється збирачем сміття.

Примітка

Обгортка bytes(...) навколо JPEG є захисним заходом. bytearray() повертає вигляд у кадровий буфер камери, а наступний виклик snapshot() перезаписує цей буфер на місці. Обгортання в bytes копіює JPEG, щоб фрагмент, який microdot перебуває у процесі запису, залишався стабільним, навіть якщо flush записувача не завершився на момент повторного запуску __anext__.

10.3.3. Запуск сервера всередині asyncio

Попередній виклик app.run(host=..., port=...) є блокуючим. Обробнику MJPEG потрібно ділити цикл подій з опитуваннями знімків AsyncCSI, тому замініть app.run на start_server() всередині asyncio.run():

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

Обгортка asyncio.run() дозволяє серверу бути одним завданням серед кількох — корутина main тоді є природним місцем для запуску захоплення, виявлення руху та всього іншого, що повинно ділити цикл подій з HTTP-сервером.

10.3.4. Один глядач за раз

Кожен підключений клієнт запускає власний ітератор FrameStream, що означає, що кожен клієнт ініціює власний виклик csi0.snapshot(). Два браузери означають два зчитування датчика на інтервал кадру, три — три, і так далі. Датчик насправді не може доставляти кадри швидше, ніж його власна частота кадрів, тому запити шикуються в чергу один за одним і потік у всіх сповільнюється.

Виправлення полягає в єдиному спільному циклі захоплення, який публікує один кадр для багатьох читачів.