10.3. البث الحي -- مشاهد واحد¶
تستطيع المتصفحات عرض تدفقات Motion JPEG (MJPEG) متعددة الأجزاء مباشرةً داخل وسم <img>. أعطِ المتصفح استجابة HTTP واحدة لا تنتهي أبدًا، واكتب صور JPEG مفصولة بحدّ متعدد الأجزاء، وسيعرض المتصفح كل إطار فور وصوله.
الاتصال بسيط ومباشر: ترويسة استجابة واحدة، Content-Type: multipart/x-mixed-replace; boundary=frame، ثم سطر --frame، و Content-Type: image/jpeg، وسطر فارغ، وبايتات JPEG، و \r\n، وتكرار. يغلق المتصفح الاتصال عند إزالة <img> أو إغلاق التبويب.
10.3.1. الالتقاط دون حجب¶
استدعاء csi0.snapshot() الحاجب المستخدم حتى الآن يوقف حلقة الأحداث بأكملها حتى يسلّم المستشعر إطارًا. كان ذلك جيدًا عندما يطلق طلب واحد لقطة واحدة ولا شيء آخر يعمل. لكن بمجرد فتح تدفق يجب على الخادم أن يستمر في معالجة الطلبات الأخرى بينما يجري التقاط الإطار التالي -- يحتاج استدعاء الالتقاط أن يتنازل لحلقة الأحداث وهو ينتظر المستشعر.
النمط هو غلاف AsyncCSI رفيع يستطلع csi.CSI.snapshot() في وضع عدم الحجب ويُنيم coroutine بين الاستطلاعات. شرح فصل asyncio هذا النمط في AsyncCSI؛ أدرجه ضمن البرنامج النصي مباشرةً في الوقت الحالي:
import asyncio
class AsyncCSI:
def __init__(self, *args, **kwargs):
self._csi = csi.CSI(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._csi, name)
async def snapshot(self):
while True:
img = self._csi.snapshot(blocking=False)
if img is not None:
return img
await asyncio.sleep_ms(0)
تُمرَّر كل طريقة CSI أخرى (reset() و pixformat() و framesize() و gain_db()، ...) عبر __getattr__؛ وحدها snapshot() تُستبدل بنسخة قابلة للانتظار تتيح لحلقة الأحداث جدولة coroutines أخرى بين الاستطلاعات.
استبدل csi.CSI() المجرّد من مسار اللقطة بـ AsyncCSI():
csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)
10.3.2. متون البث هي مُكرِّرات قائمة على الأصناف¶
متن استجابة البث ما هو إلا كائن يكرّره microdot باستخدام async for، مرسلًا كل جزء يُنتَج عبر المقبس. في CPython يكون هذا عادةً دالة مولّد غير متزامن -- async def مع yield. لكن MicroPython لا يدعم ذلك:
ملاحظة
لا يدعم asyncio في MicroPython دوال المولّد غير المتزامن (async def name(): ... yield ...). يجب أن تكون متون استجابة البث مُكرِّرات غير متزامنة قائمة على الأصناف بحيث تُرجع __aiter__ قيمة self وتُعرَّف __anext__ بوصفها async def.
بالنسبة لتدفق MJPEG يعني ذلك صنفًا تنتظر __anext__ فيه إطارًا واحدًا وتُرجعه مؤطّرًا داخل الغلاف متعدد الأجزاء:
BOUNDARY = b'frame'
class FrameStream:
def __aiter__(self):
return self
async def __anext__(self):
img = await csi0.snapshot()
jpeg = bytes(img.compress(quality=85).bytearray())
return (b'--' + BOUNDARY + b'\r\n'
b'Content-Type: image/jpeg\r\n\r\n'
+ jpeg + b'\r\n')
@app.get('/stream.jpg')
async def stream(request):
return Response(
body=FrameStream(),
headers={
'Content-Type':
b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
},
)
النسخة جديدة لكل طلب، لذا يحصل كل عميل متصل على مُكرِّره الخاص. عندما ينفصل المتصفح، يتوقف microdot عن انتظار __anext__ ويُجمَع المُكرِّر كنفايات.
ملاحظة
غلاف bytes(...) حول JPEG هو إجراء وقائي. تُرجع bytearray() عرضًا داخل مخزن صورة الكاميرا، ويعيد استدعاء snapshot() التالي كتابة ذلك المخزن في مكانه. التغليف بـ bytes ينسخ JPEG إلى الخارج حتى يظل الجزء الذي يكون microdot في منتصف كتابته مستقرًا حتى لو لم ينتهِ إفراغ الكاتب بحلول وقت تشغيل __anext__ مجددًا.
10.3.3. تشغيل الخادم داخل asyncio¶
استدعاء app.run(host=..., port=...) السابق حاجب. يحتاج معالج MJPEG إلى مشاركة الحلقة مع استطلاعات لقطة AsyncCSI، لذا استبدل app.run بـ start_server() داخل asyncio.run():
async def main():
await app.start_server(host='0.0.0.0', port=80)
asyncio.run(main())
يتيح غلاف asyncio.run() أن يكون الخادم مهمة واحدة بين عدة مهام -- وعندئذٍ يصبح coroutine الـ main المكان الطبيعي لإطلاق الالتقاط، وكشف الحركة، وأي شيء آخر يجب أن يشارك الحلقة مع خادم HTTP.
10.3.4. مشاهد واحد في كل مرة¶
يشغّل كل عميل متصل مُكرِّر FrameStream الخاص به، ما يعني أن كل عميل يطلق استدعاء csi0.snapshot() الخاص به. متصفحان يعنيان قراءتين للمستشعر لكل فترة إطار، وثلاثة تعني ثلاثًا، وهكذا. لا يستطيع المستشعر فعليًا تسليم الإطارات أسرع من معدل إطاراته الخاص، لذا تصطفّ الطلبات خلف بعضها ويتباطأ تدفق الجميع.
الحل هو حلقة التقاط مشتركة واحدة تنشر إطارًا واحدًا إلى قُرّاء كثيرين.