10.3. Live-Streaming – ein Betrachter¶
Browser können mehrteilige Motion-JPEG-Streams (MJPEG) direkt in einem <img>-Tag rendern. Geben Sie dem Browser eine HTTP-Antwort, die nie endet, schreiben Sie JPEGs, die durch eine Multipart-Begrenzung getrennt sind, und der Browser zeigt jedes Einzelbild an, sobald es eintrifft.
Die Leitung ist unkompliziert: ein Antwort-Header, Content-Type: multipart/x-mixed-replace; boundary=frame, dann eine --frame-Zeile, Content-Type: image/jpeg, eine Leerzeile, die JPEG-Bytes, \r\n, und das Ganze wiederholt sich. Der Browser schließt die Verbindung, wenn das <img> entfernt oder der Tab geschlossen wird.
10.3.1. Erfassen ohne Blockieren¶
Das bisher verwendete blockierende csi0.snapshot() legt die gesamte Ereignisschleife lahm, bis der Sensor ein Einzelbild liefert. Das war in Ordnung, als eine Anfrage einen Schnappschuss auslöste und sonst nichts lief. Sobald ein Stream geöffnet ist, muss der Server weiterhin andere Anfragen bearbeiten, während das nächste Einzelbild erfasst wird – der Erfassungsaufruf muss die Kontrolle an die Ereignisschleife abgeben, während er auf den Sensor wartet.
Das Muster ist ein schlanker AsyncCSI-Wrapper, der csi.CSI.snapshot() im nicht-blockierenden Modus abfragt und die Koroutine zwischen den Abfragen schlafen legt. Das asyncio-Kapitel hat dieses Muster in AsyncCSI durchgegangen; binden Sie es vorerst direkt in das Skript ein:
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Jede andere CSI-Methode (reset(), pixformat(), framesize(), gain_db(), …) wird über __getattr__ weitergeleitet; nur snapshot() wird durch eine awaitable Version ersetzt, die die Ereignisschleife andere Koroutinen zwischen den Abfragen einplanen lässt.
Tauschen Sie das nackte csi.CSI() aus der Snapshot-Route gegen ein AsyncCSI() aus:
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. Streaming-Bodies sind klassenbasierte Iteratoren¶
Ein Streaming-Antwort-Body ist einfach ein Objekt, das microdot mit async for durchläuft und jeden gelieferten Chunk über den Socket sendet. Auf CPython ist dies normalerweise eine asynchrone Generatorfunktion – async def mit yield. MicroPython unterstützt das nicht:
Bemerkung
Das asyncio von MicroPython unterstützt keine asynchronen Generatorfunktionen (async def name(): ... yield ...). Streaming-Antwort-Bodies müssen klassenbasierte asynchrone Iteratoren sein, bei denen __aiter__ self zurückgibt und __anext__ als async def definiert ist.
Für einen MJPEG-Stream bedeutet das eine Klasse, deren __anext__ ein Einzelbild abwartet und es eingebettet in den Multipart-Wrapper zurückgibt:
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
Die Instanz ist pro Anfrage neu, sodass jeder verbundene Client seinen eigenen Iterator erhält. Wenn der Browser die Verbindung trennt, hört microdot auf, __anext__ abzuwarten, und der Iterator wird per Garbage Collection entfernt.
Bemerkung
Das bytes(...) um das JPEG herum ist defensiv. bytearray() gibt eine Ansicht in den Bildpuffer der Kamera zurück, und der nächste snapshot()-Aufruf überschreibt diesen Puffer an Ort und Stelle. Das Einpacken in bytes kopiert das JPEG heraus, sodass der Chunk, den microdot gerade schreibt, stabil bleibt – selbst wenn das Flushen des Writers noch nicht abgeschlossen ist, wenn __anext__ erneut läuft.
10.3.3. Den Server innerhalb von asyncio ausführen¶
Der frühere Aufruf app.run(host=..., port=...) ist blockierend. Der MJPEG-Handler muss die Schleife mit den AsyncCSI-Snapshot-Abfragen teilen, also tauschen Sie app.run gegen start_server() innerhalb eines asyncio.run() aus:
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
Der asyncio.run()-Wrapper lässt den Server eine Aufgabe unter mehreren sein – die main-Koroutine ist dann der natürliche Ort, um Erfassung, Bewegungserkennung und alles andere zu starten, was die Schleife mit dem HTTP-Server teilen muss.
10.3.4. Ein Betrachter zur gleichen Zeit¶
Jeder verbundene Client führt seinen eigenen FrameStream-Iterator aus, was bedeutet, dass jeder Client seinen eigenen csi0.snapshot()-Aufruf auslöst. Zwei Browser bedeuten zwei Sensor-Lesevorgänge pro Einzelbildintervall, drei bedeuten drei und so weiter. Der Sensor kann Einzelbilder tatsächlich nicht schneller liefern als seine eigene Bildrate, also stauen sich die Anfragen hintereinander und jedermanns Stream wird langsamer.
Die Lösung ist eine einzige gemeinsame Erfassungsschleife, die ein Einzelbild für viele Leser veröffentlicht.