protocol — Каналы протокола OpenMV

Модуль protocol предоставляет протокол хоста OpenMV для Python. Он позволяет инициализировать и настраивать стек протокола на стороне прошивки, а также даёт пользовательскому коду регистрировать собственные логические каналы, реализованные объектом Python, который реализует интерфейс канала (read, write, size, poll и т. д.). Именно с этим взаимодействуют настольные сопутствующие инструменты, когда передают данные изображения в потоковом режиме или предоставляют интерактивные виджеты подключённой камере.

Примеры

Передача изображения RGB565 в потоковом режиме на инструмент хоста с использованием собственного бэкенда, реализующего интерфейс необработанного канала (backend.size(), backend.shape(), backend.poll(), backend.read()):

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

Соответствующий скрипт на стороне хоста, использующий пакет Python openmv (pip install openmv) для подключения, отправки скрипта на камеру и получения каждого кадра:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

Замените /dev/ttyACM0 последовательным портом камеры (например, COM3 в Windows). Конструктор openmv.camera.Camera принимает те же параметры протокола, что и init (crc / seq / ack / events / max_payload / max_retry / timeout), когда стек на стороне камеры был перенастроен соответствующим образом.

Функции

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

Инициализирует (или перенастраивает) стек протокола и регистрирует логические каналы данных по умолчанию (stdin, stdout, stream и, если включён при компиляции, profile). Вызывает RuntimeError, если инициализация не удалась. Прошивка загружается с уже работающим стеком протокола USB по умолчанию, поэтому вызов этой функции нужен только для смены транспорта или переопределения параметров кадрирования по умолчанию.

crc включает проверку CRC для кадров протокола.

seq включает отслеживание порядковых номеров.

ack включает покадровые подтверждения.

events включает уведомления о событиях канала.

max_payload — максимальный размер полезной нагрузки в байтах. Если не указано, используется приведённое ниже значение по умолчанию для каждой камеры; оно вычисляется из размера буфера протокола каждой платы как buffer - 10 (header) - 4 (CRC).

Камера

Размер буфера

Макс. полезная нагрузка

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries — количество попыток повторной передачи. По умолчанию 3.

rtx_timeout_ms — тайм-аут повторной передачи в миллисекундах (удваивается после каждого тайм-аута). По умолчанию 500.

lock_interval_ms — минимальный интервал блокировки в миллисекундах. По умолчанию 10.

poll_ms — интервал опроса в миллисекундах. 0 (по умолчанию) отключает опрос по таймеру.

protocol.is_active() bool

Возвращает True, если хост в данный момент подключён и стек протокола активен, иначе False.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Регистрирует объект Python backend как новый логический канал и возвращает дескриптор ProtocolChannel. Доступные методы объекта backend (см. Интерфейс бэкенда ниже) определяют возможности канала; флаги protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE и protocol.CHANNEL_FLAG_LOCK добавляются к flags автоматически, когда соответствующие методы реализованы.

name — имя канала в виде строки. Усекается до размера буфера имени канала в прошивке. Обязательно.

backend — объект Python, реализующий интерфейс бэкенда. Обязательно. Обычно передаётся по ключевому слову (backend=...).

flags — дополнительные биты флагов канала (см. константы CHANNEL_FLAG_*). Необязательно; по умолчанию 0.

Вызывает RuntimeError, если канал не удаётся зарегистрировать (например, нет свободных слотов каналов).

Классы

class protocol.ProtocolChannel

Дескриптор, возвращаемый protocol.register. Экземпляры не создаются напрямую.

send_event(event: int, wait_ack: bool = False) None

Отправляет хосту уведомление о событии канала.

event — идентификатор события (целое число).

wait_ack — если True, блокирует выполнение до подтверждения события хостом.

Вызывает RuntimeError, если отправка события не удалась.

Интерфейс бэкенда

Объект бэкенда, передаваемый в protocol.register, может реализовывать любое подмножество следующих методов. К C-уровню протокола подключаются только присутствующие на объекте методы; отсутствующие методы оставляют соответствующую возможность отключённой.

class protocol.backend

Объект бэкенда канала, передаваемый в protocol.register. Методы ниже описывают необязательный интерфейс, который может реализовывать бэкенд на Python.

init() object

Вызывается один раз при инициализации канала. Возвращает любое значение, отличное от None, в случае успеха; исключение или отсутствие возвращаемого значения считается ошибкой.

poll() bool

Возвращает True, если канал имеет данные, готовые к чтению хостом.

lock() bool

Захватывает канал для передачи. Возвращает True в случае успеха.

unlock() bool

