10.4. مشاركة حلقة التقاط واحدة عبر المشاهدين

إن استدعاء كل عميل متصل لـ csi0.snapshot() بشكل مستقل مبدّد للموارد، وبمجرد فتح تدفقين في آنٍ واحد يزداد الأمر سوءًا: يسلّم المستشعر الإطارات بمعدّله الخاص، وكل التقاط مكرَّر يبطّئ الجميع. النهج الصحيح هو coroutine التقاط واحد ينشر "أحدث إطار" إلى موضع مشترك، بالإضافة إلى مُكرِّرات لكل عميل تقرأ من ذلك الموضع.

One capture task writes JPEG bytes to a single latest_jpeg slot; three stream-client iterators read from the slot and each wait on the shared new_frame event.

10.4.1. مهمة الالتقاط

يلتقط coroutine يعمل في الخلفية الإطارات بأسرع ما يسلّمها المستشعر، ويضغط كلًّا منها بصيغة JPEG داخل bytes مشترك، وينبض حدثًا حتى يستيقظ أي عميل منتظر:

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

زوج set() / clear() هو نمط النبضة. تفكّ set() حجب كل coroutine ينتظر الحدث حاليًا دفعةً واحدة؛ وتعيد clear() ضبط الحدث فورًا حتى يَحجب استدعاء wait() التالي مجددًا. مع وجود عدة مستهلكين (مشاهد، ومشاهد آخر، وأي coroutine آخر يحتاج إلى التفاعل مع إطار جديد)، لا يكون أي مستهلك واحد مسؤولًا عن إعادة ضبط الحدث، ولا أحد يسرق إيقاظًا من أحد آخر.

ملاحظة

غلاف bytes(...) حول JPEG حامل للعبء هنا. تُرجع bytearray() عرضًا داخل مخزن صورة الكاميرا؛ ويعيد استدعاء snapshot() التالي مباشرةً كتابة ذلك المخزن في مكانه بالإطار التالي. يعيش latest_jpeg أطول من المتغير المحلي img، لذا فبدون النسخ سيرى كل قارئ الموضع ينزاح من تحته عند كل التقاط.

10.4.2. تقرأ مُكرِّرات كل عميل من الموضع

يتوقف معالج تدفق MJPEG عن استدعاء csi0.snapshot() بنفسه. وبدلًا من ذلك، تنتظر كل نسخة FrameStream على الحدث المشترك وتقرأ من البايتات المشتركة:

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

يتغير مسار اللقطة أيضًا: فهو لم يعد يطلق التقاطًا، بل يُرجع ما يحمله latest_jpeg حاليًا:

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

صيغة الصف (body, status) هي اختصار microdot لضبط رمز حالة HTTP دون إنشاء microdot.Response. الرمز 503 يعني أنا هنا لكني لست جاهزًا -- وهو الرمز الصحيح لـ "اسأل مرة أخرى بعد لحظة."

10.4.3. تشغيل الالتقاط إلى جانب الخادم

يحتوي main الآن على coroutines رئيسيين: حلقة الالتقاط وخادم HTTP. تشغّلهما asyncio.gather() معًا، وإذا تعطّل أحدهما يُلغى الآخر:

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

الآن يقرأ المستشعر إطارًا واحدًا لكل دورة بغضّ النظر عن عدد المشاهدين المتصلين. أول متصفح يصل إلى /stream.jpg يرى الإطارات؛ وكذلك الثاني والثالث والعاشر -- جميعهم يتشاركون الالتقاط نفسه، وتبقى الكاميرا مستجيبة على مساراتها الأخرى.