8.14. AsyncCSI¶
Un script típico de la OpenMV Cam termina en while True: img = csi0.snapshot() – un bucle de captura bloqueante que no necesita asyncio en absoluto. En el momento en que la aplicación tiene que hacer algo más junto con las capturas – escuchar un botón, enviar datos a un par, ejecutar una tarea en segundo plano – la llamada bloqueante se interpone. Mientras snapshot espera el siguiente fotograma, el bucle de eventos no se está ejecutando, por lo que cualquier otra corrutina del programa queda congelada hasta que llega el fotograma.
Esta página construye un pequeño envoltorio alrededor de CSI que convierte snapshot en una corrutina compatible con await. El resultado es un reemplazo directo que permite que un bucle de captura coexista con el resto de un programa asyncio.
8.14.1. Las piezas¶
Una pieza de la API de CSI hace la mayor parte del trabajo – snapshot() en su modo no bloqueante. Llamar a snapshot(blocking=False) devuelve el siguiente fotograma (si hay uno listo) o None (si no lo hay). La primera llamada no bloqueante también inicia la captura DMA de la cámara si no estaba ya en marcha, por lo que el envoltorio no tiene que hacer nada especial para arrancar.
La otra pieza es asyncio.sleep_ms(). El envoltorio sondea capturas no bloqueantes en un bucle, cediendo el control al bucle de eventos con await asyncio.sleep_ms(0) entre comprobaciones, de modo que cualquier otra corrutina lista tenga oportunidad de ejecutarse antes del siguiente sondeo.
8.14.2. El envoltorio¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
El constructor envuelve una instancia de CSI. __getattr__ reenvía todos los atributos que el propio envoltorio no define – reset, pixformat, framesize, todos los ajustes del sensor – al CSI subyacente, de modo que el envoltorio parece idéntico al objeto sin envolver salvo por el único método que importa.
async def snapshot es la pieza nueva. Llama a snapshot(blocking=False); si la llamada devuelve una imagen, la corrutina la retorna. De lo contrario, cede el control al bucle de eventos con await asyncio.sleep_ms(0) para que otras corrutinas tengan oportunidad de ejecutarse, y luego vuelve al bucle e intenta de nuevo. La primera iteración inicia el DMA; las iteraciones siguientes recogen los fotogramas a medida que están disponibles.
8.14.3. Un bucle de captura acompañado¶
Con el envoltorio en su sitio, un bucle de captura encaja en un programa asyncio más grande igual que cualquier otra corrutina. El ejemplo siguiente ejecuta tres corrutinas de forma concurrente: el bucle de captura, un parpadeador de LED y un latido que imprime hello una vez por segundo:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
Las tres corrutinas avanzan en el mismo bucle de eventos. Mientras capture_loop cede el control entre sondeos de captura no bloqueantes, blinker conmuta el LED y hello imprime. Mientras blinker y hello están durmiendo, capture_loop sondea la cámara. El intervalo de sondeo es corto – un solo tick del bucle de eventos – por lo que añade una latencia insignificante al momento en que la aplicación ve un nuevo fotograma.
El bucle de captura no bloquea el bucle de eventos. Añadir más trabajo concurrente – un cliente UART, por ejemplo – es simplemente otra llamada a create_task() dentro de main.
Nota
El ajuste de framebuffers sigue importando en esta forma. El modo de búfer único hace que snapshot(blocking=False) devuelva None hasta que se capture el siguiente fotograma; el doble o triple búfer lo suaviza, de modo que el envoltorio normalmente encuentra un fotograma en búfer esperando en el primer sondeo después de que se haya procesado el fotograma anterior.