10.3. Streaming live – un singur spectator

Browserele pot reda direct stream-uri Motion JPEG (MJPEG) multi-parte în interiorul unei etichete <img>. Dă browserului un singur răspuns HTTP care nu se termină niciodată, scrie JPEG-uri separate de o limită multipart, iar browserul afișează fiecare cadru pe măsură ce sosește.

Browserul trimite GET /stream.jpg; camera răspunde cu Content-Type multipart/x-mixed-replace și scrie câte o parte cu corp JPEG per cadru până când browserul se deconectează.

Firul este simplu: un antet de răspuns, Content-Type: multipart/x-mixed-replace; boundary=frame, apoi o linie --frame, Content-Type: image/jpeg, o linie goală, octeții JPEG, \r\n și se repetă. Browserul închide conexiunea când eticheta <img> este eliminată sau fila este închisă.

10.3.1. Captarea fără blocare

Apelul blocant csi0.snapshot() folosit până acum blochează întreaga buclă de evenimente până când senzorul livrează un cadru. Acest lucru era în regulă când o cerere declanșa un instantaneu și nimic altceva nu rula. Odată ce un stream este deschis, serverul trebuie să continue să gestioneze alte cereri în timp ce următorul cadru este captat – apelul de captare trebuie să cedeze controlul buclei de evenimente cât timp așteaptă senzorul.

Modelul este un wrapper subțire AsyncCSI care interoghează csi.CSI.snapshot() în mod non-blocant și pune corutina în repaus între interogări. Capitolul despre asyncio a parcurs acest model în AsyncCSI; inserează-l direct în script pentru moment:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Orice altă metodă CSI (reset(), pixformat(), framesize(), gain_db(), …) este redirecționată prin __getattr__; doar snapshot() este înlocuită cu o versiune awaitable care permite buclei de evenimente să programeze alte corutine între interogări.

Înlocuiește csi.CSI() simplu din ruta de instantaneu cu un AsyncCSI():

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. Corpurile de streaming sunt iteratoare bazate pe clase

Un corp de răspuns de streaming este pur și simplu un obiect pe care microdot îl iterează cu async for, trimițând fiecare bucată produsă pe socket. Pe CPython aceasta este în mod normal o funcție generator asincronăasync def cu yield. MicroPython nu acceptă asta:

Notă

asyncio din MicroPython nu acceptă funcții generator asincrone (async def name(): ... yield ...). Corpurile de răspuns de streaming trebuie să fie iteratoare asincrone bazate pe clase, cu __aiter__ returnând self și __anext__ definit ca async def.

Pentru un stream MJPEG asta înseamnă o clasă al cărei __anext__ așteaptă un cadru și îl returnează încadrat în wrapperul multipart:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

Instanța este nouă per cerere, astfel încât fiecare client conectat își primește propriul iterator. Când browserul se deconectează, microdot încetează să aștepte __anext__ și iteratorul este eliberat de garbage collector.

Notă

Învelirea bytes(...) în jurul JPEG-ului este defensivă. bytearray() returnează o vedere asupra tamponului de imagine al camerei, iar următorul apel snapshot() rescrie acel tampon pe loc. Învelirea în bytes copiază JPEG-ul în afară, astfel încât bucata pe care microdot este în curs de a o scrie rămâne stabilă chiar dacă golirea celui ce scrie nu s-a terminat până când __anext__ rulează din nou.

10.3.3. Rularea serverului în interiorul asyncio

Apelul anterior app.run(host=..., port=...) este blocant. Handlerul MJPEG trebuie să partajeze bucla cu interogările de instantanee AsyncCSI, așa că înlocuiește app.run cu start_server() în interiorul unui asyncio.run():

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

Wrapperul asyncio.run() permite serverului să fie o sarcină printre mai multe – corutina main este apoi locul firesc pentru a lansa captarea, detectarea mișcării și orice altceva care trebuie să partajeze bucla cu serverul HTTP.

10.3.4. Un spectator pe rând

Fiecare client conectat își rulează propriul iterator FrameStream, ceea ce înseamnă că fiecare client declanșează propriul apel csi0.snapshot(). Două browsere înseamnă două citiri de senzor per interval de cadru, trei înseamnă trei, și așa mai departe. Senzorul nu poate de fapt livra cadre mai repede decât propria rată de cadre, așa că cererile se aliniază unele după altele și stream-ul tuturor încetinește.

Soluția este o singură buclă de captare partajată care publică un cadru către mai mulți cititori.