10.4. Sdílení jedné smyčky zachytávání mezi diváky

Když každý připojený klient volá csi0.snapshot() nezávisle, je to plýtvání, a jakmile jsou otevřeny dva streamy najednou, je to horší: senzor dodává snímky vlastní rychlostí a každé duplikované zachycení všechny zpomaluje. Správným přístupem je jedna korutina zachytávání, která publikuje „nejnovější snímek“ do sdíleného slotu, plus iterátory pro jednotlivé klienty, které čtou ze slotu.

Jedna úloha zachytávání zapisuje JPEG bajty do jediného slotu latest_jpeg; tři iterátory stream-klientů čtou ze slotu a každý čeká na sdílenou událost new_frame.

10.4.1. Úloha zachytávání

Korutina na pozadí získává snímky tak rychle, jak je senzor dodává, každý JPEG zkomprimuje do sdíleného bytes a vyšle puls události, aby se probudil každý čekající klient:

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

Dvojice set() / clear() je vzor puls. set() odblokuje najednou každou korutinu, která aktuálně čeká na událost; clear() událost okamžitě resetuje, takže další wait() zase zablokuje. S více konzumenty (jeden divák, další divák, jakákoli jiná korutina, která potřebuje reagovat na nový snímek) není žádný jednotlivý konzument zodpovědný za resetování události a nikdo nikomu neukradne probuzení.

Poznámka

Obal bytes(...) kolem JPEGu je zde nosný. bytearray() vrací pohled do obrazového bufferu kamery; úplně další volání snapshot() tento buffer přepíše na místě dalším snímkem. latest_jpeg přežívá lokální img, takže bez kopie by každý čtenář viděl, jak se mu slot při každém zachycení posouvá pod rukama.

10.4.2. Iterátory jednotlivých klientů čtou ze slotu

Handler MJPEG streamu sám přestane volat csi0.snapshot(). Místo toho každá instance FrameStream čeká na sdílenou událost a čte ze sdílených bajtů:

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

Cesta snapshot se také mění: již nespouští zachycení, vrací cokoli, co latest_jpeg aktuálně drží:

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

N-tice (body, status) je microdotová zkratka pro nastavení HTTP stavového kódu bez konstrukce microdot.Response. 503 říká jsem zde, ale nejsem připraven – správný kód pro „zeptejte se znovu za chvíli.“

10.4.3. Spuštění zachytávání spolu se serverem

main nyní má dvě korutiny nejvyšší úrovně: smyčku zachytávání a HTTP server. asyncio.gather() spustí obě, a pokud kterákoli spadne, druhá je zrušena:

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

Nyní senzor čte jeden snímek na cyklus bez ohledu na to, kolik diváků je připojeno. První prohlížeč na /stream.jpg vidí snímky; stejně tak druhý, třetí, desátý – všichni sdílejí stejné zachytávání a kamera zůstává stejně responzivní na svých ostatních cestách.