10.3. Strujanje uživo – jedan gledatelj

Preglednici mogu izravno prikazivati višedijelne Motion JPEG (MJPEG) strujanja unutar <img> oznake. Predajte pregledniku jedan HTTP odgovor koji nikada ne završava, zapisujte JPEG-ove odvojene višedijelnom granicom, i preglednik prikazuje svaku sličicu čim stigne.

Preglednik šalje GET /stream.jpg; kamera odgovara s Content-Type multipart/x-mixed-replace i zapisuje jedan dio s JPEG tijelom po sličici dok se preglednik ne odspoji.

Linija je jednostavna: jedno zaglavlje odgovora, Content-Type: multipart/x-mixed-replace; boundary=frame, zatim redak --frame, Content-Type: image/jpeg, prazan redak, JPEG bajtovi, \r\n i ponavljanje. Preglednik zatvara vezu kada se <img> ukloni ili kada se kartica zatvori.

10.3.1. Snimanje bez blokiranja

Blokirajući csi0.snapshot() korišten dosad zaustavlja cijelu petlju događaja dok senzor ne isporuči sličicu. To je bilo u redu kada je jedan zahtjev pokrenuo jednu snimku i ništa drugo nije bilo pokrenuto. Kada je strujanje otvoreno, poslužitelj mora nastaviti obrađivati druge zahtjeve dok se snima sljedeća sličica – poziv snimanja mora prepustiti (yield) petlji događaja dok čeka na senzor.

Uzorak je tanki omotač AsyncCSI koji ispituje csi.CSI.snapshot() u neblokirajućem načinu rada i uspavljuje korutinu između ispitivanja. Poglavlje o asyncio prošlo je kroz ovaj uzorak u AsyncCSI; za sada ga umetnite izravno u skriptu:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Svaka druga CSI metoda (reset(), pixformat(), framesize(), gain_db(), …) prosljeđuje se kroz __getattr__; samo se snapshot() zamjenjuje verzijom koja se može čekati (awaitable) i koja omogućuje petlji događaja da rasporedi druge korutine između ispitivanja.

Zamijenite goli csi.CSI() iz rute snimanja s AsyncCSI():

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. Tijela strujanja su iteratori temeljeni na klasama

Tijelo odgovora strujanja samo je objekt kroz koji microdot iterira s async for, šaljući svaki proizvedeni dio kroz socket. Na CPythonu je to obično funkcija asinkronog generatoraasync def s yield. MicroPython to ne podržava:

Napomena

MicroPythonov asyncio ne podržava funkcije asinkronih generatora (async def name(): ... yield ...). Tijela odgovora strujanja moraju biti asinkroni iteratori temeljeni na klasama s __aiter__ koji vraća self i __anext__ definiranim kao async def.

Za MJPEG strujanje to znači klasu čiji __anext__ čeka jednu sličicu i vraća je uokvirenu u višedijelni omotač:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

Instanca je svježa po zahtjevu, pa svaki povezani klijent dobiva vlastiti iterator. Kada se preglednik odspoji, microdot prestaje čekati __anext__ i iterator se sakuplja kao smeće.

Napomena

Omot bytes(...) oko JPEG-a je obrambeni. bytearray() vraća pogled u međuspremnik slike kamere, a sljedeći poziv snapshot() prepisuje taj međuspremnik na licu mjesta. Omatanje u bytes kopira JPEG van pa dio koji microdot upravo zapisuje ostaje stabilan čak i ako pisac ne dovrši pražnjenje (flush) do trenutka kada se __anext__ ponovno pokrene.

10.3.3. Pokretanje poslužitelja unutar asyncio

Raniji poziv app.run(host=..., port=...) blokira. MJPEG rukovatelj mora dijeliti petlju s ispitivanjima AsyncCSI snimanja, pa zamijenite app.run s start_server() unutar asyncio.run():

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

Omotač asyncio.run() omogućuje da poslužitelj bude jedan zadatak među nekoliko njih – korutina main tada je prirodno mjesto za pokretanje snimanja, detekcije pokreta i svega ostalog što mora dijeliti petlju s HTTP poslužiteljem.

10.3.4. Jedan gledatelj odjednom

Svaki povezani klijent pokreće vlastiti FrameStream iterator, što znači da svaki klijent pokreće vlastiti poziv csi0.snapshot(). Dva preglednika znače dva čitanja senzora po intervalu sličice, tri znače tri, i tako dalje. Senzor zapravo ne može isporučiti sličice brže od vlastite brzine sličica, pa se zahtjevi nižu jedan iza drugog i strujanje svima usporava.

Rješenje je jedna dijeljena petlja snimanja koja objavljuje jednu sličicu mnogim čitateljima.