protocol --- OpenMV 協定通道

protocol 模組將 OpenMV 主機協定開放給 Python 使用。它允許初始化與設定韌體端的協定堆疊,並讓使用者程式碼註冊由實作通道介面(readwritesizepoll 等)的 Python 物件支援的自訂邏輯通道。當桌面端的搭配工具串流影像資料,或向已連線的相機開放互動式小工具時,所溝通的就是此協定。

範例

使用實作原始通道介面(backend.size()backend.shape()backend.poll()backend.read())的自訂後端,將 RGB565 影像串流至主機工具::

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

對應的主機端指令碼,使用 openmv Python 套件(pip install openmv)來連線、推送相機端指令碼,並拉取每一個影格::

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

請將 /dev/ttyACM0 替換為相機的序列埠(例如在 Windows 上為 COM3)。當相機端的堆疊已重新設定為相符時,openmv.camera.Camera 建構式接受與 init 相同的協定參數(crc / seq / ack / events / max_payload / max_retry / timeout)。

函式

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

初始化(或重新設定)協定堆疊,並註冊預設的邏輯資料通道(stdinstdoutstream,以及若有編譯進來的 profile)。若初始化失敗則引發 RuntimeError。韌體開機時已執行預設的 USB 協定堆疊,因此只有在要變更傳輸方式或覆寫預設的訊框參數時才需要呼叫此函式。

crc 啟用協定訊框上的 CRC 驗證。

seq 啟用序號追蹤。

ack 啟用每訊框的確認。

events 啟用通道事件通知。

max_payload 為以位元組計的最大酬載大小。若省略則使用下方各相機的預設值;該值由每塊板子的協定緩衝區大小推導而來,計算方式為 buffer - 10 (header) - 4 (CRC)

相機

緩衝區大小

最大酬載

