9.19. MQTT في Python

تغلّف وحدة mqtt المضمّنة في كل كاميرا OpenMV متصلة بالشبكة بروتوكول MQTT السلكي في صنف واحد هو mqtt.MQTTClient. ويفتح هذا الصنف مقبس TCP، ويُجري مصافحة CONNECT، ويحزِّم الحزم على مستوى البايت ويفكّها، ويتولى نبضات الإبقاء على الاتصال PINGREQ، ويوجّه رسائل PUBLISH الواردة إلى دالة رد نداء. وتستدعي شيفرة التطبيق connect() وpublish() وsubscribe() وwait_msg() / check_msg().

9.19.1. ناشر في خمسة عشر سطرًا

أصغر برنامج مفيد هو عملية نشر واحدة. اتصل، وانشر رسالة واحدة، ثم اقطع الاتصال:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

إن test.mosquitto.org هو وسيط الاختبار العام الذي يديره مشروع Eclipse Mosquitto. وهو يقبل اتصالات TCP العادية على المنفذ 1883 دون بيانات اعتماد. لا تستخدمه لأي شيء جدي؛ فهو لا يقدّم أي ضمانات للخصوصية، ومجال أسماء المواضيع مشترك مع كل مختبِر آخر على الإنترنت.

يجب أن يكون client_id فريدًا لكل اتصال بالوسيط -- إذ يستخدمه الوسيط لتتبّع الجلسات. والمواضيع وحمولات الرسائل هي بايتات؛ مرّر str إن كان ذلك أكثر ملاءمة وسيرمّزه العميل بترميز UTF-8.

9.19.2. الاتصال عبر TLS

لأي شيء يتجاوز التجارب السريعة، يكون MQTT عبر TLS بمثابة وسيط إضافي واحد. ويُمرَّر قاموس ssl_params إلى ssl.wrap_socket()، لذا فإن كل ما يعمل هناك يعمل هنا:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

إن المنفذ 8883 هو منفذ TLS-MQTT المحجوز من قبل IANA. ويُفعّل server_hostname تقنية SNI كي تتمكن الوسطاء الواقعون خلف عنوان IP مشترك من التوجيه إلى الشهادة الصحيحة -- وهي الآلية نفسها التي يستخدمها HTTPS. ويُطابِق user / password حقلي اسم المستخدم/كلمة المرور في حزمة CONNECT؛ ويقرر الوسيط ما إذا كانت هذه البيانات تمنح حقوق النشر أو الاشتراك لمواضيع محددة.

9.19.3. الاشتراك والاستقبال

لاستقبال الرسائل، يوفّر العميل دالة رد نداء ويستدعي subscribe(). وتتلقى دالة رد النداء وسيطين من نوع البايتات، هما الموضوع والحمولة:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

تحجب wait_msg() التنفيذ حتى وصول حزمة MQTT واحدة، ثم تحلّلها، وتستدعي دالة رد النداء إن كانت رسالة PUBLISH على موضوع مُشترَك فيه، ثم تعود. وتُطلَق دوال رد النداء المُشترَك فيها من داخل تلك الاستدعاء -- فلا يوجد خيط في الخلفية.

وبالنسبة لحلقة كاميرا تفاعلية تحتاج إلى الاستمرار في أداء أعمال أخرى، تكون check_msg() هي المنطق نفسه في صيغة غير حاجبة. وهي تستخدم select.select() بمهلة 50 ms وتعود فورًا إن لم يكن هناك شيء معلّق:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. إعادة الاتصال بنظافة

على أي عميل MQTT طويل التشغيل أن يتعامل مع الاتصالات المنقطعة. فانقطاعات Wi-Fi، وإعادة تشغيل الوسيط، ومهلات NAT، أو ببساطة تجاوز فترة الإبقاء على الاتصال دون حركة بيانات، كلها تُنهي المقبس. ويُطلق العميل المضمّن استثناء OSError (أو استثناءً مجرّدًا يحمل رمز إرجاع الوسيط) من الاستدعاء الذي لاحظ الانقطاع، والنمط القياسي هو حلقة إعادة محاولة:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

إن الاشتراكات لا تُحفظ عبر عمليات إعادة الاتصال ما لم يمرّر العميل clean_session=False عند الاتصال، لذا ينبغي أن يعيد connect الداخلي أيضًا إصدار أي استدعاءات subscribe() قبل الدخول في حلقة النشر.

9.19.5. خطّاف الوصية الأخيرة

ينبغي للكاميرا التي تُبلّغ عن حالتها أن تخبر الوسيط بالرسالة التي يجب إرسالها نيابةً عنها إن انقطع الاتصال على نحو غير متوقع. عيّن الوصية قبل connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

والآن، أي لوحة معلومات مشتركة في yard-cam/status ترى online لحظة اتصال الكاميرا، وترى offline كلما لاحظ الوسيط انقطاع الكاميرا. وتبقى رسالة offline المحتفَظ بها مخزّنة على الوسيط، بحيث أن لوحة معلومات تتصل بعد عشر دقائق لا تزال ترى الحالة الراهنة الصحيحة.

9.19.6. متى تختار MQTT بدلًا من HTTP

يتناول فصل خوادم الويب الكاميرا وهي تعمل كخادم HTTP، وفي صفحة الرفع السحابي، وهي تعمل كعميل HTTP يرسل صور JPEG إلى عنوان URL ثابت. ولكلٍ منهما مكانه. والوقت المناسب لاللجوء إلى MQTT بدلًا من ذلك:

  • عندما تحتاج البيانات نفسها إلى الذهاب إلى عدة مستمعين (لوحة معلومات، وخدمة إشعارات، ومسجّل) دون أن تعرف الكاميرا القائمة سلفًا.

  • عندما قد يأتي المستمعون ويذهبون دون إعادة تشغيل الكاميرا.

  • عندما تريد الكاميرا أن تشترك -- لتلقّي أوامر من جهاز تحكم -- وهو ما لا يستطيع عميل HTTP فعله دون استقصاء طويل (long polling) أو خادم يدفع البيانات على عنوان URL لدالة رد نداء.

  • عندما يتعيّن على الاتصال أن يصمد عبر فترات خمول طويلة بتكلفة زهيدة.

أما الوقت المناسب للتمسك بـ HTTP فهو: كاميرا واحدة، وخادم واحد، ونمط طلب/استجابة ثابت بجسم أكبر من أن يُحمل في موضوع MQTT واحد (إرسال إطارات JPEG عبر MQTT ممكن لكنه فظ تجاه الوسيط؛ وطلب HTTP POST هو الملاءمة الطبيعية).

إحالة متقاطعة: تُظهر صفحة الرفع السحابي في فصل خوادم الويب النسخة المعتمدة على HTTP من "الكاميرا ← الأرشيف السحابي". أما نسخة MQTT من المشكلة نفسها فتُبقي الكاميرا منفصلة عن عنوان URL الخاص بالأرشيف، وتتيح لمستهلك ثانٍ (تطبيق تنبيهات على الهاتف مثلًا) أن يتصل بالدفق نفسه.