8.14. AsyncCSI¶
Tipična skripta za OpenMV Cam završava s while True: img = csi0.snapshot() – blokirajućom petljom snimanja kojoj asyncio uopće nije potreban. Onog trenutka kad aplikacija mora raditi nešto drugo uz snimanje – osluškivati gumb, slati podatke partneru, izvršavati pozadinski zadatak – blokirajući poziv postaje smetnja. Dok snapshot čeka sljedeću sličicu, petlja događaja se ne izvršava, pa je svaka druga korutina u programu zamrznuta sve dok sličica ne stigne.
Ova stranica gradi mali omotač oko CSI koji pretvara snapshot u korutinu prilagođenu naredbi await. Rezultat je zamjena bez izmjena koja omogućuje da petlja snimanja koegzistira s ostatkom asyncio programa.
8.14.1. Dijelovi¶
Jedan dio CSI API-ja obavlja većinu posla – snapshot() u svojem neblokirajućem načinu rada. Poziv snapshot(blocking=False) ili vraća sljedeću sličicu (ako je spremna) ili None (ako nije). Prvi neblokirajući poziv također pokreće DMA snimanje kamere ako već nije bilo aktivno, pa omotač ne mora raditi ništa posebno za inicijalizaciju.
Drugi dio je asyncio.sleep_ms(). Omotač u petlji ispituje neblokirajuće snimke, predajući kontrolu petlji događaja s await asyncio.sleep_ms(0) između provjera, tako da svaka druga spremna korutina dobije priliku za izvršavanje prije sljedeće provjere.
8.14.2. Omotač¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Konstruktor omata instancu CSI. __getattr__ prosljeđuje svaki atribut koji omotač sam ne definira – reset, pixformat, framesize, sve postavke senzora – temeljnom objektu CSI, pa omotač izgleda identično kao i neomotani objekt, osim po jednoj metodi koja je bitna.
async def snapshot je novi dio. Poziva snapshot(blocking=False); ako poziv vrati sliku, korutina je vraća. U suprotnom predaje kontrolu natrag petlji događaja s await asyncio.sleep_ms(0) kako bi druge korutine dobile priliku za izvršavanje, zatim se vraća u petlju i pokušava ponovno. Prva iteracija pokreće DMA; sljedeće iteracije preuzimaju sličice čim postanu dostupne.
8.14.3. Petlja snimanja u društvu¶
S postavljenim omotačem, petlja snimanja uklapa se u veći asyncio program na isti način kao i bilo koja druga korutina. Primjer u nastavku izvršava tri korutine istovremeno: petlju snimanja, treptanje LED-ice i otkucaj srca koji ispisuje hello jednom u sekundi:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
Sve tri korutine napreduju na istoj petlji događaja. Dok capture_loop predaje kontrolu između neblokirajućih provjera snimaka, blinker prebacuje LED-icu, a hello ispisuje. Dok blinker i hello spavaju, capture_loop ispituje kameru. Interval ispitivanja je kratak – jedan otkucaj petlje događaja – pa dodaje zanemarivo kašnjenje u trenutku kad aplikacija ugleda novu sličicu.
Petlja snimanja ne blokira petlju događaja. Dodavanje više istovremenog posla – na primjer, UART klijenta – samo je još jedan poziv create_task() unutar main.
Napomena
Postavka framebuffers i dalje je bitna u ovom obliku. Način rada s jednim međuspremnikom uzrokuje da snapshot(blocking=False) vraća None sve dok se ne snimi sljedeća sličica; dvostruko ili trostruko međuspremanje to izglađuje tako da omotač obično pronađe međuspremljenu sličicu koja čeka na prvoj provjeri nakon što je prethodna sličica obrađena.