protocol — Канали протоколу OpenMV

Модуль protocol відкриває хост-протокол OpenMV для Python. Він дозволяє ініціалізувати та налаштовувати стек протоколу на стороні мікропрограми, а також дає змогу користувацькому коду реєструвати власні логічні канали на основі Python-об’єкта, що реалізує інтерфейс каналу (read, write, size, poll тощо). Саме з цим взаємодіють настільні допоміжні інструменти при потоковій передачі даних зображень або відображенні інтерактивних віджетів підключеної камери.

Приклади

Потокова передача зображення RGB565 на хост за допомогою власного бекенду, що реалізує низькорівневий інтерфейс каналу (backend.size(), backend.shape(), backend.poll(), backend.read()):

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

Відповідний скрипт на стороні хоста, що використовує пакет openmv для Python (pip install openmv) для підключення, надсилання скрипта на камеру та отримання кожного кадру:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

Замість /dev/ttyACM0 вкажіть послідовний порт камери (наприклад, COM3 у Windows). Конструктор openmv.camera.Camera приймає ті самі параметри протоколу, що й init (crc / seq / ack / events / max_payload / max_retry / timeout), якщо стек на стороні камери переналаштований відповідним чином.

Функції

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

Ініціалізує (або переналаштовує) стек протоколу та реєструє стандартні логічні канали даних (stdin, stdout, stream і, якщо скомпільовано, profile). Генерує RuntimeError у разі помилки ініціалізації. Мікропрограма завантажується із вже запущеним стандартним стеком протоколу USB, тому виклик цієї функції потрібен лише для зміни транспорту або перевизначення стандартних параметрів фреймування.

crc вмикає перевірку CRC для кадрів протоколу.

seq вмикає відстеження порядкових номерів.

ack вмикає підтвердження для кожного кадру.

events вмикає сповіщення про події каналу.

max_payload — максимальний розмір корисного навантаження в байтах. Якщо не вказано, використовується стандартне значення для конкретної камери; воно визначається на основі розміру буфера протоколу кожної плати: buffer - 10 (header) - 4 (CRC).

Camera

Buffer size

Max payload

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries — кількість спроб повторної передачі. Стандартне значення: 3.

rtx_timeout_ms — тайм-аут повторної передачі в мілісекундах (подвоюється після кожного тайм-ауту). Стандартне значення: 500.

lock_interval_ms — мінімальний інтервал блокування в мілісекундах. Стандартне значення: 10.

poll_ms — інтервал опитування в мілісекундах. 0 (стандартне значення) вимикає таймерне опитування.

protocol.is_active() bool

Повертає True, якщо хост наразі підключений і стек протоколу активний, інакше False.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Реєструє Python-об’єкт backend як новий логічний канал і повертає дескриптор ProtocolChannel. Доступні методи об’єкта backend (див. Інтерфейс бекенду нижче) визначають можливості каналу; protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE та protocol.CHANNEL_FLAG_LOCK автоматично додаються до flags, якщо реалізовано відповідні методи.

name — назва каналу у вигляді рядка. Обрізається до розміру буфера назви каналу мікропрограми. Обов’язково.

backend — Python-об’єкт, що реалізує інтерфейс бекенду. Обов’язково. Зазвичай передається як іменований аргумент (backend=...).

flags — додаткові біти прапорців каналу (див. константи CHANNEL_FLAG_*). Необов’язково; стандартне значення: 0.

Генерує RuntimeError, якщо канал не вдається зареєструвати (наприклад, немає вільних слотів для каналів).

Класи

class protocol.ProtocolChannel

Дескриптор, що повертається protocol.register. Екземпляри не створюються безпосередньо.

send_event(event: int, wait_ack: bool = False) None

Надіслати хосту сповіщення про подію каналу.

event — ідентифікатор події (ціле число).

wait_ack, якщо True, блокує виконання до отримання підтвердження від хоста.

Генерує RuntimeError у разі помилки надсилання події.

Інтерфейс бекенду

Об’єкт бекенду, переданий у protocol.register, може реалізовувати будь-яку підмножину наведених нижче методів. До рівня протоколу C підключаються лише методи, наявні в об’єкті; відсутні методи залишають відповідну можливість вимкненою.

class protocol.backend

Об’єкт бекенду каналу, переданий у protocol.register. Наведені нижче методи описують необов’язковий інтерфейс, який може реалізувати Python-бекенд.

init() object

Викликається один раз під час ініціалізації каналу. Повертає будь-яке значення, відмінне від None, у разі успіху; виняток або відсутність значення, що повертається, вважається помилкою.

poll() bool

Повертає True, якщо канал має дані, готові для читання хостом.

lock() bool

Захоплює канал для передачі. Повертає True у разі успіху.

unlock() bool

