9.19. MQTT ב-Python

מודול mqtt המצורף לכל מצלמת OpenMV מרושתת עוטף את פרוטוקול התקשורת של MQTT במחלקה אחת, mqtt.MQTTClient. המחלקה פותחת את שקע ה-TCP, מבצעת את לחיצת היד של CONNECT, אורזת ומפרקת את החבילות ברמת הבייטים, מטפלת ב-keepalive של PINGREQ, ומנתבת הודעות PUBLISH נכנסות אל פונקציית callback. קוד האפליקציה קורא ל-connect(), publish(), subscribe(), ו-wait_msg() / check_msg().

9.19.1. מפרסם בחמש-עשרה שורות

התוכנית השימושית הקטנה ביותר היא פרסום בודד. התחברות, פרסום הודעה אחת, ניתוק:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org הוא ה-broker הציבורי לבדיקות המופעל על ידי פרויקט Eclipse Mosquitto. הוא מקבל חיבורי TCP פשוטים ביציאה 1883 ללא אישורי גישה. אל תשתמש בו לשום דבר רציני; אין לו הבטחות פרטיות ומרחב השמות של הנושאים משותף עם כל בודק אחר באינטרנט.

client_id חייב להיות ייחודי לכל חיבור ל-broker – ה-broker משתמש בו כדי לעקוב אחר sessions. נושאים ומטעני הודעות הם בייטים; ניתן להעביר str אם זה נוח יותר וה-client יקודד אותו כ-UTF-8.

9.19.2. התחברות דרך TLS

לכל דבר שמעבר לניסויים מהירים, MQTT על גבי TLS הוא ארגומנט נוסף אחד. מילון ה-ssl_params מועבר ל-ssl.wrap_socket(), כך שכל מה שעובד שם עובד גם כאן:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

יציאה 8883 היא יציאת TLS-MQTT השמורה על ידי IANA. server_hostname מפעיל את SNI כך ש-brokers שמאחורי IP משותף יכולים לנתב אל התעודה הנכונה – אותו מנגנון שבו משתמש HTTPS. user / password ממופים לשדות שם המשתמש/הסיסמה של חבילת CONNECT; ה-broker מחליט האם אישורי גישה אלה מעניקים זכויות פרסום או הרשמה לנושאים מסוימים.

9.19.3. הרשמה וקבלה

כדי לקבל הודעות, client מספק פונקציית callback וקורא ל-subscribe(). ה-callback מקבל שני ארגומנטים מסוג בייטים, הנושא והמטען:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() חוסם עד שמגיעה חבילת MQTT אחת, מנתח אותה, קורא ל-callback אם זו הייתה הודעת PUBLISH על נושא רשום, וחוזר. פונקציות ה-callback הרשומות מופעלות מתוך אותה קריאה – אין thread ברקע.

עבור לולאת מצלמה אינטראקטיבית שצריכה להמשיך לבצע עבודה אחרת, check_msg() היא אותה לוגיקה בצורה לא-חוסמת. היא משתמשת ב-select.select() עם פסק זמן של 50 מילישניות וחוזרת מיד אם אין שום דבר ממתין:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. התחברות מחדש בצורה נקייה

כל client של MQTT הפועל לאורך זמן חייב לטפל בחיבורים שנופלים. ניתוקי Wi-Fi, אתחולי broker, פסקי זמן של NAT, או פשוט חריגה מעבר ל-keepalive ללא תעבורה – כולם מסיימים את השקע. ה-client המצורף מעלה OSError (או חריגה חשופה עם קוד החזרה של ה-broker) מהקריאה שהבחינה בנפילה, והדפוס הסטנדרטי הוא לולאת ניסיון חוזר:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

הרשמות אינן נשמרות לאורך התחברויות מחדש אלא אם ה-client העביר clean_session=False בעת ההתחברות, ולכן ה-connect הפנימי צריך גם להנפיק מחדש כל קריאות subscribe() לפני הכניסה ללולאת הפרסום.

9.19.5. וו הצוואה האחרונה (last-will)

מצלמה המדווחת על מצב צריכה לומר ל-broker איזו הודעה לשלוח בשם המצלמה אם החיבור נופל באופן בלתי צפוי. הגדר את הצוואה (will) לפני connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

כעת כל לוח בקרה הרשום ל-yard-cam/status רואה online ברגע שהמצלמה מתחברת ו-offline בכל פעם שה-broker מבחין שהמצלמה נפלה. הודעת ה-offline השמורה נשמרת על ה-broker כך שלוח בקרה שמתחבר עשר דקות מאוחר יותר עדיין רואה את המצב הנוכחי הנכון.

9.19.6. מתי לבחור ב-MQTT על פני HTTP

פרק שרתי האינטרנט מכסה את המצלמה הפועלת כשרת HTTP, ובעמוד ההעלאה לענן, כ-client של HTTP השולח JPEGs אל כתובת URL קבועה. לשניהם מקום משלהם. הזמן הנכון לפנות ל-MQTT במקום זאת:

  • אותם נתונים צריכים להגיע למספר מאזינים (לוח בקרה, שירות התראות, מקליט) מבלי שהמצלמה תדע את הרשימה מראש.

  • מאזינים עשויים להופיע ולהיעלם מבלי שהמצלמה תאותחל מחדש.

  • המצלמה רוצה להירשם – כדי לקבל פקודות מבקר – מה ש-client של HTTP אינו יכול לעשות ללא תשאול מתמשך (long polling) או שרת הדוחף לכתובת URL של callback.

  • החיבור חייב לשרוד תקופות סרק ארוכות בזול.

הזמן הנכון להישאר עם HTTP: מצלמה אחת, שרת אחד, דפוס בקשה/תגובה קבוע עם גוף שגדול מדי עבור נושא MQTT בודד (פריימים של JPEG על גבי MQTT עובדים אך גסים כלפי ה-broker; HTTP POST הוא ההתאמה הטבעית).

קישור צולב: עמוד ההעלאה לענן בפרק שרתי האינטרנט מציג את גרסת ה-HTTP של ”מצלמה ← ארכיון בענן“. גרסת ה-MQTT של אותה בעיה שומרת על ניתוק המצלמה מכתובת ה-URL של הארכיון ומאפשרת לצרכן שני (אפליקציית התראות בטלפון, למשל) להתחבר לאותו זרם.