protocol --- OpenMVプロトコルチャネル¶
protocol モジュールは、OpenMVホストプロトコルをPythonに公開します。ファームウェア側のプロトコルスタックを初期化・構成でき、チャネルインターフェース(read、write、size、poll など)を実装したPythonオブジェクトを背後に持つカスタム論理チャネルを、ユーザーコードから登録できるようにします。これは、デスクトップのコンパニオンツールが画像データをストリーミングしたり、接続されたカメラに対話型ウィジェットを公開したりする際にやり取りする対象です。
例¶
rawチャネルインターフェース(backend.size()、backend.shape()、backend.poll()、backend.read())を実装したカスタムバックエンドを使用して、RGB565画像をホストツールへストリーミングします:
import csi
import protocol
csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
class FrameChannel:
def size(self):
return len(img_mv)
def shape(self):
return (img.height(), img.width(), len(img_mv))
def poll(self):
return frame_ready
def read(self, offset, size):
global frame_ready
end = offset + size
chunk = img_mv[offset:end]
if end >= len(img_mv):
frame_ready = False
return chunk
protocol.register(name="frame", backend=FrameChannel())
while True:
if not frame_ready:
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
接続してカメラ側スクリプトを送り込み、各フレームを取得するために openmv Pythonパッケージ(pip install openmv)を使用する、対応するホスト側スクリプト:
import cv2
import numpy as np
from openmv.camera import Camera
# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()
with Camera("/dev/ttyACM0", baudrate=921600) as cam:
cam.stop()
cam.exec(SCRIPT)
while True:
status = cam.read_status()
if not cam.has_channel("frame") or not status.get("frame"):
continue
h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
if cam.channel_size("frame") < size:
continue
data = cam.channel_read("frame", size)
rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)
# Unpack RGB565 to an HxWx3 uint8 RGB image.
r = ((rgb565 >> 11) & 0x1F) << 3
g = ((rgb565 >> 5) & 0x3F) << 2
b = ( rgb565 & 0x1F) << 3
frame = np.dstack([r, g, b]).astype(np.uint8)
# Display with OpenCV (cv2 expects BGR, not RGB).
cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
if cv2.waitKey(1) == ord("q"):
break
cv2.destroyAllWindows()
/dev/ttyACM0 をカメラのシリアルポート(Windowsでは COM3 など)に置き換えてください。openmv.camera.Camera コンストラクタは、カメラ側のスタックがそれに合わせて再構成されている場合、init と同じプロトコルパラメータ(crc / seq / ack / events / max_payload / max_retry / timeout)を受け付けます。
関数¶
- protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None¶
プロトコルスタックを初期化(または再構成)し、デフォルトの論理データチャネル(
stdin、stdout、stream、およびコンパイルされていればprofile)を登録します。初期化に失敗した場合はRuntimeErrorを送出します。ファームウェアはデフォルトのUSBプロトコルスタックがすでに動作している状態で起動するため、これを呼び出す必要があるのはトランスポートを変更する場合やデフォルトのフレーミングパラメータをオーバーライドする場合のみです。crcはプロトコルフレームのCRC検証を有効にします。seqはシーケンス番号の追跡を有効にします。ackはフレームごとの確認応答を有効にします。eventsはチャネルイベント通知を有効にします。max_payloadは最大ペイロードサイズ(バイト単位)です。省略した場合は以下のカメラごとのデフォルトが使用されます。これは各ボードのプロトコルバッファサイズからbuffer - 10 (header) - 4 (CRC)として算出されます。カメラ
バッファサイズ
最大ペイロード
OpenMV Cam M4 (
OPENMV2)512
498
OpenMV Cam M7 (
OPENMV3)512
498
OpenMV Cam H7 (
OPENMV4)512
498
OpenMV Cam H7 Plus (
OPENMV4P)4096
4082
OpenMV Pure Thermal (
OPENMVPT)4096
4082
OpenMV Cam RT1062 (
OPENMV_RT1060)4096
4082
OpenMV Cam N6 (
OPENMV_N6)8192
8178
OpenMV AE3 (
OPENMV_AE3)8192
8178
Arduino Portenta H7 (
ARDUINO_PORTENTA_H7)4096
4082
Arduino Giga (
ARDUINO_GIGA)4096
4082
Arduino Nicla Vision (
ARDUINO_NICLA_VISION)4096
4082
rtx_retriesは再送試行回数です。デフォルトは3です。rtx_timeout_msは再送タイムアウト(ミリ秒単位、タイムアウトごとに倍増)です。デフォルトは500です。lock_interval_msは最小ロック間隔(ミリ秒単位)です。デフォルトは10です。poll_msはポーリング間隔(ミリ秒単位)です。0(デフォルト)はタイマーポーリングを無効にします。
- protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel¶
Pythonの
backendオブジェクトを新しい論理チャネルとして登録し、ProtocolChannelハンドルを返します。backendオブジェクトで利用可能なメソッド(後述の バックエンドインターフェース を参照)がチャネルの機能を決定します。対応するメソッドが実装されている場合、protocol.CHANNEL_FLAG_READ、protocol.CHANNEL_FLAG_WRITE、protocol.CHANNEL_FLAG_LOCKがflagsに自動的に追加されます。nameは文字列としてのチャネル名です。ファームウェアのチャネル名バッファサイズに切り詰められます。必須。backendはバックエンドインターフェースを実装したPythonオブジェクトです。必須。 通常はキーワード引数(backend=...)として渡します。flagsは追加のチャネルフラグビット(CHANNEL_FLAG_*定数を参照)です。任意で、デフォルトは0です。チャネルを登録できない場合(空きチャネルスロットがないなど)は
RuntimeErrorを送出します。
クラス¶
バックエンドインターフェース¶
protocol.register に渡されるバックエンドオブジェクトは、以下のメソッドの任意のサブセットを実装できます。オブジェクトに存在するメソッドのみがCプロトコル層に接続され、欠落しているメソッドに対応する機能は無効のままになります。
- class protocol.backend¶
protocol.registerに渡されるチャネルバックエンドオブジェクト。以下のメソッドは、Pythonバックエンドが実装できる任意のインターフェースを説明します。- readp(offset: int, size: int) bytes¶
readのゼロコピー版です。基盤となるメモリがプロトコル層によって直接読み取られるバッファを返します。バッファは転送の間有効であり続ける必要があります。
- write(offset: int, data: bytearray) int¶
offsetの位置にdataを書き込みます。dataはCバッファを直接参照するbytearrayです。書き込まれたバイト数を返すか、デフォルトの成功時には0を返します。
- class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)¶
名前付きフィールドをSenML互換の整数キーを使用してCBORにシリアライズする、より高水準のPythonバックエンド(フリーズされた
protocolパッケージが提供)です。表示ウィジェット(label、depth)と、on_read/on_writeコールバックを備えた対話型コントロール(toggle、slider、select)をサポートします。on_readは任意の呼び出し可能オブジェクトon_read(channel)で、チャネルがホスト向けにシリアライズされる前に呼び出されます。フィールド値を更新するために使用します。on_writeは任意の呼び出し可能オブジェクトon_write(channel, name, value)で、ホストが名前付きフィールドに新しい値を書き込んだときに呼び出されます。- add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None¶
名前付きフィールドをチャネルに追加します。
nameは表示名です。このチャネル内で一意である必要があります。typeはウィジェットの種類です:"label"、"toggle"、"slider"、"select"、または"depth"。valueは初期値です。デフォルトはtypeによって異なります。unitはlabel/sliderの単位文字列("Cel"、"%RH"など)です。minは最小値(スライダー範囲またはdepth範囲)です。maxは最大値(スライダー範囲またはdepth範囲)です。stepはステップサイズ(スライダー)です。optionsはオプション文字列のリスト(select)です。widthはピクセル幅(depth)です。heightはピクセル高さ(depth)です。
定数¶
チャネルフラグビット(ビット単位で組み合わせ、flags を介して protocol.register に渡されるか、バックエンドのメソッドに基づいて自動的に設定されます)。
組み込みチャネル識別子。