8.14. AsyncCSI¶
สคริปต์ OpenMV Cam ทั่วไปจบด้วย while True: img = csi0.snapshot() ซึ่งเป็นลูปสแนปช็อตที่บล็อกและไม่ต้องการ asyncio เลย ทันทีที่แอปพลิเคชันต้องทำ สิ่งอื่น ควบคู่ไปกับการจับภาพ ไม่ว่าจะเป็นการฟังปุ่ม การส่งข้อมูลไปยังอุปกรณ์คู่ หรือการรันงานพื้นหลัง การเรียกที่บล็อกก็ขัดขวาง ขณะที่ snapshot รอเฟรมถัดไป event loop ไม่ได้ทำงาน ดังนั้นคอรูทีนอื่นๆ ทุกตัวในโปรแกรมจะถูกหยุดจนกว่าเฟรมจะมาถึง
หน้านี้สร้างตัวครอบขนาดเล็กรอบ CSI ที่เปลี่ยน snapshot ให้เป็นคอรูทีนที่รองรับ await ผลลัพธ์คือการแทนที่แบบ drop-in ที่ช่วยให้ลูปจับภาพอยู่ร่วมกับโปรแกรม asyncio ที่เหลือได้
8.14.1. ส่วนประกอบ¶
ส่วนหนึ่งของ API CSI ทำงานส่วนใหญ่ คือ snapshot() ในโหมดไม่บล็อก การเรียก snapshot(blocking=False) จะคืนเฟรมถัดไป (ถ้ามีพร้อม) หรือ None (ถ้าไม่มี) การเรียกแบบไม่บล็อกครั้งแรกยัง เริ่มต้น การจับภาพ DMA ของกล้องหากยังไม่ได้ทำงาน ดังนั้นตัวครอบไม่จำเป็นต้องทำอะไรพิเศษในการบูตสแตรป
ส่วนอื่นคือ asyncio.sleep_ms() ตัวครอบสำรวจ snapshot แบบไม่บล็อกในลูป โดยยอมให้ event loop ทำงานด้วย await asyncio.sleep_ms(0) ระหว่างการตรวจสอบ เพื่อให้คอรูทีนที่พร้อมอื่นๆ มีโอกาสทำงานก่อนการสำรวจครั้งถัดไป
8.14.2. ตัวครอบ¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
ตัวสร้างครอบอินสแตนซ์ CSI __getattr__ ส่งต่อทุกแอตทริบิวต์ที่ตัวครอบไม่ได้กำหนดเอง ไม่ว่าจะเป็น reset, pixformat, framesize และปุ่มปรับ sensor knobs ทั้งหมด ไปยัง CSI ที่อยู่ภายใน ดังนั้นตัวครอบจึงดูเหมือนกันกับอ็อบเจกต์ที่ไม่ได้ครอบยกเว้นเมธอดเดียวที่สำคัญ
async def snapshot คือส่วนใหม่ มันเรียก snapshot(blocking=False) หากการเรียกคืนภาพ คอรูทีนจะคืนค่ามัน มิฉะนั้นมันจะยอมกลับไปยัง event loop ด้วย await asyncio.sleep_ms(0) เพื่อให้คอรูทีนอื่นมีโอกาสทำงาน แล้ววนกลับและลองอีกครั้ง การวนซ้ำครั้งแรกเริ่ม DMA การวนซ้ำถัดไปรับเฟรมเมื่อพร้อม
8.14.3. ลูปสแนปช็อตพร้อมเพื่อน¶
เมื่อตัวครอบอยู่ในตำแหน่ง ลูปสแนปช็อตจะพอดีกับโปรแกรม asyncio ที่ใหญ่กว่าในแบบเดียวกับคอรูทีนอื่นๆ ตัวอย่างด้านล่างรันสามคอรูทีนพร้อมกัน: ลูปจับภาพ ตัวกะพริบ LED และ heartbeat ที่พิมพ์ hello ทุกวินาที:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
คอรูทีนทั้งสามก้าวหน้าบน event loop เดียวกัน ขณะที่ capture_loop กำลังยอมระหว่างการสำรวจสแนปช็อตแบบไม่บล็อก blinker สลับ LED และ hello พิมพ์ ขณะที่ blinker และ hello กำลังนอน capture_loop สำรวจกล้อง ช่วงการสำรวจสั้นมาก ซึ่งก็คือ event-loop tick เดียว ดังนั้นจึงเพิ่มเวลาแฝงน้อยมากเมื่อแอปพลิเคชันเห็นเฟรมใหม่
ลูปจับภาพ ไม่ได้ บล็อก event loop การเพิ่มงานพร้อมกันอื่นๆ เช่น UART client เป็นเพียงการเรียก create_task() อีกครั้งภายใน main
Note
การตั้งค่า framebuffers ยังคงมีความสำคัญในรูปแบบนี้ โหมด single-buffer ทำให้ snapshot(blocking=False) คืนค่า None จนกว่าจะจับเฟรมถัดไป การบัฟเฟอร์คู่หรือสามชั้นทำให้ราบเรียบขึ้นเพื่อให้ตัวครอบมักพบเฟรมที่บัฟเฟอร์ไว้รออยู่ในการสำรวจครั้งแรกหลังจากเฟรมก่อนหน้าได้รับการประมวลผล