9.19. MQTT trong Python

Mô-đun mqtt được tích hợp sẵn trên mọi OpenMV cam có mạng bọc giao thức dây MQTT trong một lớp, mqtt.MQTTClient. Lớp này mở socket TCP, thực hiện bắt tay CONNECT, đóng gói và giải nén các gói ở cấp byte, xử lý keepalive PINGREQ, và gửi các tin nhắn PUBLISH đến vào một hàm gọi lại. Mã ứng dụng gọi connect(), publish(), subscribe(), và wait_msg() / check_msg().

9.19.1. Một publisher trong mười lăm dòng

Chương trình hữu ích nhỏ nhất là một lần publish duy nhất. Kết nối, publish một tin nhắn, ngắt kết nối:

from mqtt import MQTTClient

client = MQTTClient(
    client_id='yard-cam',
    server='test.mosquitto.org',
    port=1883,
)
client.connect()
client.publish(b'yard-cam/motion', b'detected at 14:02', qos=0)
client.disconnect()

test.mosquitto.org là broker kiểm tra công khai được vận hành bởi dự án Eclipse Mosquitto. Nó chấp nhận kết nối TCP thuần túy trên cổng 1883 mà không cần thông tin xác thực. Đừng sử dụng nó cho bất cứ điều gì nghiêm túc; nó không có đảm bảo quyền riêng tư và không gian tên chủ đề được chia sẻ với mọi người kiểm tra khác trên internet.

client_id phải là duy nhất cho mỗi kết nối broker -- broker sử dụng nó để theo dõi các phiên. Chủ đề và tải trọng tin nhắn là bytes; truyền str nếu thuận tiện hơn và client sẽ mã hóa nó dưới dạng UTF-8.

9.19.2. Kết nối qua TLS

Đối với bất cứ thứ gì ngoài các thử nghiệm nhanh, MQTT qua TLS chỉ cần thêm một đối số. Dict ssl_params được chuyển tiếp đến ssl.wrap_socket(), vì vậy bất cứ thứ gì hoạt động ở đó đều hoạt động ở đây:

import ssl

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,                          # TLS-MQTT default port
    ssl_params={'server_hostname': 'broker.example.com'},
    user='yard-cam',
    password=load_token(),
)

Cổng 8883 là cổng TLS-MQTT dành riêng của IANA. server_hostname bật SNI để các broker phía sau một IP dùng chung có thể định tuyến đến chứng chỉ đúng -- cùng cơ chế mà HTTPS sử dụng. user / password ánh xạ đến các trường username/password của gói CONNECT; broker quyết định liệu các thông tin xác thực đó có cấp quyền publish hoặc subscribe cho các chủ đề cụ thể hay không.

9.19.3. Đăng ký và nhận tin nhắn

Để nhận tin nhắn, client cung cấp một hàm gọi lại và gọi subscribe(). Hàm gọi lại nhận hai đối số bytes, chủ đề và tải trọng:

def on_message(topic, msg):
    print('received on', topic.decode(), ':', msg.decode())

client = MQTTClient(
    client_id='dashboard',
    server='test.mosquitto.org',
    port=1883,
    callback=on_message,
)
client.connect()
client.subscribe(b'yard-cam/motion', qos=0)
while True:
    client.wait_msg()

wait_msg() chặn cho đến khi một gói MQTT đến, phân tích cú pháp nó, gọi hàm gọi lại nếu đó là PUBLISH trên một chủ đề đã đăng ký, và trả về. Các hàm gọi lại đã đăng ký kích hoạt từ bên trong lệnh gọi đó -- không có luồng nền.

Đối với vòng lặp cam tương tác cần tiếp tục thực hiện các công việc khác, check_msg() là cùng logic trong dạng không chặn. Nó sử dụng select.select() với thời gian chờ 50 ms và trả về ngay lập tức nếu không có gì đang chờ:

while True:
    client.check_msg()
    run_frame()                  # capture + processing
    check_motion_threshold()

9.19.4. Kết nối lại một cách sạch sẽ

Bất kỳ MQTT client chạy lâu dài nào cũng phải xử lý các kết nối bị ngắt. Ngắt kết nối Wi-Fi, khởi động lại broker, hết thời gian NAT, hoặc đơn giản là chạy quá keepalive mà không có lưu lượng đều kết thúc socket. Client được tích hợp sẵn raise OSError (hoặc một ngoại lệ bare với mã trả về của broker) từ lệnh gọi nhận thấy sự ngắt kết nối, và mẫu chuẩn là một vòng lặp thử lại:

import time

def keep_publishing(client, topic, get_message):
    while True:
        try:
            client.connect()
            while True:
                client.publish(topic, get_message())
                time.sleep(5)
        except OSError:
            print('connection lost, reconnecting in 5s')
            time.sleep(5)

Các subscription không được duy trì qua các lần kết nối lại trừ khi client truyền clean_session=False khi kết nối, vì vậy lệnh connect bên trong cũng nên phát lại các lệnh gọi subscribe() trước khi vào vòng lặp publish.

9.19.5. Hook last-will

Một cam báo cáo trạng thái nên nói với broker thông điệp nào cần gửi thay mặt cam nếu kết nối bị ngắt đột ngột. Đặt will trước connect()

client = MQTTClient(
    client_id='yard-cam',
    server='broker.example.com',
    port=8883,
    ssl_params={'server_hostname': 'broker.example.com'},
)
client.set_last_will(
    b'yard-cam/status',
    b'offline',
    retain=True,
    qos=0,
)
client.connect()
client.publish(b'yard-cam/status', b'online', retain=True)

Bây giờ bất kỳ dashboard nào đã đăng ký yard-cam/status sẽ thấy online ngay khi cam kết nối và offline bất cứ khi nào broker nhận thấy cam bị ngắt. Tin nhắn offline được giữ lại trên broker để một dashboard kết nối mười phút sau vẫn thấy trạng thái hiện tại đúng.

9.19.6. Khi nào nên chọn MQTT thay vì HTTP

Chương webservers đề cập đến cam hoạt động như một máy chủ HTTP và, trên trang tải lên đám mây, như một HTTP client đăng JPEG lên một URL cố định. Cả hai đều có chỗ của chúng. Thời điểm phù hợp để chọn MQTT thay thế:

  • Cùng một dữ liệu cần đến nhiều người nghe (một dashboard, một dịch vụ thông báo, một bộ ghi) mà cam không biết danh sách trước.

  • Người nghe có thể đến và đi mà không cần cam khởi động lại.

  • Cam muốn đăng ký -- để nhận lệnh từ một bộ điều khiển -- điều mà một HTTP client không thể làm mà không cần long polling hoặc máy chủ đẩy trên một URL hàm gọi lại.

  • Kết nối phải tồn tại qua các khoảng thời gian nhàn rỗi dài một cách tiết kiệm.

Thời điểm phù hợp để tiếp tục dùng HTTP: một cam, một máy chủ, một mẫu request/response cố định với body quá lớn cho một MQTT topic duy nhất (JPEG frame qua MQTT hoạt động được nhưng không thân thiện với broker; HTTP POST là lựa chọn tự nhiên hơn).

Tham chiếu chéo: trang tải lên đám mây trong chương webservers hiển thị phiên bản HTTP của "cam → kho lưu trữ đám mây". Phiên bản MQTT của cùng vấn đề giữ cho cam độc lập khỏi URL của kho lưu trữ và cho phép một người tiêu dùng thứ hai (chẳng hạn một ứng dụng cảnh báo điện thoại) truy cập cùng luồng đó.