10.3. Streaming live – un solo spettatore¶
I browser possono renderizzare direttamente flussi Motion JPEG (MJPEG) multipart all’interno di un tag <img>. Consegna al browser una risposta HTTP che non termina mai, scrivi JPEG separati da un boundary multipart, e il browser mostra ogni frame man mano che arriva.
Il flusso sul canale è semplice: un header di risposta, Content-Type: multipart/x-mixed-replace; boundary=frame, poi una riga --frame, Content-Type: image/jpeg, una riga vuota, i byte JPEG, \r\n e si ripete. Il browser chiude la connessione quando l’elemento <img> viene rimosso o la scheda viene chiusa.
10.3.1. Catturare senza bloccare¶
La csi0.snapshot() bloccante usata finora ferma l’intero event loop finché il sensore non consegna un frame. Andava bene quando una richiesta scatenava uno snapshot e nient’altro era in esecuzione. Una volta aperto uno stream, il server deve continuare a gestire altre richieste mentre il frame successivo viene catturato – la chiamata di cattura deve cedere il controllo all’event loop mentre è in attesa del sensore.
Il pattern è un sottile wrapper AsyncCSI che interroga csi.CSI.snapshot() in modalità non bloccante e mette in sleep la coroutine tra un’interrogazione e l’altra. Il capitolo su asyncio ha illustrato questo pattern in AsyncCSI; per ora inseriscilo direttamente nello script:
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Ogni altro metodo CSI (reset(), pixformat(), framesize(), gain_db(), …) viene inoltrato tramite __getattr__; solo snapshot() viene sostituito con una versione awaitable che permette all’event loop di pianificare altre coroutine tra un’interrogazione e l’altra.
Sostituisci la nuda csi.CSI() della route dello snapshot con una AsyncCSI():
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. I corpi in streaming sono iteratori basati su classe¶
Un corpo di risposta in streaming è semplicemente un oggetto che microdot itera con async for, inviando ogni chunk prodotto lungo il socket. Su CPython di norma è una funzione async generator – async def con yield. MicroPython non lo supporta:
Nota
L”asyncio di MicroPython non supporta le funzioni async-generator (async def name(): ... yield ...). I corpi di risposta in streaming devono essere iteratori asincroni basati su classe, con __aiter__ che restituisce self e __anext__ definito come async def.
Per uno stream MJPEG ciò significa una classe il cui __anext__ attende un frame e lo restituisce incapsulato nel wrapper multipart:
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
L’istanza è nuova per ogni richiesta, così ciascun client connesso ottiene il proprio iteratore. Quando il browser si disconnette, microdot smette di attendere __anext__ e l’iteratore viene raccolto dal garbage collector.
Nota
L’incapsulamento bytes(...) attorno al JPEG è una misura difensiva. bytearray() restituisce una vista nel buffer immagine della camera, e la successiva chiamata a snapshot() riscrive quel buffer sul posto. Incapsularlo in bytes copia il JPEG all’esterno, così il chunk che microdot sta scrivendo rimane stabile anche se il flush dello scrittore non è terminato nel momento in cui __anext__ viene eseguito di nuovo.
10.3.3. Eseguire il server dentro asyncio¶
La precedente chiamata app.run(host=..., port=...) è bloccante. L’handler MJPEG deve condividere il loop con le interrogazioni snapshot di AsyncCSI, quindi sostituisci app.run con start_server() all’interno di un asyncio.run():
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
Il wrapper asyncio.run() permette al server di essere un task tra tanti – la coroutine main diventa quindi il punto naturale dove avviare la cattura, il rilevamento del movimento e qualsiasi altra cosa debba condividere il loop con il server HTTP.
10.3.4. Uno spettatore alla volta¶
Ogni client connesso esegue il proprio iteratore FrameStream, il che significa che ogni client scatena la propria chiamata csi0.snapshot(). Due browser significano due letture del sensore per intervallo di frame, tre ne significano tre, e così via. Il sensore non può in realtà consegnare frame più velocemente del proprio frame rate, quindi le richieste si accodano l’una dietro l’altra e lo stream di tutti rallenta.
La soluzione è un unico loop di cattura condiviso che pubblica un frame a molti lettori.