8.14. AsyncCSI¶
Un tipico script per OpenMV Cam termina con while True: img = csi0.snapshot() – un ciclo di snapshot bloccante che non ha alcun bisogno di asyncio. Nel momento in cui l’applicazione deve fare qualcos’altro insieme alle acquisizioni – ascoltare un pulsante, inviare dati a un peer, eseguire un task in background – la chiamata bloccante diventa un ostacolo. Mentre snapshot attende il frame successivo, l’event loop non è in esecuzione, quindi ogni altra coroutine del programma resta congelata finché il frame non arriva.
Questa pagina costruisce un piccolo wrapper attorno a CSI che trasforma snapshot in una coroutine compatibile con await. Il risultato è un sostituto drop-in che permette a un ciclo di acquisizione di coesistere con il resto di un programma asyncio.
8.14.1. I componenti¶
Una parte dell’API di CSI svolge gran parte del lavoro – snapshot() nella sua modalità non bloccante. Chiamare snapshot(blocking=False) restituisce il frame successivo (se è pronto) oppure None (se non lo è). La prima chiamata non bloccante avvia anche l’acquisizione DMA della camera se non era già in corso, così il wrapper non deve fare nulla di particolare per inizializzare il tutto.
L’altra parte è asyncio.sleep_ms(). Il wrapper interroga snapshot non bloccanti in un ciclo, cedendo il controllo all’event loop con await asyncio.sleep_ms(0) tra una verifica e l’altra, così che ogni altra coroutine pronta abbia la possibilità di essere eseguita prima del polling successivo.
8.14.2. Il wrapper¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Il costruttore incapsula un’istanza di CSI. __getattr__ inoltra ogni attributo che il wrapper non definisce direttamente – reset, pixformat, framesize, tutte le manopole del sensore – al sottostante CSI, così il wrapper appare identico all’oggetto non incapsulato, fatta eccezione per l’unico metodo che conta.
async def snapshot è la novità. Chiama snapshot(blocking=False); se la chiamata restituisce un’immagine, la coroutine la restituisce. Altrimenti cede il controllo all’event loop con await asyncio.sleep_ms(0) in modo che le altre coroutine abbiano la possibilità di essere eseguite, poi ripete il ciclo e riprova. La prima iterazione avvia il DMA; le iterazioni successive recuperano i frame man mano che diventano disponibili.
8.14.3. Un ciclo di snapshot in compagnia¶
Con il wrapper a disposizione, un ciclo di snapshot si inserisce in un programma asyncio più ampio nello stesso modo in cui lo fa qualsiasi altra coroutine. L’esempio seguente esegue tre coroutine in modo concorrente: il ciclo di acquisizione, un lampeggiatore LED e un heartbeat che stampa hello una volta al secondo:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
Tutte e tre le coroutine avanzano sullo stesso event loop. Mentre capture_loop cede il controllo tra un polling di snapshot non bloccante e l’altro, blinker commuta il LED e hello stampa. Mentre blinker e hello sono in attesa, capture_loop interroga la camera. L’intervallo di polling è breve – un singolo tick dell’event loop – quindi aggiunge una latenza trascurabile al momento in cui l’applicazione vede un nuovo frame.
Il ciclo di acquisizione non blocca l’event loop. Aggiungere altro lavoro concorrente – un client UART, per esempio – è semplicemente un’altra chiamata a create_task() all’interno di main.
Nota
L’impostazione dei framebuffer conta ancora in questa configurazione. La modalità a buffer singolo fa sì che snapshot(blocking=False) restituisca None finché il frame successivo non viene acquisito; il doppio o triplo buffering attenua questo effetto, così che il wrapper trovi solitamente un frame già bufferizzato in attesa al primo polling successivo all’elaborazione del frame precedente.