protocol — OpenMV 프로토콜 채널

protocol 모듈은 OpenMV 호스트 프로토콜을 Python에 노출합니다. 펌웨어 측 프로토콜 스택을 초기화하고 구성할 수 있게 하며, 채널 인터페이스(read, write, size, poll 등)를 구현하는 Python 객체를 기반으로 사용자 코드가 사용자 정의 논리 채널을 등록할 수 있도록 합니다. 데스크톱 컴패니언 도구가 이미지 데이터를 스트리밍하거나 연결된 카메라에 대화형 위젯을 노출할 때 통신하는 대상이 바로 이것입니다.

예제

원시 채널 인터페이스(backend.size(), backend.shape(), backend.poll(), backend.read())를 구현하는 사용자 정의 백엔드를 사용해 RGB565 이미지를 호스트 도구로 스트리밍합니다:

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

이에 대응하는 호스트 측 스크립트로, openmv Python 패키지(pip install openmv)를 사용하여 연결하고, 카메라 측 스크립트를 푸시하며, 각 프레임을 가져옵니다:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

/dev/ttyACM0 을 카메라의 시리얼 포트(예: Windows에서는 COM3)로 바꾸세요. openmv.camera.Camera 생성자는 카메라 측 스택이 일치하도록 재구성된 경우 init 과 동일한 프로토콜 매개변수(crc / seq / ack / events / max_payload / max_retry / timeout)를 받습니다.

함수

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

프로토콜 스택을 초기화(또는 재구성)하고 기본 논리 데이터 채널(stdin, stdout, stream 및 컴파일된 경우 profile)을 등록합니다. 초기화에 실패하면 RuntimeError 를 발생시킵니다. 펌웨어는 기본 USB 프로토콜 스택이 이미 실행된 상태로 부팅되므로, 전송 방식을 변경하거나 기본 프레이밍 매개변수를 재정의할 때만 이 함수를 호출하면 됩니다.

crc 는 프로토콜 프레임에 대한 CRC 검증을 활성화합니다.

seq 는 시퀀스 번호 추적을 활성화합니다.

ack 는 프레임별 확인 응답을 활성화합니다.

events 는 채널 이벤트 알림을 활성화합니다.

max_payload 는 최대 페이로드 크기(바이트)입니다. 생략하면 아래의 카메라별 기본값이 사용됩니다. 이 값은 각 보드의 프로토콜 버퍼 크기에서 buffer - 10 (header) - 4 (CRC) 로 산출됩니다.

카메라

버퍼 크기

최대 페이로드

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries 는 재전송 시도 횟수입니다. 기본값 3.

rtx_timeout_ms 는 밀리초 단위의 재전송 타임아웃입니다(타임아웃마다 두 배로 증가). 기본값 500.

lock_interval_ms 는 밀리초 단위의 최소 잠금 간격입니다. 기본값 10.

poll_ms 는 밀리초 단위의 폴링 간격입니다. 0 (기본값)은 타이머 폴링을 비활성화합니다.

protocol.is_active() bool

호스트가 현재 연결되어 있고 프로토콜 스택이 활성 상태이면 True 를, 그렇지 않으면 False 를 반환합니다.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Python backend 객체를 새 논리 채널로 등록하고 ProtocolChannel 핸들을 반환합니다. backend 객체에서 사용 가능한 메서드(아래 백엔드 인터페이스 참조)가 채널의 기능을 결정합니다. 해당 메서드가 구현된 경우 protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE, protocol.CHANNEL_FLAG_LOCKflags 에 자동으로 추가됩니다.

name 은 문자열로 된 채널 이름입니다. 펌웨어의 채널 이름 버퍼 크기에 맞춰 잘립니다. 필수.

backend 는 백엔드 인터페이스를 구현하는 Python 객체입니다. 필수. 일반적으로 키워드(backend=...)로 전달됩니다.

flags 는 추가 채널 플래그 비트입니다(CHANNEL_FLAG_* 상수 참조). 선택 사항이며 기본값은 0 입니다.

채널을 등록할 수 없는 경우(예: 사용 가능한 채널 슬롯 없음) RuntimeError 를 발생시킵니다.

클래스

class protocol.ProtocolChannel

protocol.register 가 반환하는 핸들입니다. 인스턴스는 직접 생성되지 않습니다.

send_event(event: int, wait_ack: bool = False) None

호스트로 채널 이벤트 알림을 보냅니다.

event 는 이벤트 식별자(정수)입니다.

wait_ackTrue 이면 호스트가 이벤트를 확인할 때까지 블록됩니다.

이벤트 전송에 실패하면 RuntimeError 를 발생시킵니다.

백엔드 인터페이스

protocol.register 에 전달되는 백엔드 객체는 다음 메서드의 임의의 부분 집합을 구현할 수 있습니다. 객체에 존재하는 메서드만 C 프로토콜 계층에 연결되며, 누락된 메서드는 해당 기능을 비활성화 상태로 둡니다.

class protocol.backend

protocol.register 에 전달되는 채널 백엔드 객체입니다. 아래 메서드는 Python 백엔드가 구현할 수 있는 선택적 인터페이스를 설명합니다.

init() object

채널이 초기화될 때 한 번 호출됩니다. 성공 시 None 이 아닌 값을 반환하세요. 예외 또는 반환 누락은 오류로 처리됩니다.

poll() bool

호스트가 읽을 수 있는 데이터가 채널에 준비되어 있으면 True 를 반환합니다.

lock() bool

전송을 위해 채널을 획득합니다. 성공 시 True 를 반환합니다.

unlock() bool

전송 후 채널을 해제합니다. 성공 시 True 를 반환합니다.

