protocol --- OpenMV 协议通道¶
protocol 模块向 Python 暴露了 OpenMV 主机协议。它允许初始化和配置固件端的协议栈,并让用户代码注册由实现了通道接口(read、write、size、poll 等)的 Python 对象支持的自定义逻辑通道。当桌面配套工具向相连的摄像头流式传输图像数据或暴露交互式控件时,它们就是与此进行通信的。
示例¶
使用实现了原始通道接口(backend.size()、backend.shape()、backend.poll()、backend.read())的自定义后端,向主机工具流式传输一张 RGB565 图像:
import csi
import protocol
csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
class FrameChannel:
def size(self):
return len(img_mv)
def shape(self):
return (img.height(), img.width(), len(img_mv))
def poll(self):
return frame_ready
def read(self, offset, size):
global frame_ready
end = offset + size
chunk = img_mv[offset:end]
if end >= len(img_mv):
frame_ready = False
return chunk
protocol.register(name="frame", backend=FrameChannel())
while True:
if not frame_ready:
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
对应的主机端脚本,使用 openmv Python 包(pip install openmv)进行连接、推送摄像头端脚本并拉取每一帧:
import cv2
import numpy as np
from openmv.camera import Camera
# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()
with Camera("/dev/ttyACM0", baudrate=921600) as cam:
cam.stop()
cam.exec(SCRIPT)
while True:
status = cam.read_status()
if not cam.has_channel("frame") or not status.get("frame"):
continue
h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
if cam.channel_size("frame") < size:
continue
data = cam.channel_read("frame", size)
rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)
# Unpack RGB565 to an HxWx3 uint8 RGB image.
r = ((rgb565 >> 11) & 0x1F) << 3
g = ((rgb565 >> 5) & 0x3F) << 2
b = ( rgb565 & 0x1F) << 3
frame = np.dstack([r, g, b]).astype(np.uint8)
# Display with OpenCV (cv2 expects BGR, not RGB).
cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
if cv2.waitKey(1) == ord("q"):
break
cv2.destroyAllWindows()
将 /dev/ttyACM0 替换为摄像头的串口(例如 Windows 上的 COM3)。当摄像头端协议栈已被重新配置以匹配时,openmv.camera.Camera 构造函数接受与 init 相同的协议参数(crc / seq / ack / events / max_payload / max_retry / timeout)。
函数¶
- protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None¶
初始化(或重新配置)协议栈,并注册默认的逻辑数据通道(
stdin、stdout、stream,以及在编译启用时的profile)。如果初始化失败则抛出RuntimeError。固件启动时已经运行着一个默认的 USB 协议栈,因此只有在需要更改传输方式或覆盖默认的帧封装参数时才需要调用此函数。crc在协议帧上启用 CRC 校验。seq启用序列号跟踪。ack启用逐帧确认。events启用通道事件通知。max_payload是以字节为单位的最大负载大小。如果省略,则使用下表中各摄像头的默认值;该值由每块板卡的协议缓冲区大小按buffer - 10 (header) - 4 (CRC)推导得出。摄像头
缓冲区大小
最大负载
OpenMV Cam M4(
OPENMV2)512
498
OpenMV Cam M7(
OPENMV3)512
498
OpenMV Cam H7(
OPENMV4)512
498
OpenMV Cam H7 Plus(
OPENMV4P)4096
4082
OpenMV Pure Thermal(
OPENMVPT)4096
4082
OpenMV Cam RT1062(
OPENMV_RT1060)4096
4082
OpenMV Cam N6(
OPENMV_N6)8192
8178
OpenMV AE3(
OPENMV_AE3)8192
8178
Arduino Portenta H7(
ARDUINO_PORTENTA_H7)4096
4082
Arduino Giga(
ARDUINO_GIGA)4096
4082
Arduino Nicla Vision(
ARDUINO_NICLA_VISION)4096
4082
rtx_retries是重传尝试次数。默认为3。rtx_timeout_ms是以毫秒为单位的重传超时时间(每次超时后翻倍)。默认为500。lock_interval_ms是以毫秒为单位的最小锁定间隔。默认为10。poll_ms是以毫秒为单位的轮询间隔。0(默认值)禁用定时器轮询。
- protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel¶
将一个 Python
backend对象注册为新的逻辑通道,并返回一个ProtocolChannel句柄。backend对象可用的方法(参见下方的 后端接口)决定了该通道的能力;当相应的方法被实现时,protocol.CHANNEL_FLAG_READ、protocol.CHANNEL_FLAG_WRITE和protocol.CHANNEL_FLAG_LOCK会被自动添加到flags中。name是字符串形式的通道名称。会被截断到固件的通道名缓冲区大小。必填。backend是实现了后端接口的 Python 对象。必填。 通常以关键字方式传入(backend=...)。flags是附加的通道标志位(参见CHANNEL_FLAG_*常量)。可选;默认为0。如果通道无法注册(例如没有空闲的通道槽位),则抛出
RuntimeError。
类¶
后端接口¶
传给 protocol.register 的后端对象可以实现以下方法的任意子集。只有对象上存在的方法才会被接入 C 协议层;缺失的方法会使相应的能力被禁用。
- class protocol.backend¶
传给
protocol.register的通道后端对象。下面的方法描述了 Python 后端可以实现的可选接口。- write(offset: int, data: bytearray) int¶
在
offset处写入data。data是一个直接引用 C 缓冲区的bytearray。返回写入的字节数,或在默认成功时返回0。
- class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)¶
一个更高层级的 Python 后端(由冻结的
protocol包提供),它使用 SenML 兼容的整数键将命名字段序列化为 CBOR。支持显示控件(label、depth)以及带有on_read/on_write回调的交互式控件(toggle、slider、select)。on_read是一个可选的可调用对象on_read(channel),在通道为主机序列化之前调用。可用它来刷新字段值。on_write是一个可选的可调用对象on_write(channel, name, value),当主机为某个命名字段写入新值时调用。- add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None¶
向通道添加一个命名字段。
name是显示名称;在该通道内必须唯一。type是控件类型:"label"、"toggle"、"slider"、"select"或"depth"。value是初始值。默认值取决于type。unit是用于label/slider的单位字符串(例如"Cel"、"%RH")。min是最小值(滑块范围或深度范围)。max是最大值(滑块范围或深度范围)。step是步长(滑块)。options是选项字符串列表(select)。width是以像素为单位的宽度(depth)。height是以像素为单位的高度(depth)。
常量¶
通道标志位(按位组合;通过 flags 传给 protocol.register,或根据后端的方法自动设置)。
内置通道标识符。