11.14. AsyncCSI¶
A typical OpenMV Cam script ends in
while True: img = csi0.snapshot() – a blocking
snapshot loop that does not need asyncio at all. The
moment the application has to do something else alongside
captures – listen for a button, send data to a peer, run a
background task – the blocking call gets in the way. While
snapshot is waiting on the next frame the event loop is
not running, so every other coroutine in the program is
frozen until the frame arrives.
This page builds a small wrapper around CSI
that turns snapshot into an await-friendly
coroutine. The result is a drop-in replacement that lets a
capture loop coexist with the rest of an asyncio program.
11.14.1. The pieces¶
One piece of the CSI API does most of the work
– snapshot() in its non-blocking mode.
Calling snapshot(blocking=False) either returns the next
frame (if one is ready) or None (if not). The first
non-blocking call also starts the camera’s DMA capture if
it wasn’t already running, so the wrapper does not have to
do anything special to bootstrap.
The other piece is asyncio.sleep_ms(). The wrapper
polls non-blocking snapshots in a loop, yielding to the
event loop with await asyncio.sleep_ms(0) between
checks so every other ready coroutine gets a chance to run
before the next poll.
11.14.2. The wrapper¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
The constructor wraps a CSI instance.
__getattr__ forwards every attribute the wrapper does
not itself define – reset, pixformat, framesize,
all the sensor knobs – to the underlying
CSI, so the wrapper looks identical to the
unwrapped object except for the one method that matters.
async def snapshot is the new piece. It calls
snapshot(blocking=False); if the call returns an image,
the coroutine returns it. Otherwise it yields back to the
event loop with await asyncio.sleep_ms(0) so other
coroutines get a chance to run, then loops back and tries
again. The first iteration starts the DMA; subsequent
iterations pick up frames as they become available.
11.14.3. A snapshot loop with company¶
With the wrapper in place, a snapshot loop fits into a
larger asyncio program the same way any other coroutine
does. The example below runs three coroutines concurrently:
the capture loop, an LED blinker, and a heartbeat that
prints hello once a second:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
All three coroutines make progress on the same event loop.
While capture_loop is yielding between non-blocking
snapshot polls, blinker toggles the LED and hello
prints. While blinker and hello are sleeping,
capture_loop polls the camera. The polling interval is
short – a single event-loop tick – so it adds negligible
latency to when the application sees a new frame.
The capture loop does not block the event loop. Adding
more concurrent work – a UART client, for instance – is
just another create_task() call inside
main.
Note
The framebuffers
setting still matters in this shape. Single-buffer mode
makes snapshot(blocking=False) return None until
the next frame is captured; double or triple buffering
smooths that over so that the wrapper usually finds a
buffered frame waiting on the first poll after the
previous frame has been processed.