protocol --- OpenMVプロトコルチャネル

protocol モジュールは、OpenMVホストプロトコルをPythonに公開します。ファームウェア側のプロトコルスタックを初期化・構成でき、チャネルインターフェース(readwritesizepoll など)を実装したPythonオブジェクトを背後に持つカスタム論理チャネルを、ユーザーコードから登録できるようにします。これは、デスクトップのコンパニオンツールが画像データをストリーミングしたり、接続されたカメラに対話型ウィジェットを公開したりする際にやり取りする対象です。

rawチャネルインターフェース(backend.size()backend.shape()backend.poll()backend.read())を実装したカスタムバックエンドを使用して、RGB565画像をホストツールへストリーミングします:

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

接続してカメラ側スクリプトを送り込み、各フレームを取得するために openmv Pythonパッケージ(pip install openmv)を使用する、対応するホスト側スクリプト:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

/dev/ttyACM0 をカメラのシリアルポート(Windowsでは COM3 など)に置き換えてください。openmv.camera.Camera コンストラクタは、カメラ側のスタックがそれに合わせて再構成されている場合、init と同じプロトコルパラメータ(crc / seq / ack / events / max_payload / max_retry / timeout)を受け付けます。

関数

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

プロトコルスタックを初期化(または再構成)し、デフォルトの論理データチャネル(stdinstdoutstream、およびコンパイルされていれば profile)を登録します。初期化に失敗した場合は RuntimeError を送出します。ファームウェアはデフォルトのUSBプロトコルスタックがすでに動作している状態で起動するため、これを呼び出す必要があるのはトランスポートを変更する場合やデフォルトのフレーミングパラメータをオーバーライドする場合のみです。

crc はプロトコルフレームのCRC検証を有効にします。

seq はシーケンス番号の追跡を有効にします。

ack はフレームごとの確認応答を有効にします。

events はチャネルイベント通知を有効にします。

max_payload は最大ペイロードサイズ(バイト単位)です。省略した場合は以下のカメラごとのデフォルトが使用されます。これは各ボードのプロトコルバッファサイズから buffer - 10 (header) - 4 (CRC) として算出されます。

カメラ

バッファサイズ

最大ペイロード

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries は再送試行回数です。デフォルトは 3 です。

rtx_timeout_ms は再送タイムアウト(ミリ秒単位、タイムアウトごとに倍増)です。デフォルトは 500 です。

lock_interval_ms は最小ロック間隔(ミリ秒単位)です。デフォルトは 10 です。

poll_ms はポーリング間隔(ミリ秒単位)です。0(デフォルト)はタイマーポーリングを無効にします。

protocol.is_active() bool

ホストが現在接続されておりプロトコルスタックがアクティブな場合は True を、そうでない場合は False を返します。

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Pythonの backend オブジェクトを新しい論理チャネルとして登録し、ProtocolChannel ハンドルを返します。backend オブジェクトで利用可能なメソッド(後述の バックエンドインターフェース を参照)がチャネルの機能を決定します。対応するメソッドが実装されている場合、protocol.CHANNEL_FLAG_READprotocol.CHANNEL_FLAG_WRITEprotocol.CHANNEL_FLAG_LOCKflags に自動的に追加されます。

name は文字列としてのチャネル名です。ファームウェアのチャネル名バッファサイズに切り詰められます。必須。

backend はバックエンドインターフェースを実装したPythonオブジェクトです。必須。 通常はキーワード引数(backend=...)として渡します。

flags は追加のチャネルフラグビット(CHANNEL_FLAG_* 定数を参照)です。任意で、デフォルトは 0 です。

チャネルを登録できない場合(空きチャネルスロットがないなど)は RuntimeError を送出します。

クラス

class protocol.ProtocolChannel

protocol.register が返すハンドル。インスタンスを直接構築することはありません。

send_event(event: int, wait_ack: bool = False) None

チャネルイベント通知をホストに送信します。

event はイベント識別子(整数)です。

wait_ackTrue の場合、ホストがイベントを確認応答するまでブロックします。

イベントの送信に失敗した場合は RuntimeError を送出します。

バックエンドインターフェース

protocol.register に渡されるバックエンドオブジェクトは、以下のメソッドの任意のサブセットを実装できます。オブジェクトに存在するメソッドのみがCプロトコル層に接続され、欠落しているメソッドに対応する機能は無効のままになります。

class protocol.backend

protocol.register に渡されるチャネルバックエンドオブジェクト。以下のメソッドは、Pythonバックエンドが実装できる任意のインターフェースを説明します。

init() object

チャネルが初期化されるときに一度だけ呼び出されます。成功時には None 以外の任意の値を返します。例外や戻り値の欠如はエラーとして扱われます。

poll() bool

チャネルにホストが読み取れるデータが準備できている場合は True を返します。

lock() bool

転送のためにチャネルを取得します。成功時には True を返します。

