protocol — OpenMV 프로토콜 채널¶
protocol 모듈은 OpenMV 호스트 프로토콜을 Python에 노출합니다. 펌웨어 측 프로토콜 스택을 초기화하고 구성할 수 있게 하며, 채널 인터페이스(read, write, size, poll 등)를 구현하는 Python 객체를 기반으로 사용자 코드가 사용자 정의 논리 채널을 등록할 수 있도록 합니다. 데스크톱 컴패니언 도구가 이미지 데이터를 스트리밍하거나 연결된 카메라에 대화형 위젯을 노출할 때 통신하는 대상이 바로 이것입니다.
예제¶
원시 채널 인터페이스(backend.size(), backend.shape(), backend.poll(), backend.read())를 구현하는 사용자 정의 백엔드를 사용해 RGB565 이미지를 호스트 도구로 스트리밍합니다:
import csi
import protocol
csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
class FrameChannel:
def size(self):
return len(img_mv)
def shape(self):
return (img.height(), img.width(), len(img_mv))
def poll(self):
return frame_ready
def read(self, offset, size):
global frame_ready
end = offset + size
chunk = img_mv[offset:end]
if end >= len(img_mv):
frame_ready = False
return chunk
protocol.register(name="frame", backend=FrameChannel())
while True:
if not frame_ready:
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
이에 대응하는 호스트 측 스크립트로, openmv Python 패키지(pip install openmv)를 사용하여 연결하고, 카메라 측 스크립트를 푸시하며, 각 프레임을 가져옵니다:
import cv2
import numpy as np
from openmv.camera import Camera
# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()
with Camera("/dev/ttyACM0", baudrate=921600) as cam:
cam.stop()
cam.exec(SCRIPT)
while True:
status = cam.read_status()
if not cam.has_channel("frame") or not status.get("frame"):
continue
h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
if cam.channel_size("frame") < size:
continue
data = cam.channel_read("frame", size)
rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)
# Unpack RGB565 to an HxWx3 uint8 RGB image.
r = ((rgb565 >> 11) & 0x1F) << 3
g = ((rgb565 >> 5) & 0x3F) << 2
b = ( rgb565 & 0x1F) << 3
frame = np.dstack([r, g, b]).astype(np.uint8)
# Display with OpenCV (cv2 expects BGR, not RGB).
cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
if cv2.waitKey(1) == ord("q"):
break
cv2.destroyAllWindows()
/dev/ttyACM0 을 카메라의 시리얼 포트(예: Windows에서는 COM3)로 바꾸세요. openmv.camera.Camera 생성자는 카메라 측 스택이 일치하도록 재구성된 경우 init 과 동일한 프로토콜 매개변수(crc / seq / ack / events / max_payload / max_retry / timeout)를 받습니다.
함수¶
- protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None¶
프로토콜 스택을 초기화(또는 재구성)하고 기본 논리 데이터 채널(
stdin,stdout,stream및 컴파일된 경우profile)을 등록합니다. 초기화에 실패하면RuntimeError를 발생시킵니다. 펌웨어는 기본 USB 프로토콜 스택이 이미 실행된 상태로 부팅되므로, 전송 방식을 변경하거나 기본 프레이밍 매개변수를 재정의할 때만 이 함수를 호출하면 됩니다.crc는 프로토콜 프레임에 대한 CRC 검증을 활성화합니다.seq는 시퀀스 번호 추적을 활성화합니다.ack는 프레임별 확인 응답을 활성화합니다.events는 채널 이벤트 알림을 활성화합니다.max_payload는 최대 페이로드 크기(바이트)입니다. 생략하면 아래의 카메라별 기본값이 사용됩니다. 이 값은 각 보드의 프로토콜 버퍼 크기에서buffer - 10 (header) - 4 (CRC)로 산출됩니다.카메라
버퍼 크기
최대 페이로드
OpenMV Cam M4 (
OPENMV2)512
498
OpenMV Cam M7 (
OPENMV3)512
498
OpenMV Cam H7 (
OPENMV4)512
498
OpenMV Cam H7 Plus (
OPENMV4P)4096
4082
OpenMV Pure Thermal (
OPENMVPT)4096
4082
OpenMV Cam RT1062 (
OPENMV_RT1060)4096
4082
OpenMV Cam N6 (
OPENMV_N6)8192
8178
OpenMV AE3 (
OPENMV_AE3)8192
8178
Arduino Portenta H7 (
ARDUINO_PORTENTA_H7)4096
4082
Arduino Giga (
ARDUINO_GIGA)4096
4082
Arduino Nicla Vision (
ARDUINO_NICLA_VISION)4096
4082
rtx_retries는 재전송 시도 횟수입니다. 기본값3.rtx_timeout_ms는 밀리초 단위의 재전송 타임아웃입니다(타임아웃마다 두 배로 증가). 기본값500.lock_interval_ms는 밀리초 단위의 최소 잠금 간격입니다. 기본값10.poll_ms는 밀리초 단위의 폴링 간격입니다.0(기본값)은 타이머 폴링을 비활성화합니다.
- protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel¶
Python
backend객체를 새 논리 채널로 등록하고ProtocolChannel핸들을 반환합니다.backend객체에서 사용 가능한 메서드(아래 백엔드 인터페이스 참조)가 채널의 기능을 결정합니다. 해당 메서드가 구현된 경우protocol.CHANNEL_FLAG_READ,protocol.CHANNEL_FLAG_WRITE,protocol.CHANNEL_FLAG_LOCK가flags에 자동으로 추가됩니다.name은 문자열로 된 채널 이름입니다. 펌웨어의 채널 이름 버퍼 크기에 맞춰 잘립니다. 필수.backend는 백엔드 인터페이스를 구현하는 Python 객체입니다. 필수. 일반적으로 키워드(backend=...)로 전달됩니다.flags는 추가 채널 플래그 비트입니다(CHANNEL_FLAG_*상수 참조). 선택 사항이며 기본값은0입니다.채널을 등록할 수 없는 경우(예: 사용 가능한 채널 슬롯 없음)
RuntimeError를 발생시킵니다.
클래스¶
백엔드 인터페이스¶
protocol.register 에 전달되는 백엔드 객체는 다음 메서드의 임의의 부분 집합을 구현할 수 있습니다. 객체에 존재하는 메서드만 C 프로토콜 계층에 연결되며, 누락된 메서드는 해당 기능을 비활성화 상태로 둡니다.
- class protocol.backend¶
protocol.register에 전달되는 채널 백엔드 객체입니다. 아래 메서드는 Python 백엔드가 구현할 수 있는 선택적 인터페이스를 설명합니다.- read(offset: int, size: int) bytes¶
버퍼 프로토콜을 지원하는
bytes와 유사한 객체로offset부터 시작하여 최대size바이트를 반환합니다.
- readp(offset: int, size: int) bytes¶
read의 제로 카피 변형입니다. 기반 메모리가 프로토콜 계층에 의해 직접 읽히는 버퍼를 반환합니다. 이 버퍼는 전송이 지속되는 동안 유효한 상태를 유지해야 합니다.
- write(offset: int, data: bytearray) int¶
offset위치에data를 씁니다.data는 C 버퍼를 직접 참조하는bytearray입니다. 쓴 바이트 수를 반환하거나, 기본 성공 시0을 반환합니다.
- class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)¶
(고정된
protocol패키지가 제공하는) 더 상위 수준의 Python 백엔드로, 이름이 지정된 필드를 SenML 호환 정수 키를 사용해 CBOR로 직렬화합니다. 디스플레이 위젯(label,depth)과on_read/on_write콜백을 갖춘 대화형 컨트롤(toggle,slider,select)을 지원합니다.on_read는 채널이 호스트용으로 직렬화되기 전에 호출되는 선택적 호출 가능 객체on_read(channel)입니다. 필드 값을 갱신하는 데 사용하세요.on_write는 호스트가 이름이 지정된 필드에 새 값을 쓸 때 호출되는 선택적 호출 가능 객체on_write(channel, name, value)입니다.- add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None¶
이름이 지정된 필드를 채널에 추가합니다.
name은 표시 이름입니다. 이 채널 내에서 고유해야 합니다.type은 위젯 유형입니다:"label","toggle","slider","select", 또는"depth".value는 초기 값입니다. 기본값은type에 따라 달라집니다.unit은label/slider에 대한 단위 문자열입니다(예:"Cel","%RH").min은 최솟값입니다(슬라이더 범위 또는 깊이 범위).max는 최댓값입니다(슬라이더 범위 또는 깊이 범위).step은 단계 크기입니다(슬라이더).options는 옵션 문자열 목록입니다(셀렉트).width는 픽셀 너비입니다(깊이).height는 픽셀 높이입니다(깊이).
- __getitem__(name: str) object¶
이름이 지정된 필드의 현재 값을 반환합니다.
depth필드의 경우 바이너리 데이터 버퍼가 반환되고, 그 외에는 스칼라 값이 반환됩니다.
상수¶
채널 플래그 비트(비트 단위로 결합됨. flags 를 통해 protocol.register 에 전달되거나 백엔드의 메서드에 따라 자동으로 설정됨).
내장 채널 식별자입니다.