10.3. Diffusion en direct – un seul spectateur¶
Les navigateurs peuvent restituer directement des flux Motion JPEG (MJPEG) multipartites à l’intérieur d’une balise <img>. Donnez au navigateur une réponse HTTP qui ne se termine jamais, écrivez des JPEG séparés par une limite multipartite, et le navigateur affiche chaque trame à mesure qu’elle arrive.
Le câble est simple : un en-tête de réponse, Content-Type: multipart/x-mixed-replace; boundary=frame, puis une ligne --frame, Content-Type: image/jpeg, une ligne vide, les octets JPEG, \r\n, et on recommence. Le navigateur ferme la connexion lorsque la balise <img> est retirée ou que l’onglet est fermé.
10.3.1. Capturer sans bloquer¶
L’appel bloquant csi0.snapshot() utilisé jusqu’ici fige toute la boucle d’événements jusqu’à ce que le capteur fournisse une trame. C’était acceptable lorsqu’une requête déclenchait une capture et que rien d’autre ne tournait. Dès qu’un flux est ouvert, le serveur doit continuer à traiter les autres requêtes pendant que la trame suivante est capturée – l’appel de capture doit céder la main à la boucle d’événements pendant qu’il attend le capteur.
Le motif est un mince enrobage AsyncCSI qui interroge csi.CSI.snapshot() en mode non bloquant et met la coroutine en veille entre les interrogations. Le chapitre asyncio a parcouru ce motif dans AsyncCSI ; intégrez-le directement dans le script pour l’instant :
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Toutes les autres méthodes CSI (reset(), pixformat(), framesize(), gain_db(), …) sont transmises via __getattr__ ; seule snapshot() est remplacée par une version attendable (awaitable) qui permet à la boucle d’événements d’ordonnancer d’autres coroutines entre les interrogations.
Remplacez le simple csi.CSI() de la route snapshot par un AsyncCSI() :
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. Les corps de diffusion sont des itérateurs basés sur des classes¶
Un corps de réponse en diffusion n’est qu’un objet que microdot itère avec async for, envoyant chaque fragment produit sur le socket. Sur CPython, il s’agit normalement d’une fonction génératrice asynchrone – async def avec yield. MicroPython ne le prend pas en charge :
Note
Le module asyncio de MicroPython ne prend pas en charge les fonctions génératrices asynchrones (async def name(): ... yield ...). Les corps de réponse en diffusion doivent être des itérateurs asynchrones basés sur des classes, avec __aiter__ qui renvoie self et __anext__ défini comme async def.
Pour un flux MJPEG, cela signifie une classe dont __anext__ attend une trame et la renvoie encadrée dans l’enveloppe multipartite :
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
L’instance est fraîche à chaque requête, de sorte que chaque client connecté obtient son propre itérateur. Lorsque le navigateur se déconnecte, microdot cesse d’attendre __anext__ et l’itérateur est récupéré par le ramasse-miettes.
Note
L’enrobage bytes(...) autour du JPEG est défensif. bytearray() renvoie une vue sur le tampon d’image de la caméra, et l’appel suivant à snapshot() réécrit ce tampon sur place. L’enrobage dans bytes copie le JPEG vers l’extérieur, de sorte que le fragment que microdot est en train d’écrire reste stable même si le vidage de l’écriture n’est pas terminé au moment où __anext__ s’exécute à nouveau.
10.3.3. Exécuter le serveur à l’intérieur d’asyncio¶
L’appel précédent app.run(host=..., port=...) est bloquant. Le gestionnaire MJPEG doit partager la boucle avec les interrogations snapshot d’AsyncCSI, alors remplacez app.run par start_server() à l’intérieur d’un asyncio.run() :
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
L’enrobage asyncio.run() permet au serveur d’être une tâche parmi plusieurs – la coroutine main est alors l’endroit naturel pour lancer la capture, la détection de mouvement et tout ce qui doit partager la boucle avec le serveur HTTP.
10.3.4. Un seul spectateur à la fois¶
Chaque client connecté exécute son propre itérateur FrameStream, ce qui signifie que chaque client déclenche son propre appel csi0.snapshot(). Deux navigateurs signifient deux lectures du capteur par intervalle de trame, trois en signifient trois, et ainsi de suite. Le capteur ne peut pas réellement fournir des trames plus vite que sa propre cadence d’images, de sorte que les requêtes s’empilent les unes derrière les autres et que le flux de chacun ralentit.
La solution est une unique boucle de capture partagée qui publie une trame vers de nombreux lecteurs.