unlock() bool

転送後にチャネルを解放します。成功時には True を返します。

size() int

チャネルから現在読み取り可能なバイト数を返します。

shape() tuple

データの形状(画像の寸法など)を表す最大4つの整数のタプルを返します。プロトコル層によって最大4つの要素が消費されます。

flush() object

保留中のデータをフラッシュします。成功時には None 以外の任意の値を返します。

read(offset: int, size: int) bytes

offset から始まる最大 size バイトを、バッファプロトコルをサポートする bytes 様オブジェクトとして返します。

readp(offset: int, size: int) bytes

read のゼロコピー版です。基盤となるメモリがプロトコル層によって直接読み取られるバッファを返します。バッファは転送の間有効であり続ける必要があります。

write(offset: int, data: bytearray) int

offset の位置に data を書き込みます。data はCバッファを直接参照する bytearray です。書き込まれたバイト数を返すか、デフォルトの成功時には 0 を返します。

ioctl(cmd: int, length: int, arg: bytearray | None) int

ioctlを処理します。length がゼロの場合 argNone で、そうでない場合はCバッファを参照する bytearray です。成功時には 0 または None を、エラー時には負の整数を返します。

is_active() bool

トランスポートチャネルの場合、基盤となるトランスポートが現在接続されていれば True を返します。

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

名前付きフィールドをSenML互換の整数キーを使用してCBORにシリアライズする、より高水準のPythonバックエンド(フリーズされた protocol パッケージが提供)です。表示ウィジェット(labeldepth)と、on_read/on_write コールバックを備えた対話型コントロール(togglesliderselect)をサポートします。

on_read は任意の呼び出し可能オブジェクト on_read(channel) で、チャネルがホスト向けにシリアライズされる前に呼び出されます。フィールド値を更新するために使用します。

on_write は任意の呼び出し可能オブジェクト on_write(channel, name, value) で、ホストが名前付きフィールドに新しい値を書き込んだときに呼び出されます。

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

名前付きフィールドをチャネルに追加します。

name は表示名です。このチャネル内で一意である必要があります。

type はウィジェットの種類です: "label""toggle""slider""select"、または "depth"

value は初期値です。デフォルトは type によって異なります。

unitlabel/slider の単位文字列("Cel""%RH" など)です。

min は最小値(スライダー範囲またはdepth範囲)です。

max は最大値(スライダー範囲またはdepth範囲)です。

step はステップサイズ(スライダー)です。

options はオプション文字列のリスト(select)です。

width はピクセル幅(depth)です。

height はピクセル高さ(depth)です。

__getitem__(name: str) object

名前付きフィールドの現在の値を返します。depth フィールドの場合はバイナリデータバッファが、それ以外の場合はスカラー値が返されます。

__setitem__(name: str, value: Any) None

名前付きフィールドの値を設定します。slider フィールドの場合、(min, max, value) タプルは範囲と現在値を同時に更新します。depth フィールドの場合、value はバイナリデータバッファです。

poll() bool

バックエンドインターフェースのメソッド。シリアライズされたデータがホスト向けに利用可能な場合に True を返します。

size() int

バックエンドインターフェースのメソッド。on_read(設定されている場合)を呼び出し、シリアライズされたバッファのサイズを返します。

read(offset: int, size: int) bytes

バックエンドインターフェースのメソッド。シリアライズされたバッファのスライスを返します。

write(offset: int, data: bytearray) int

バックエンドインターフェースのメソッド。CBOR更新リストをデコードし、一致する名前付きフィールドに値を適用し、それぞれに対して on_write を呼び出します。

定数

チャネルフラグビット(ビット単位で組み合わせ、flags を介して protocol.register に渡されるか、バックエンドのメソッドに基づいて自動的に設定されます)。

protocol.CHANNEL_FLAG_READ: int

チャネルが読み取りをサポートしています。

protocol.CHANNEL_FLAG_WRITE: int

チャネルが書き込みをサポートしています。

protocol.CHANNEL_FLAG_LOCK: int

チャネルが lock/unlock を実装しています。

protocol.CHANNEL_FLAG_PHYSICAL: int

チャネルが物理トランスポート(論理データチャネルとは対照的に)を表しています。

組み込みチャネル識別子。

protocol.CHANNEL_ID_TRANSPORT: int

アクティブなトランスポート用に予約されたチャネルID。

protocol.CHANNEL_ID_STDIN: int

組み込み stdin チャネルのチャネルID。

protocol.CHANNEL_ID_STDOUT: int

組み込み stdout チャネルのチャネルID。

protocol.CHANNEL_ID_STREAM: int

組み込み stream チャネルのチャネルID。

protocol.CHANNEL_ID_PROFILE: int

組み込みプロファイラチャネルのチャネルID(ファームウェアがプロファイラを有効にしてビルドされている場合にのみ存在します)。