OpenMV Cam M4(OPENMV2

512

498

OpenMV Cam M7(OPENMV3

512

498

OpenMV Cam H7(OPENMV4

512

498

OpenMV Cam H7 Plus(OPENMV4P

4096

4082

OpenMV Pure Thermal(OPENMVPT

4096

4082

OpenMV Cam RT1062(OPENMV_RT1060

4096

4082

OpenMV Cam N6(OPENMV_N6

8192

8178

OpenMV AE3(OPENMV_AE3

8192

8178

Arduino Portenta H7(ARDUINO_PORTENTA_H7

4096

4082

Arduino Giga(ARDUINO_GIGA

4096

4082

Arduino Nicla Vision(ARDUINO_NICLA_VISION

4096

4082

rtx_retries 為重傳嘗試次數。預設為 3

rtx_timeout_ms 為以毫秒計的重傳逾時時間(每次逾時後加倍)。預設為 500

lock_interval_ms 為以毫秒計的最小鎖定間隔。預設為 10

poll_ms 為以毫秒計的輪詢間隔。0(預設值)會停用計時器輪詢。

protocol.is_active() bool

若目前有主機連線且協定堆疊處於作用中,則回傳 True,否則回傳 False

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

將一個 Python backend 物件註冊為新的邏輯通道,並回傳一個 ProtocolChannel 控制代碼。backend 物件可用的方法(見下方 後端介面)決定該通道的能力;當對應方法有實作時,protocol.CHANNEL_FLAG_READprotocol.CHANNEL_FLAG_WRITEprotocol.CHANNEL_FLAG_LOCK 會自動加入 flags

name 為字串形式的通道名稱。會被截斷至韌體的通道名稱緩衝區大小。必填。

backend 為實作後端介面的 Python 物件。必填。通常以關鍵字傳入(backend=...)。

flags 為額外的通道旗標位元(見 CHANNEL_FLAG_* 常數)。選填;預設為 0

若無法註冊通道(例如沒有空閒的通道槽)則引發 RuntimeError

類別

class protocol.ProtocolChannel

protocol.register 回傳的控制代碼。實例不會直接建構。

send_event(event: int, wait_ack: bool = False) None

向主機傳送一個通道事件通知。

event 為事件識別碼(整數)。

wait_ack 若為 True 則會阻塞,直到主機確認該事件為止。

若傳送事件失敗則引發 RuntimeError

後端介面

傳入 protocol.register 的後端物件可以實作下列方法的任何子集。只有該物件上實際存在的方法會接到 C 協定層;缺少的方法會使對應能力停用。

class protocol.backend

傳入 protocol.register 的通道後端物件。下列方法描述 Python 後端可實作的選用介面。

init() object

在通道初始化時呼叫一次。成功時回傳任何非 None 的值;例外或缺少回傳值會被視為錯誤。

poll() bool

若通道有資料就緒可供主機讀取,則回傳 True

lock() bool

為傳輸取得通道。成功時回傳 True

unlock() bool

在傳輸後釋放通道。成功時回傳 True

size() int

回傳目前可從通道讀取的位元組數。

shape() tuple

回傳一個最多包含四個整數的元組,描述資料形狀(例如影像尺寸)。協定層最多會使用其中四個元素。

flush() object

清空任何待處理的資料。成功時回傳任何非 None 的值。

read(offset: int, size: int) bytes

offset 開始回傳最多 size 個位元組,以支援緩衝區協定的 bytes 類物件形式回傳。

readp(offset: int, size: int) bytes

read 的零複製變體。回傳一個其底層記憶體由協定層直接讀取的緩衝區;該緩衝區在傳輸期間必須保持有效。

write(offset: int, data: bytearray) int

offset 處寫入 datadata 為直接參照 C 緩衝區的 bytearray。回傳寫入的位元組數,或在預設成功時回傳 0

ioctl(cmd: int, length: int, arg: bytearray | None) int

處理一個 ioctl。若 length 為零,則 argNone,否則為參照 C 緩衝區的 bytearray。成功時回傳 0None,發生錯誤時回傳負整數。

is_active() bool

對於傳輸通道,若底層傳輸目前已連線則回傳 True

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

一個較高階的 Python 後端(由凍結的 protocol 套件提供),會使用相容於 SenML 的整數鍵將具名欄位序列化為 CBOR。支援顯示小工具(labeldepth)與互動式控制項(togglesliderselect),並具備 on_read/on_write 回呼函式。

on_read 為選用的可呼叫物件 on_read(channel),會在通道為主機序列化之前被呼叫。可用它來重新整理欄位值。

on_write 為選用的可呼叫物件 on_write(channel, name, value),會在主機寫入某具名欄位的新值時被呼叫。

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

在通道中加入一個具名欄位。

name 為顯示名稱;在此通道內必須唯一。

type 為小工具類型:"label""toggle""slider""select""depth"

value 為初始值。預設值取決於 type

unitlabel/slider 的單位字串(例如 "Cel""%RH")。

min 為最小值(slider 範圍或 depth 範圍)。

max 為最大值(slider 範圍或 depth 範圍)。

step 為步進大小(slider)。

options 為選項字串清單(select)。

width 為像素寬度(depth)。

height 為像素高度(depth)。

__getitem__(name: str) object

回傳具名欄位的目前值。對於 depth 欄位會回傳二進位資料緩衝區,否則回傳純量值。

__setitem__(name: str, value: Any) None

設定具名欄位的值。對於 slider 欄位,(min, max, value) 元組會同時更新範圍與目前值。對於 depth 欄位,value 為二進位資料緩衝區。

poll() bool

後端介面方法。當有序列化資料可供主機使用時回傳 True

size() int

後端介面方法。呼叫 on_read(若有設定)並回傳序列化緩衝區的大小。

read(offset: int, size: int) bytes

後端介面方法。回傳序列化緩衝區的一個切片。

write(offset: int, data: bytearray) int

後端介面方法。解碼一個 CBOR 更新清單並將值套用至相符的具名欄位,對每個欄位呼叫 on_write

常數

通道旗標位元(以位元方式組合;透過 flags 傳入 protocol.register,或依後端的方法自動設定)。

protocol.CHANNEL_FLAG_READ: int

通道支援讀取。

protocol.CHANNEL_FLAG_WRITE: int

通道支援寫入。

protocol.CHANNEL_FLAG_LOCK: int

通道實作了 lock/unlock

protocol.CHANNEL_FLAG_PHYSICAL: int

通道代表一個實體傳輸(相對於邏輯資料通道)。

內建通道識別碼。

protocol.CHANNEL_ID_TRANSPORT: int

作用中傳輸的保留通道 ID。

protocol.CHANNEL_ID_STDIN: int

內建 stdin 通道的通道 ID。

protocol.CHANNEL_ID_STDOUT: int

內建 stdout 通道的通道 ID。

protocol.CHANNEL_ID_STREAM: int

內建 stream 通道的通道 ID。

protocol.CHANNEL_ID_PROFILE: int

內建效能分析器通道的通道 ID(僅在韌體建置時啟用效能分析器才會存在)。