10.4. Eine Erfassungsschleife über mehrere Betrachter hinweg teilen¶
Wenn jeder verbundene Client unabhängig csi0.snapshot() aufruft, ist das verschwenderisch, und sobald zwei Streams gleichzeitig geöffnet sind, wird es schlimmer: Der Sensor liefert Einzelbilder mit seiner eigenen Rate, und jede doppelte Erfassung verlangsamt alle. Der richtige Ansatz ist eine Erfassungs-Koroutine, die „das neueste Einzelbild“ in einem gemeinsamen Slot veröffentlicht, plus Iteratoren pro Client, die aus dem Slot lesen.
10.4.1. Die Erfassungsaufgabe¶
Eine Hintergrund-Koroutine greift Einzelbilder so schnell ab, wie der Sensor sie liefert, JPEG-komprimiert jedes davon in ein gemeinsames bytes und setzt einen Impuls auf ein Ereignis, sodass jeder wartende Client aufwacht:
latest_jpeg = None
new_frame = asyncio.Event()
async def capture_loop():
global latest_jpeg
while True:
img = await csi0.snapshot()
latest_jpeg = bytes(img.compress(quality=85).bytearray())
new_frame.set()
new_frame.clear()
Das Paar set() / clear() ist das Impuls-Muster. set() entsperrt jede Koroutine, die gerade auf das Ereignis wartet, auf einen Schlag; clear() setzt das Ereignis sofort zurück, sodass das nächste wait() wieder blockiert. Bei mehreren Verbrauchern (ein Betrachter, ein weiterer Betrachter, jede andere Koroutine, die auf ein neues Einzelbild reagieren muss) ist kein einzelner Verbraucher für das Zurücksetzen des Ereignisses verantwortlich, und niemand stiehlt jemand anderem ein Aufwecken.
Bemerkung
Das bytes(...) um das JPEG herum ist hier tragend. bytearray() gibt eine Ansicht in den Bildpuffer der Kamera zurück; der unmittelbar nächste snapshot()-Aufruf überschreibt diesen Puffer an Ort und Stelle mit dem nächsten Einzelbild. latest_jpeg überlebt das lokale img, sodass ohne die Kopie jeder Leser den Slot bei jeder Erfassung unter sich verschieben sehen würde.
10.4.2. Iteratoren pro Client lesen aus dem Slot¶
Der MJPEG-Stream-Handler ruft selbst nicht mehr csi0.snapshot() auf. Stattdessen wartet jede FrameStream-Instanz auf das gemeinsame Ereignis und liest aus den gemeinsamen Bytes:
class FrameStream:
# One instance per connected client. Each one independently
# waits on the shared new_frame pulse; the capture loop is
# responsible for resetting the event between frames.
def __aiter__(self):
return self
async def __anext__(self):
await new_frame.wait()
if latest_jpeg is None:
return b''
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ latest_jpeg + b'\r\n')
Die Snapshot-Route ändert sich ebenfalls: Sie löst keine Erfassung mehr aus, sondern gibt zurück, was latest_jpeg aktuell enthält:
@app.get('/snapshot.jpg')
async def snapshot(request):
if latest_jpeg is None:
return 'no frame yet', 503
return Response(
body=latest_jpeg,
headers={'Content-Type': 'image/jpeg'},
)
Das Tupel (body, status) ist microdots Kurzschreibweise zum Setzen eines HTTP-Statuscodes, ohne eine microdot.Response zu konstruieren. 503 sagt Ich bin da, aber noch nicht bereit – der richtige Code für „frag gleich noch einmal“.
10.4.3. Die Erfassung neben dem Server ausführen¶
main hat nun zwei Top-Level-Koroutinen: die Erfassungsschleife und den HTTP-Server. asyncio.gather() führt beide aus, und wenn eine abstürzt, wird die andere abgebrochen:
async def main():
await asyncio.gather(
capture_loop(),
app.start_server(host='0.0.0.0', port=80),
)
asyncio.run(main())
Jetzt liest der Sensor ein Einzelbild pro Zyklus, egal wie viele Betrachter verbunden sind. Der erste Browser auf /stream.jpg sieht Einzelbilder; ebenso der zweite, der dritte, der zehnte – sie alle teilen sich dieselbe Erfassung, und die Kamera bleibt bei ihren anderen Routen ebenso reaktionsschnell.