protocol --- OpenMV 协议通道

protocol 模块向 Python 暴露了 OpenMV 主机协议。它允许初始化和配置固件端的协议栈,并让用户代码注册由实现了通道接口(readwritesizepoll 等)的 Python 对象支持的自定义逻辑通道。当桌面配套工具向相连的摄像头流式传输图像数据或暴露交互式控件时,它们就是与此进行通信的。

示例

使用实现了原始通道接口(backend.size()backend.shape()backend.poll()backend.read())的自定义后端,向主机工具流式传输一张 RGB565 图像:

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

对应的主机端脚本,使用 openmv Python 包(pip install openmv)进行连接、推送摄像头端脚本并拉取每一帧:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

/dev/ttyACM0 替换为摄像头的串口(例如 Windows 上的 COM3)。当摄像头端协议栈已被重新配置以匹配时,openmv.camera.Camera 构造函数接受与 init 相同的协议参数(crc / seq / ack / events / max_payload / max_retry / timeout)。

函数

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

初始化(或重新配置)协议栈,并注册默认的逻辑数据通道(stdinstdoutstream,以及在编译启用时的 profile)。如果初始化失败则抛出 RuntimeError。固件启动时已经运行着一个默认的 USB 协议栈,因此只有在需要更改传输方式或覆盖默认的帧封装参数时才需要调用此函数。

crc 在协议帧上启用 CRC 校验。

seq 启用序列号跟踪。

ack 启用逐帧确认。

events 启用通道事件通知。

max_payload 是以字节为单位的最大负载大小。如果省略,则使用下表中各摄像头的默认值;该值由每块板卡的协议缓冲区大小按 buffer - 10 (header) - 4 (CRC) 推导得出。

摄像头

缓冲区大小

最大负载

