10.4. Współdzielenie jednej pętli przechwytywania między widzami¶
Niezależne wywoływanie csi0.snapshot() przez każdego podłączonego klienta jest marnotrawstwem, a gdy otwarte są dwa strumienie naraz, robi się gorzej: sensor dostarcza ramki we własnym tempie, a każde zduplikowane przechwytywanie spowalnia wszystkich. Właściwym podejściem jest jedna korutyna przechwytywania, która publikuje „najnowszą ramkę” do współdzielonego miejsca, plus iteratory dla poszczególnych klientów, które z tego miejsca odczytują.
10.4.1. Zadanie przechwytywania¶
Korutyna działająca w tle pobiera ramki tak szybko, jak dostarcza je sensor, kompresuje każdą do współdzielonego obiektu bytes w formacie JPEG i wyzwala zdarzenie, aby każdy oczekujący klient się obudził:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
Para set() / clear() to wzorzec impulsu. set() odblokowuje za jednym razem każdą korutynę aktualnie oczekującą na zdarzenie; clear() natychmiast resetuje zdarzenie, więc następne wait() znów blokuje. Przy wielu odbiorcach (widz, kolejny widz, dowolna inna korutyna, która musi zareagować na nową ramkę) żaden pojedynczy odbiorca nie jest odpowiedzialny za resetowanie zdarzenia i nikt nie kradnie pobudki innym.
Informacja
Opakowanie bytes(...) wokół danych JPEG jest tutaj krytyczne. bytearray() zwraca widok do bufora obrazu kamery; kolejne wywołanie snapshot() nadpisuje ten bufor w miejscu następną ramką. latest_jpeg przeżywa lokalny img, więc bez kopii każdy czytelnik widziałby, jak miejsce przesuwa się pod nim przy każdym przechwyceniu.
10.4.2. Iteratory dla poszczególnych klientów odczytują z miejsca¶
Procedura obsługi strumienia MJPEG przestaje sama wywoływać csi0.snapshot(). Zamiast tego każda instancja FrameStream oczekuje na współdzielone zdarzenie i odczytuje ze współdzielonego obiektu bytes:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
Trasa zrzutu obrazu również się zmienia: nie wyzwala już przechwytywania, lecz zwraca to, co aktualnie przechowuje latest_jpeg:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
Krotka (body, status) to skrót microdot do ustawienia kodu statusu HTTP bez konstruowania obiektu microdot.Response. 503 mówi jestem tu, ale nie jestem gotowy – właściwy kod dla „zapytaj ponownie za chwilę”.
10.4.3. Uruchamianie przechwytywania równolegle z serwerem¶
main ma teraz dwie korutyny najwyższego poziomu: pętlę przechwytywania i serwer HTTP. asyncio.gather() uruchamia obie, a jeśli któraś ulegnie awarii, druga zostaje anulowana:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
Teraz sensor odczytuje jedną ramkę na cykl, niezależnie od tego, ilu widzów jest podłączonych. Pierwsza przeglądarka, która trafi na /stream.jpg, widzi ramki; tak samo druga, trzecia, dziesiąta – wszystkie współdzielą to samo przechwytywanie, a kamera pozostaje równie responsywna na swoich pozostałych trasach.