10.3. Transmisja na żywo – jeden widz¶
Przeglądarki potrafią renderować wieloczęściowe strumienie Motion JPEG (MJPEG) bezpośrednio wewnątrz znacznika <img>. Podaj przeglądarce jedną odpowiedź HTTP, która nigdy się nie kończy, zapisuj obrazy JPEG oddzielone granicą multipart, a przeglądarka wyświetli każdą ramkę w miarę jej napływania.
Łącze jest proste: jeden nagłówek odpowiedzi, Content-Type: multipart/x-mixed-replace; boundary=frame, następnie linia --frame, Content-Type: image/jpeg, pusta linia, bajty JPEG, \r\n, i powtórka. Przeglądarka zamyka połączenie, gdy znacznik <img> zostaje usunięty lub karta zostaje zamknięta.
10.3.1. Przechwytywanie bez blokowania¶
Blokujące csi0.snapshot() używane do tej pory zatrzymuje całą pętlę zdarzeń, dopóki sensor nie dostarczy ramki. To było w porządku, gdy jedno żądanie wyzwalało jeden zrzut obrazu i nic innego nie działało. Po otwarciu strumienia serwer musi nadal obsługiwać inne żądania podczas przechwytywania kolejnej ramki – wywołanie przechwytywania musi oddawać sterowanie pętli zdarzeń, gdy oczekuje na sensor.
Wzorcem jest cienka nakładka AsyncCSI, która odpytuje csi.CSI.snapshot() w trybie nieblokującym i usypia korutynę między odpytaniami. Rozdział o asyncio przeszedł przez ten wzorzec w AsyncCSI; na razie wstaw go bezpośrednio do skryptu:
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Każda inna metoda CSI (reset(), pixformat(), framesize(), gain_db(), …) jest przekazywana przez __getattr__; tylko snapshot() jest zastępowana wersją oczekiwalną, która pozwala pętli zdarzeń planować inne korutyny między odpytaniami.
Zamień zwykłe csi.CSI() z trasy zrzutu obrazu na AsyncCSI():
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. Treści strumieniujące to iteratory oparte na klasach¶
Treść strumieniującej odpowiedzi to po prostu obiekt, po którym microdot iteruje za pomocą async for, wysyłając każdy wygenerowany fragment do gniazda. W CPython jest to zwykle funkcja generatora asynchronicznego – async def z yield. MicroPython tego nie obsługuje:
Informacja
asyncio w MicroPython nie obsługuje funkcji generatorów asynchronicznych (async def name(): ... yield ...). Treści strumieniujących odpowiedzi muszą być iteratorami asynchronicznymi opartymi na klasach, w których __aiter__ zwraca self, a __anext__ jest zdefiniowane jako async def.
Dla strumienia MJPEG oznacza to klasę, której __anext__ oczekuje na jedną ramkę i zwraca ją opakowaną w opakowanie multipart:
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
Instancja jest tworzona od nowa dla każdego żądania, więc każdy podłączony klient otrzymuje własny iterator. Gdy przeglądarka się rozłącza, microdot przestaje oczekiwać na __anext__, a iterator jest usuwany przez mechanizm zbierania nieużytków.
Informacja
Opakowanie bytes(...) wokół danych JPEG jest zabezpieczeniem. bytearray() zwraca widok do bufora obrazu kamery, a kolejne wywołanie snapshot() nadpisuje ten bufor w miejscu. Opakowanie w bytes kopiuje dane JPEG na zewnątrz, dzięki czemu fragment, który microdot jest w trakcie zapisywania, pozostaje stabilny, nawet jeśli opróżnianie zapisu nie zakończyło się do czasu, gdy __anext__ zostaje uruchomione ponownie.
10.3.3. Uruchamianie serwera wewnątrz asyncio¶
Wcześniejsze wywołanie app.run(host=..., port=...) jest blokujące. Procedura obsługi MJPEG musi współdzielić pętlę z odpytaniami zrzutu obrazu AsyncCSI, więc zamień app.run na start_server() wewnątrz asyncio.run():
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
Opakowanie asyncio.run() pozwala, by serwer był jednym zadaniem spośród wielu – korutyna main jest wtedy naturalnym miejscem do uruchomienia przechwytywania, wykrywania ruchu i wszystkiego innego, co musi współdzielić pętlę z serwerem HTTP.
10.3.4. Jeden widz naraz¶
Każdy podłączony klient uruchamia własny iterator FrameStream, co oznacza, że każdy klient wyzwala własne wywołanie csi0.snapshot(). Dwie przeglądarki oznaczają dwa odczyty sensora na interwał ramki, trzy oznaczają trzy, i tak dalej. Sensor nie jest w stanie dostarczać ramek szybciej niż jego własna częstotliwość ramek, więc żądania ustawiają się w kolejce jedno za drugim i strumień wszystkich zwalnia.
Rozwiązaniem jest pojedyncza współdzielona pętla przechwytywania publikująca jedną ramkę dla wielu czytelników.