OpenMV Cam M4(OPENMV2

512

498

OpenMV Cam M7(OPENMV3

512

498

OpenMV Cam H7(OPENMV4

512

498

OpenMV Cam H7 Plus(OPENMV4P

4096

4082

OpenMV Pure Thermal(OPENMVPT

4096

4082

OpenMV Cam RT1062(OPENMV_RT1060

4096

4082

OpenMV Cam N6(OPENMV_N6

8192

8178

OpenMV AE3(OPENMV_AE3

8192

8178

Arduino Portenta H7(ARDUINO_PORTENTA_H7

4096

4082

Arduino Giga(ARDUINO_GIGA

4096

4082

Arduino Nicla Vision(ARDUINO_NICLA_VISION

4096

4082

rtx_retries 是重传尝试次数。默认为 3

rtx_timeout_ms 是以毫秒为单位的重传超时时间(每次超时后翻倍)。默认为 500

lock_interval_ms 是以毫秒为单位的最小锁定间隔。默认为 10

poll_ms 是以毫秒为单位的轮询间隔。0(默认值)禁用定时器轮询。

protocol.is_active() bool

如果当前有主机已连接且协议栈处于活动状态,则返回 True,否则返回 False

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

将一个 Python backend 对象注册为新的逻辑通道,并返回一个 ProtocolChannel 句柄。backend 对象可用的方法(参见下方的 后端接口)决定了该通道的能力;当相应的方法被实现时,protocol.CHANNEL_FLAG_READprotocol.CHANNEL_FLAG_WRITEprotocol.CHANNEL_FLAG_LOCK 会被自动添加到 flags 中。

name 是字符串形式的通道名称。会被截断到固件的通道名缓冲区大小。必填。

backend 是实现了后端接口的 Python 对象。必填。 通常以关键字方式传入(backend=...)。

flags 是附加的通道标志位(参见 CHANNEL_FLAG_* 常量)。可选;默认为 0

如果通道无法注册(例如没有空闲的通道槽位),则抛出 RuntimeError

class protocol.ProtocolChannel

protocol.register 返回的句柄。实例不会被直接构造。

send_event(event: int, wait_ack: bool = False) None

向主机发送一个通道事件通知。

event 是事件标识符(整数)。

wait_ack 若为 True,则阻塞直到主机确认该事件。

如果发送事件失败,则抛出 RuntimeError

后端接口

传给 protocol.register 的后端对象可以实现以下方法的任意子集。只有对象上存在的方法才会被接入 C 协议层;缺失的方法会使相应的能力被禁用。

class protocol.backend

传给 protocol.register 的通道后端对象。下面的方法描述了 Python 后端可以实现的可选接口。

init() object

在通道初始化时调用一次。成功时返回任何非 None 的值;抛出异常或没有返回值会被视为错误。

poll() bool

如果通道有数据准备好供主机读取,则返回 True

lock() bool

为一次传输获取该通道。成功时返回 True

unlock() bool

在一次传输后释放该通道。成功时返回 True

size() int

返回当前可从通道读取的字节数。

shape() tuple

返回一个最多包含四个整数的元组,用于描述数据形状(例如图像尺寸)。协议层最多使用其中四个元素。

flush() object

刷新所有待处理的数据。成功时返回任何非 None 的值。

read(offset: int, size: int) bytes

offset 开始返回最多 size 个字节,以支持缓冲区协议的 bytes 类对象形式返回。

readp(offset: int, size: int) bytes

read 的零拷贝变体。返回一个其底层内存由协议层直接读取的缓冲区;该缓冲区在传输期间必须保持有效。

write(offset: int, data: bytearray) int

offset 处写入 datadata 是一个直接引用 C 缓冲区的 bytearray。返回写入的字节数,或在默认成功时返回 0

ioctl(cmd: int, length: int, arg: bytearray | None) int

处理一个 ioctl。如果 length 为零,则 argNone,否则为引用 C 缓冲区的 bytearray。成功时返回 0None,出错时返回一个负整数。

is_active() bool

对于传输通道,如果底层传输当前已连接,则返回 True

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

一个更高层级的 Python 后端(由冻结的 protocol 包提供),它使用 SenML 兼容的整数键将命名字段序列化为 CBOR。支持显示控件(labeldepth)以及带有 on_read/on_write 回调的交互式控件(togglesliderselect)。

on_read 是一个可选的可调用对象 on_read(channel),在通道为主机序列化之前调用。可用它来刷新字段值。

on_write 是一个可选的可调用对象 on_write(channel, name, value),当主机为某个命名字段写入新值时调用。

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

向通道添加一个命名字段。

name 是显示名称;在该通道内必须唯一。

type 是控件类型:"label""toggle""slider""select""depth"

value 是初始值。默认值取决于 type

unit 是用于 label/slider 的单位字符串(例如 "Cel""%RH")。

min 是最小值(滑块范围或深度范围)。

max 是最大值(滑块范围或深度范围)。

step 是步长(滑块)。

options 是选项字符串列表(select)。

width 是以像素为单位的宽度(depth)。

height 是以像素为单位的高度(depth)。

__getitem__(name: str) object

返回命名字段的当前值。对于 depth 字段返回二进制数据缓冲区,否则返回标量值。

__setitem__(name: str, value: Any) None

设置命名字段的值。对于 slider 字段,传入一个 (min, max, value) 元组可同时更新范围和当前值。对于 depth 字段,value 是二进制数据缓冲区。

poll() bool

后端接口方法。当有序列化数据可供主机使用时返回 True

size() int

后端接口方法。调用 on_read(如果已设置)并返回序列化缓冲区的大小。

read(offset: int, size: int) bytes

后端接口方法。返回序列化缓冲区的一个切片。

write(offset: int, data: bytearray) int

后端接口方法。解码一个 CBOR 更新列表并将值应用到匹配的命名字段,对每个字段调用 on_write

常量

通道标志位(按位组合;通过 flags 传给 protocol.register,或根据后端的方法自动设置)。

protocol.CHANNEL_FLAG_READ: int

该通道支持读取。

protocol.CHANNEL_FLAG_WRITE: int

该通道支持写入。

protocol.CHANNEL_FLAG_LOCK: int

该通道实现了 lock/unlock

protocol.CHANNEL_FLAG_PHYSICAL: int

该通道代表一个物理传输(与逻辑数据通道相对)。

内置通道标识符。

protocol.CHANNEL_ID_TRANSPORT: int

为活动传输保留的通道 ID。

protocol.CHANNEL_ID_STDIN: int

内置 stdin 通道的通道 ID。

protocol.CHANNEL_ID_STDOUT: int

内置 stdout 通道的通道 ID。

protocol.CHANNEL_ID_STREAM: int

内置 stream 通道的通道 ID。

protocol.CHANNEL_ID_PROFILE: int

内置性能分析器通道的通道 ID(仅在固件启用性能分析器构建时存在)。