8.14. AsyncCSI¶
ينتهي البرنامج النصي النموذجي لـ OpenMV Cam بـ while True: img = csi0.snapshot() -- وهي حلقة لقطات حاجبة لا تحتاج إلى asyncio على الإطلاق. وفي اللحظة التي يتعين فيها على التطبيق القيام بشيء آخر إلى جانب عمليات الالتقاط -- كالإصغاء لزر، أو إرسال بيانات إلى نظير، أو تشغيل مهمة في الخلفية -- يصبح الاستدعاء الحاجب عائقاً. فبينما ينتظر snapshot الإطار التالي، لا تكون حلقة الأحداث قيد التشغيل، وبالتالي تتجمد كل دالة تعاونية أخرى في البرنامج حتى يصل الإطار.
تبني هذه الصفحة غلافاً صغيراً حول CSI يحوّل snapshot إلى دالة تعاونية متوافقة مع await. والنتيجة هي بديل جاهز للاستخدام يتيح لحلقة الالتقاط التعايش مع بقية برنامج asyncio.
8.14.1. المكوّنات¶
يقوم جزء واحد من واجهة برمجة التطبيقات CSI بمعظم العمل -- وهو snapshot() في وضعه غير الحاجب. فاستدعاء snapshot(blocking=False) إما يُرجع الإطار التالي (إن كان جاهزاً) أو None (إن لم يكن كذلك). كما أن أول استدعاء غير حاجب يبدأ أيضاً التقاط DMA للكاميرا إن لم يكن قيد التشغيل بالفعل، لذا لا يحتاج الغلاف إلى القيام بأي شيء خاص للإقلاع.
أما الجزء الآخر فهو asyncio.sleep_ms(). يستطلع الغلاف اللقطات غير الحاجبة في حلقة، متنازلاً عن التحكم لحلقة الأحداث عبر await asyncio.sleep_ms(0) بين عمليات الفحص بحيث تحصل كل دالة تعاونية أخرى جاهزة على فرصة للتشغيل قبل الاستطلاع التالي.
8.14.2. الغلاف¶
import asyncio
import csi
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
يغلّف المُنشئ نسخة من CSI. ويعيد __getattr__ توجيه كل سمة لا يعرّفها الغلاف نفسه -- reset و pixformat و framesize وكل مقابض المستشعر -- إلى الكائن الأساسي CSI، فيبدو الغلاف مطابقاً للكائن غير المغلّف باستثناء الطريقة الوحيدة المهمة.
إن async def snapshot هو الجزء الجديد. فهو يستدعي snapshot(blocking=False)؛ وإذا أعاد الاستدعاء صورة، تُرجعها الدالة التعاونية. وإلا فإنها تتنازل عن التحكم لحلقة الأحداث عبر await asyncio.sleep_ms(0) كي تحصل الدوال التعاونية الأخرى على فرصة للتشغيل، ثم تعود إلى الحلقة وتحاول مجدداً. التكرار الأول يبدأ DMA؛ بينما تلتقط التكرارات اللاحقة الإطارات حالما تصبح متاحة.
8.14.3. حلقة لقطات بصحبة غيرها¶
مع وجود الغلاف، تندمج حلقة اللقطات في برنامج asyncio أكبر بنفس الطريقة التي تندمج بها أي دالة تعاونية أخرى. يشغّل المثال أدناه ثلاث دوال تعاونية بشكل متزامن: حلقة الالتقاط، ووامض LED، ونبضة قلب تطبع hello مرة في الثانية:
import asyncio
import csi
from machine import LED
async def capture_loop(cam):
while True:
img = await cam.snapshot()
# process img here
async def blinker(led, period_ms):
while True:
led.on()
await asyncio.sleep_ms(period_ms)
led.off()
await asyncio.sleep_ms(period_ms)
async def hello(period_s):
while True:
print("hello")
await asyncio.sleep(period_s)
async def main():
cam = AsyncCSI()
cam.reset()
cam.pixformat(csi.RGB565)
cam.framesize(csi.QVGA)
asyncio.create_task(blinker(LED("LED_BLUE"), 200))
asyncio.create_task(hello(1))
await capture_loop(cam)
asyncio.run(main())
تحرز الدوال التعاونية الثلاث جميعها تقدماً على حلقة الأحداث نفسها. فبينما يتنازل capture_loop عن التحكم بين عمليات استطلاع اللقطات غير الحاجبة، يبدّل blinker حالة LED ويطبع hello. وبينما ينام blinker و hello، يستطلع capture_loop الكاميرا. فترة الاستطلاع قصيرة -- نبضة واحدة لحلقة الأحداث -- لذا فهي تضيف زمن استجابة لا يُذكر إلى الوقت الذي يرى فيه التطبيق إطاراً جديداً.
حلقة الالتقاط لا تحجب حلقة الأحداث. وإضافة المزيد من العمل المتزامن -- عميل UART مثلاً -- ما هي إلا استدعاء create_task() آخر داخل main.
ملاحظة
لا يزال إعداد المخازن المؤقتة للإطارات مهماً في هذا الشكل. فوضع المخزن الأحادي يجعل snapshot(blocking=False) يُرجع None حتى يُلتقط الإطار التالي؛ بينما يخفف التخزين المؤقت المزدوج أو الثلاثي من ذلك بحيث يجد الغلاف عادةً إطاراً مخزّناً في انتظاره عند أول استطلاع بعد معالجة الإطار السابق.