Освобождает канал после передачи. Возвращает True в случае успеха.

size() int

Возвращает количество байтов, доступных в данный момент для чтения из канала.

shape() tuple

Возвращает кортеж из не более чем четырёх целых чисел, описывающих форму данных (например, размеры изображения). Уровень протокола использует не более четырёх элементов.

flush() object

Сбрасывает все ожидающие данные. Возвращает любое значение, отличное от None, в случае успеха.

read(offset: int, size: int) bytes

Возвращает до size байтов начиная со смещения offset в виде bytes-подобного объекта, поддерживающего протокол буфера.

readp(offset: int, size: int) bytes

Вариант read без копирования. Возвращает буфер, базовая память которого читается напрямую уровнем протокола; буфер должен оставаться действительным в течение всей передачи.

write(offset: int, data: bytearray) int

Записывает data по смещению offset. data — это bytearray, ссылающийся непосредственно на C-буфер. Возвращает количество записанных байтов или 0 при успехе по умолчанию.

ioctl(cmd: int, length: int, arg: bytearray | None) int

Обрабатывает ioctl. arg равен None, если length равно нулю, иначе это bytearray, ссылающийся на C-буфер. Возвращает 0 или None в случае успеха либо отрицательное целое число при ошибке.

is_active() bool

Для транспортных каналов возвращает True, если базовый транспорт в данный момент подключён.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

Более высокоуровневый бэкенд на Python (предоставляемый встроенным пакетом protocol), который сериализует именованные поля в CBOR с использованием целочисленных ключей, совместимых с SenML. Поддерживает виджеты отображения (label, depth) и интерактивные элементы управления (toggle, slider, select) с функциями обратного вызова on_read/on_write.

on_read — необязательный вызываемый объект on_read(channel), вызываемый перед сериализацией канала для хоста. Используйте его для обновления значений полей.

on_write — необязательный вызываемый объект on_write(channel, name, value), вызываемый, когда хост записывает новое значение для именованного поля.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

Добавляет именованное поле в канал.

name — отображаемое имя; должно быть уникальным в пределах этого канала.

type — тип виджета: "label", "toggle", "slider", "select" или "depth".

value — начальное значение. Значение по умолчанию зависит от type.

unit — строка единицы измерения для label/slider (например, "Cel", "%RH").

min — минимальное значение (диапазон слайдера или диапазон глубины).

max — максимальное значение (диапазон слайдера или диапазон глубины).

step — размер шага (слайдер).

options — список строк-вариантов (select).

width — ширина в пикселях (depth).

height — высота в пикселях (depth).

__getitem__(name: str) object

Возвращает текущее значение именованного поля. Для полей depth возвращается буфер двоичных данных, иначе скалярное значение.

__setitem__(name: str, value: Any) None

Устанавливает значение именованного поля. Для полей slider кортеж (min, max, value) одновременно обновляет диапазон и текущее значение. Для полей depth value — это буфер двоичных данных.

poll() bool

Метод интерфейса бэкенда. Возвращает True, когда для хоста доступны сериализованные данные.

size() int

Метод интерфейса бэкенда. Вызывает on_read (если задан) и возвращает размер сериализованного буфера.

read(offset: int, size: int) bytes

Метод интерфейса бэкенда. Возвращает срез сериализованного буфера.

write(offset: int, data: bytearray) int

Метод интерфейса бэкенда. Декодирует список обновлений CBOR и применяет значения к соответствующим именованным полям, вызывая on_write для каждого.

Константы

Биты флагов канала (объединяются побитово; передаются в protocol.register через flags или устанавливаются автоматически на основе методов бэкенда).

protocol.CHANNEL_FLAG_READ: int

Канал поддерживает чтение.

protocol.CHANNEL_FLAG_WRITE: int

Канал поддерживает запись.

protocol.CHANNEL_FLAG_LOCK: int

Канал реализует lock/unlock.

protocol.CHANNEL_FLAG_PHYSICAL: int

Канал представляет физический транспорт (в отличие от логического канала данных).

Встроенные идентификаторы каналов.

protocol.CHANNEL_ID_TRANSPORT: int

Зарезервированный идентификатор канала для активного транспорта.

protocol.CHANNEL_ID_STDIN: int

Идентификатор встроенного канала stdin.

protocol.CHANNEL_ID_STDOUT: int

Идентификатор встроенного канала stdout.

protocol.CHANNEL_ID_STREAM: int

Идентификатор встроенного канала stream.

protocol.CHANNEL_ID_PROFILE: int

Идентификатор встроенного канала профилировщика (присутствует только когда прошивка собрана с включённым профилировщиком).