size() int

채널에서 현재 읽을 수 있는 바이트 수를 반환합니다.

shape() tuple

데이터 모양(예: 이미지 차원)을 설명하는 최대 4개 정수의 튜플을 반환합니다. 프로토콜 계층은 최대 4개의 요소를 소비합니다.

flush() object

보류 중인 데이터를 모두 플러시합니다. 성공 시 None 이 아닌 값을 반환합니다.

read(offset: int, size: int) bytes

버퍼 프로토콜을 지원하는 bytes 와 유사한 객체로 offset 부터 시작하여 최대 size 바이트를 반환합니다.

readp(offset: int, size: int) bytes

read 의 제로 카피 변형입니다. 기반 메모리가 프로토콜 계층에 의해 직접 읽히는 버퍼를 반환합니다. 이 버퍼는 전송이 지속되는 동안 유효한 상태를 유지해야 합니다.

write(offset: int, data: bytearray) int

offset 위치에 data 를 씁니다. data 는 C 버퍼를 직접 참조하는 bytearray 입니다. 쓴 바이트 수를 반환하거나, 기본 성공 시 0 을 반환합니다.

ioctl(cmd: int, length: int, arg: bytearray | None) int

ioctl을 처리합니다. length 가 0이면 argNone 이고, 그렇지 않으면 C 버퍼를 참조하는 bytearray 입니다. 성공 시 0 또는 None 을, 오류 시 음의 정수를 반환합니다.

is_active() bool

전송 채널의 경우, 기반 전송이 현재 연결되어 있으면 True 를 반환합니다.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

(고정된 protocol 패키지가 제공하는) 더 상위 수준의 Python 백엔드로, 이름이 지정된 필드를 SenML 호환 정수 키를 사용해 CBOR로 직렬화합니다. 디스플레이 위젯(label, depth)과 on_read/on_write 콜백을 갖춘 대화형 컨트롤(toggle, slider, select)을 지원합니다.

on_read 는 채널이 호스트용으로 직렬화되기 전에 호출되는 선택적 호출 가능 객체 on_read(channel) 입니다. 필드 값을 갱신하는 데 사용하세요.

on_write 는 호스트가 이름이 지정된 필드에 새 값을 쓸 때 호출되는 선택적 호출 가능 객체 on_write(channel, name, value) 입니다.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

이름이 지정된 필드를 채널에 추가합니다.

name 은 표시 이름입니다. 이 채널 내에서 고유해야 합니다.

type 은 위젯 유형입니다: "label", "toggle", "slider", "select", 또는 "depth".

value 는 초기 값입니다. 기본값은 type 에 따라 달라집니다.

unitlabel/slider 에 대한 단위 문자열입니다(예: "Cel", "%RH").

min 은 최솟값입니다(슬라이더 범위 또는 깊이 범위).

max 는 최댓값입니다(슬라이더 범위 또는 깊이 범위).

step 은 단계 크기입니다(슬라이더).

options 는 옵션 문자열 목록입니다(셀렉트).

width 는 픽셀 너비입니다(깊이).

height 는 픽셀 높이입니다(깊이).

__getitem__(name: str) object

이름이 지정된 필드의 현재 값을 반환합니다. depth 필드의 경우 바이너리 데이터 버퍼가 반환되고, 그 외에는 스칼라 값이 반환됩니다.

__setitem__(name: str, value: Any) None

이름이 지정된 필드의 값을 설정합니다. slider 필드의 경우 (min, max, value) 튜플이 범위와 현재 값을 동시에 갱신합니다. depth 필드의 경우 value 는 바이너리 데이터 버퍼입니다.

poll() bool

백엔드 인터페이스 메서드입니다. 직렬화된 데이터가 호스트에 사용 가능할 때 True 를 반환합니다.

size() int

백엔드 인터페이스 메서드입니다. (설정된 경우) on_read 를 호출하고 직렬화된 버퍼의 크기를 반환합니다.

read(offset: int, size: int) bytes

백엔드 인터페이스 메서드입니다. 직렬화된 버퍼의 슬라이스를 반환합니다.

write(offset: int, data: bytearray) int

백엔드 인터페이스 메서드입니다. CBOR 업데이트 목록을 디코딩하고 일치하는 이름의 필드에 값을 적용하며, 각각에 대해 on_write 를 호출합니다.

상수

채널 플래그 비트(비트 단위로 결합됨. flags 를 통해 protocol.register 에 전달되거나 백엔드의 메서드에 따라 자동으로 설정됨).

protocol.CHANNEL_FLAG_READ: int

채널이 읽기를 지원합니다.

protocol.CHANNEL_FLAG_WRITE: int

채널이 쓰기를 지원합니다.

protocol.CHANNEL_FLAG_LOCK: int

채널이 lock/unlock 을 구현합니다.

protocol.CHANNEL_FLAG_PHYSICAL: int

채널이 (논리 데이터 채널이 아닌) 물리적 전송을 나타냅니다.

내장 채널 식별자입니다.

protocol.CHANNEL_ID_TRANSPORT: int

활성 전송에 예약된 채널 ID입니다.

protocol.CHANNEL_ID_STDIN: int

내장 stdin 채널의 채널 ID입니다.

protocol.CHANNEL_ID_STDOUT: int

내장 stdout 채널의 채널 ID입니다.

protocol.CHANNEL_ID_STREAM: int

내장 stream 채널의 채널 ID입니다.

protocol.CHANNEL_ID_PROFILE: int

내장 프로파일러 채널의 채널 ID입니다(펌웨어가 프로파일러를 활성화한 상태로 빌드된 경우에만 존재).