10.3. Live streamen – één kijker

Browsers kunnen meerdelige Motion JPEG (MJPEG) streams rechtstreeks binnen een <img> tag weergeven. Geef de browser één HTTP-respons die nooit eindigt, schrijf JPEG’s gescheiden door een multipart-grens, en de browser toont elk frame zodra het binnenkomt.

De browser verstuurt GET /stream.jpg; de cam reageert met Content-Type multipart/x-mixed-replace en schrijft één deel met een JPEG-body per frame totdat de browser de verbinding verbreekt.

De verbinding is eenvoudig: één responsheader, Content-Type: multipart/x-mixed-replace; boundary=frame, dan een --frame regel, Content-Type: image/jpeg, een lege regel, de JPEG-bytes, \r\n, en herhaal. De browser sluit de verbinding wanneer de <img> wordt verwijderd of het tabblad wordt gesloten.

10.3.1. Vastleggen zonder te blokkeren

De blokkerende csi0.snapshot() die tot nu toe is gebruikt, legt de hele event loop stil totdat de sensor een frame levert. Dat was prima toen één verzoek één momentopname afvuurde en er verder niets draaide. Zodra een stream geopend is, moet de server andere verzoeken blijven afhandelen terwijl het volgende frame wordt vastgelegd – de vastlegaanroep moet teruggeven aan de event loop terwijl deze op de sensor wacht.

Het patroon is een dunne AsyncCSI wrapper die csi.CSI.snapshot() in niet-blokkerende modus pollt en de coroutine tussen polls laat slapen. Het asyncio-hoofdstuk doorliep dit patroon in AsyncCSI; neem het voorlopig inline op in het script:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Elke andere CSI-methode (reset(), pixformat(), framesize(), gain_db(), …) wordt doorgegeven via __getattr__; alleen snapshot() wordt vervangen door een awaitbare versie die de event loop andere coroutines tussen polls laat plannen.

Vervang de kale csi.CSI() van de snapshot-route door een AsyncCSI():

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. Streaming-bodies zijn klassegebaseerde iterators

Een streaming-responsbody is gewoon een object dat microdot itereert met async for, waarbij elke opgeleverde brok naar de socket wordt verstuurd. Op CPython is dit normaal gesproken een async-generatorfunctieasync def met yield. MicroPython ondersteunt dat niet:

Notitie

MicroPython’s asyncio ondersteunt geen async-generatorfuncties (async def name(): ... yield ...). Streaming-responsbodies moeten klassegebaseerde async-iterators zijn met __aiter__ die self teruggeeft en __anext__ gedefinieerd als async def.

Voor een MJPEG-stream betekent dat een klasse waarvan __anext__ één frame await en het teruggeeft, omkaderd in de multipart-wrapper:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

De instantie is per verzoek vers, dus elke verbonden client krijgt zijn eigen iterator. Wanneer de browser de verbinding verbreekt, stopt microdot met het awaiten van __anext__ en wordt de iterator door garbage collection opgeruimd.

Notitie

De bytes(...) omhulling rond de JPEG is defensief. bytearray() geeft een view in de beeldbuffer van de camera terug, en de volgende snapshot() aanroep herschrijft die buffer ter plekke. Het omhullen in bytes kopieert de JPEG eruit, zodat de brok die microdot halverwege het schrijven is, stabiel blijft, zelfs als de flush van de schrijver nog niet klaar is tegen de tijd dat __anext__ opnieuw draait.

10.3.3. De server binnen asyncio draaien

De eerdere app.run(host=..., port=...) aanroep is blokkerend. De MJPEG-handler moet de loop delen met de AsyncCSI snapshot-polls, dus verruil app.run voor start_server() binnen een asyncio.run():

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

De asyncio.run() wrapper laat de server één taak onder meerdere zijn – de main coroutine is dan de natuurlijke plek om vastlegging, bewegingsdetectie en al het andere dat de loop met de HTTP-server moet delen, te starten.

10.3.4. Eén kijker tegelijk

Elke verbonden client draait zijn eigen FrameStream iterator, wat betekent dat elke client zijn eigen csi0.snapshot() aanroep triggert. Twee browsers betekent twee sensoruitlezingen per frame-interval, drie betekent drie, enzovoort. De sensor kan frames niet daadwerkelijk sneller leveren dan zijn eigen framesnelheid, dus de verzoeken stapelen zich achter elkaar op en ieders stream vertraagt.

De oplossing is één gedeelde vastleglus die één frame naar veel lezers publiceert.