8.14. AsyncCSI¶
Typowy skrypt OpenMV Cam kończy się instrukcją while True: img = csi0.snapshot() – blokującą pętlą zrzutu obrazu, która w ogóle nie potrzebuje asyncio. W momencie, gdy aplikacja musi robić coś innego równolegle z przechwytywaniem – nasłuchiwać przycisku, wysyłać dane do partnera, uruchomić zadanie w tle – blokujące wywołanie staje na przeszkodzie. Podczas gdy snapshot czeka na kolejną ramkę, pętla zdarzeń nie działa, więc każda inna korutyna w programie jest zamrożona aż do nadejścia ramki.
Ta strona buduje niewielki wrapper wokół CSI, który zamienia snapshot w korutynę przyjazną dla await. Wynikiem jest zamiennik typu drop-in, który pozwala pętli przechwytywania współistnieć z resztą programu asyncio.
8.14.1. Elementy składowe¶
Jeden element API CSI wykonuje większość pracy – snapshot() w trybie nieblokującym. Wywołanie snapshot(blocking=False) zwraca albo następną ramkę (jeśli jest gotowa), albo None (jeśli nie). Pierwsze nieblokujące wywołanie również uruchamia przechwytywanie DMA kamery, jeśli nie działało ono jeszcze, więc wrapper nie musi robić nic specjalnego, aby je rozruszać.
Drugim elementem jest asyncio.sleep_ms(). Wrapper odpytuje nieblokujące zrzuty obrazu w pętli, oddając sterowanie pętli zdarzeń za pomocą await asyncio.sleep_ms(0) pomiędzy sprawdzeniami, dzięki czemu każda inna gotowa korutyna ma szansę się uruchomić przed kolejnym odpytaniem.
8.14.2. Wrapper¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
Konstruktor opakowuje instancję CSI. __getattr__ przekazuje każdy atrybut, którego wrapper sam nie definiuje – reset, pixformat, framesize, wszystkie pokrętła sensora – do bazowego CSI, dzięki czemu wrapper wygląda identycznie jak nieopakowany obiekt, z wyjątkiem tej jednej metody, która ma znaczenie.
async def snapshot jest nowym elementem. Wywołuje snapshot(blocking=False); jeśli wywołanie zwróci obraz, korutyna go zwraca. W przeciwnym razie oddaje sterowanie pętli zdarzeń za pomocą await asyncio.sleep_ms(0), aby inne korutyny miały szansę się uruchomić, a następnie wraca do pętli i próbuje ponownie. Pierwsza iteracja uruchamia DMA; kolejne iteracje pobierają ramki w miarę jak stają się dostępne.
8.14.3. Pętla zrzutu obrazu w towarzystwie¶
Mając wrapper na miejscu, pętla zrzutu obrazu wpasowuje się w większy program asyncio w taki sam sposób jak każda inna korutyna. Poniższy przykład uruchamia trzy korutyny współbieżnie: pętlę przechwytywania, migacz diody LED oraz puls (heartbeat), który wypisuje hello raz na sekundę:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
Wszystkie trzy korutyny robią postępy na tej samej pętli zdarzeń. Podczas gdy capture_loop oddaje sterowanie pomiędzy nieblokującymi odpytaniami zrzutu obrazu, blinker przełącza diodę LED, a hello wypisuje tekst. Podczas gdy blinker i hello śpią, capture_loop odpytuje kamerę. Interwał odpytywania jest krótki – pojedynczy takt pętli zdarzeń – więc dodaje on pomijalne opóźnienie do momentu, w którym aplikacja zobaczy nową ramkę.
Pętla przechwytywania nie blokuje pętli zdarzeń. Dodanie kolejnej współbieżnej pracy – na przykład klienta UART – to po prostu kolejne wywołanie create_task() wewnątrz main.
Informacja
Ustawienie buforów ramek nadal ma znaczenie w tej postaci. Tryb pojedynczego bufora sprawia, że snapshot(blocking=False) zwraca None aż do przechwycenia kolejnej ramki; podwójne lub potrójne buforowanie wygładza to, dzięki czemu wrapper zazwyczaj znajduje zbuforowaną ramkę czekającą przy pierwszym odpytaniu po przetworzeniu poprzedniej ramki.