8.14. AsyncCSI

Skrip OpenMV Cam yang umum diakhiri dengan while True: img = csi0.snapshot() -- loop snapshot yang memblokir yang tidak membutuhkan asyncio sama sekali. Begitu aplikasi harus melakukan sesuatu yang lain bersamaan dengan pengambilan gambar -- mendengarkan tombol, mengirim data ke peer, menjalankan task latar belakang -- panggilan yang memblokir menjadi penghalang. Sementara snapshot menunggu bingkai berikutnya, event loop tidak berjalan, sehingga setiap coroutine lain dalam program dibekukan hingga bingkai tiba.

Halaman ini membangun pembungkus kecil di sekitar CSI yang mengubah snapshot menjadi coroutine yang ramah dengan await. Hasilnya adalah pengganti drop-in yang memungkinkan loop pengambilan gambar untuk berdampingan dengan bagian lain dari program asyncio.

8.14.1. Bagian-bagiannya

Satu bagian dari API CSI melakukan sebagian besar pekerjaan -- snapshot() dalam mode non-blocking-nya. Memanggil snapshot(blocking=False) akan mengembalikan bingkai berikutnya (jika sudah siap) atau None (jika belum). Panggilan non-blocking pertama juga memulai pengambilan DMA kamera jika belum berjalan, sehingga pembungkus tidak perlu melakukan sesuatu yang khusus untuk bootstrap.

Bagian lainnya adalah asyncio.sleep_ms(). Pembungkus melakukan polling snapshot non-blocking dalam sebuah loop, menghasilkan kontrol ke event loop dengan await asyncio.sleep_ms(0) di antara pemeriksaan sehingga setiap coroutine yang siap mendapat kesempatan berjalan sebelum polling berikutnya.

8.14.2. Pembungkusnya

import asyncio
import csi


class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Konstruktor membungkus instance CSI. __getattr__ meneruskan setiap atribut yang tidak didefinisikan pembungkus itu sendiri -- reset, pixformat, framesize, semua pengaturan sensor -- ke CSI yang mendasarinya, sehingga pembungkus terlihat identik dengan objek yang tidak dibungkus kecuali untuk satu metode yang penting.

async def snapshot adalah bagian baru. Ia memanggil snapshot(blocking=False); jika panggilan mengembalikan citra, coroutine mengembalikannya. Jika tidak, ia menghasilkan kendali kembali ke event loop dengan await asyncio.sleep_ms(0) agar coroutine lain mendapat kesempatan berjalan, kemudian kembali ke awal dan mencoba lagi. Iterasi pertama memulai DMA; iterasi berikutnya mengambil bingkai saat tersedia.

8.14.3. Loop snapshot dengan teman

Dengan pembungkus di tempatnya, loop snapshot muat ke dalam program asyncio yang lebih besar sama seperti coroutine lainnya. Contoh di bawah ini menjalankan tiga coroutine secara bersamaan: loop pengambilan gambar, pembalik LED, dan heartbeat yang mencetak hello sekali per detik:

import asyncio
import csi
from machine import LED


async def capture_loop(cam):
    while True:
        img = await cam.snapshot()
        # process img here

async def blinker(led, period_ms):
    while True:
        led.on()
        await asyncio.sleep_ms(period_ms)
        led.off()
        await asyncio.sleep_ms(period_ms)

async def hello(period_s):
    while True:
        print("hello")
        await asyncio.sleep(period_s)

async def main():
    cam = AsyncCSI()
    cam.reset()
    cam.pixformat(csi.RGB565)
    cam.framesize(csi.QVGA)

    asyncio.create_task(blinker(LED("LED_BLUE"), 200))
    asyncio.create_task(hello(1))
    await capture_loop(cam)

asyncio.run(main())

Ketiga coroutine membuat kemajuan pada event loop yang sama. Sementara capture_loop menghasilkan kontrol di antara polling snapshot non-blocking, blinker mengubah LED dan hello mencetak. Sementara blinker dan hello tidur, capture_loop melakukan polling kamera. Interval polling singkat -- satu tick event loop -- sehingga menambah latensi yang sangat kecil pada saat aplikasi melihat bingkai baru.

Loop pengambilan gambar tidak memblokir event loop. Menambahkan pekerjaan bersamaan lebih banyak -- misalnya klien UART -- hanyalah satu panggilan create_task() lagi di dalam main.

Catatan

Pengaturan framebuffer tetap penting dalam bentuk ini. Mode buffer tunggal membuat snapshot(blocking=False) mengembalikan None hingga bingkai berikutnya diambil; buffer ganda atau tiga meratakan itu sehingga pembungkus biasanya menemukan bingkai yang sudah di-buffer menunggu pada polling pertama setelah bingkai sebelumnya diproses.