Звільняє канал після передачі. Повертає True у разі успіху.

size() int

Повертає кількість байтів, доступних для читання з каналу на даний момент.

shape() tuple

Повертає кортеж із не більше ніж чотирьох цілих чисел, що описують форму даних (наприклад, розміри зображення). Рівень протоколу використовує до чотирьох елементів.

flush() object

Скидає всі незбережені дані. Повертає будь-яке значення, відмінне від None, у разі успіху.

read(offset: int, size: int) bytes

Повертає до size байтів починаючи з offset у вигляді bytes-подібного об’єкта, що підтримує буферний протокол.

readp(offset: int, size: int) bytes

Варіант read без копіювання. Повертає буфер, пам’ять якого безпосередньо зчитується рівнем протоколу; буфер повинен залишатися дійсним протягом усього часу передачі.

write(offset: int, data: bytearray) int

Записує data за зміщенням offset. data — це bytearray, що безпосередньо посилається на C-буфер. Повертає кількість записаних байтів або 0 у разі стандартного успіху.

ioctl(cmd: int, length: int, arg: bytearray | None) int

Обробляє ioctl. arg дорівнює None, якщо length рівний нулю, інакше — bytearray, що безпосередньо посилається на C-буфер. Повертає 0 або None у разі успіху, або від’ємне ціле число у разі помилки.

is_active() bool

Для транспортних каналів повертає True, якщо базовий транспорт наразі підключений.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

Бекенд Python вищого рівня (надається замороженим пакетом protocol), який серіалізує іменовані поля до CBOR з використанням цілочисельних ключів, сумісних із SenML. Підтримує відображувальні віджети (label, depth) та інтерактивні елементи керування (toggle, slider, select) із зворотними викликами on_read/on_write.

on_read — необов’язковий виклик on_read(channel), що виконується перед серіалізацією каналу для хоста. Використовується для оновлення значень полів.

on_write — необов’язковий виклик on_write(channel, name, value), що виконується, коли хост записує нове значення для іменованого поля.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

Додати іменоване поле до каналу.

name — відображувана назва; повинна бути унікальною в межах цього каналу.

type — тип віджета: "label", "toggle", "slider", "select" або "depth".

value — початкове значення. Стандартне значення залежить від type.

unit — рядок одиниці вимірювання для label/slider (наприклад, "Cel", "%RH").

min — мінімальне значення (діапазон слайдера або діапазон глибини).

max — максимальне значення (діапазон слайдера або діапазон глибини).

step — крок зміни значення (слайдер).

options — список рядків варіантів (select).

width — ширина в пікселях (depth).

height — висота в пікселях (depth).

__getitem__(name: str) object

Повертає поточне значення іменованого поля. Для полів типу depth повертається буфер бінарних даних, для інших — скалярне значення.

__setitem__(name: str, value: Any) None

Встановлює значення іменованого поля. Для полів типу slider кортеж (min, max, value) одночасно оновлює діапазон і поточне значення. Для полів типу depth value — це буфер бінарних даних.

poll() bool

Метод інтерфейсу бекенду. Повертає True, якщо серіалізовані дані готові для хоста.

size() int

Метод інтерфейсу бекенду. Викликає on_read (якщо встановлено) і повертає розмір серіалізованого буфера.

read(offset: int, size: int) bytes

Метод інтерфейсу бекенду. Повертає фрагмент серіалізованого буфера.

write(offset: int, data: bytearray) int

Метод інтерфейсу бекенду. Декодує список оновлень CBOR та застосовує значення до відповідних іменованих полів, викликаючи on_write для кожного.

Константи

Біти прапорців каналу (об’єднуються побітово; передаються в protocol.register через flags або встановлюються автоматично на основі методів бекенду).

protocol.CHANNEL_FLAG_READ: int

Канал підтримує читання.

protocol.CHANNEL_FLAG_WRITE: int

Канал підтримує записування.

protocol.CHANNEL_FLAG_LOCK: int

Канал реалізує lock/unlock.

protocol.CHANNEL_FLAG_PHYSICAL: int

Канал представляє фізичний транспорт (на відміну від логічного каналу даних).

Вбудовані ідентифікатори каналів.

protocol.CHANNEL_ID_TRANSPORT: int

Зарезервований ідентифікатор каналу для активного транспорту.

protocol.CHANNEL_ID_STDIN: int

Ідентифікатор вбудованого каналу stdin.

protocol.CHANNEL_ID_STDOUT: int

Ідентифікатор вбудованого каналу stdout.

protocol.CHANNEL_ID_STREAM: int

Ідентифікатор вбудованого каналу stream.

protocol.CHANNEL_ID_PROFILE: int

Ідентифікатор вбудованого каналу профілювальника (присутній лише тоді, коли мікропрограма зібрана з увімкненим профілювальником).