protocol — Kanały protokołu OpenMV

Moduł protocol udostępnia w Pythonie protokół hosta OpenMV. Pozwala on na zainicjalizowanie i skonfigurowanie stosu protokołu po stronie oprogramowania układowego oraz umożliwia kodowi użytkownika rejestrowanie niestandardowych kanałów logicznych opartych na obiekcie Pythona implementującym interfejs kanału (read, write, size, poll itd.). To z nim komunikują się towarzyszące narzędzia desktopowe, gdy strumieniują dane obrazu lub udostępniają interaktywne widżety podłączonej kamerze.

Przykłady

Strumieniowanie obrazu RGB565 do narzędzia hosta przy użyciu niestandardowego backendu implementującego surowy interfejs kanału (backend.size(), backend.shape(), backend.poll(), backend.read()):

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

Odpowiadający mu skrypt po stronie hosta, używający pakietu Pythona openmv (pip install openmv) do połączenia, przesłania skryptu na kamerę i pobierania kolejnych ramek:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

Zastąp /dev/ttyACM0 portem szeregowym kamery (np. COM3 w systemie Windows). Konstruktor openmv.camera.Camera akceptuje te same parametry protokołu co init (crc / seq / ack / events / max_payload / max_retry / timeout), gdy stos po stronie kamery został odpowiednio przekonfigurowany.

Funkcje

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

Inicjalizuje (lub ponownie konfiguruje) stos protokołu i rejestruje domyślne logiczne kanały danych (stdin, stdout, stream oraz, jeśli skompilowano, profile). Zgłasza RuntimeError, jeśli inicjalizacja się nie powiedzie. Oprogramowanie układowe uruchamia się z domyślnym, już działającym stosem protokołu USB, więc wywołanie tej funkcji jest potrzebne tylko do zmiany transportu lub nadpisania domyślnych parametrów ramkowania.

crc włącza walidację CRC ramek protokołu.

seq włącza śledzenie numerów sekwencyjnych.

ack włącza potwierdzenia poszczególnych ramek.

events włącza powiadomienia o zdarzeniach kanału.

max_payload to maksymalny rozmiar ładunku w bajtach. Jeśli zostanie pominięty, używana jest poniższa wartość domyślna dla danej kamery; jest ona wyprowadzana z rozmiaru bufora protokołu danej płytki jako buffer - 10 (header) - 4 (CRC).

Kamera

Rozmiar bufora

Maksymalny ładunek

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries to liczba prób retransmisji. Domyślnie 3.

rtx_timeout_ms to limit czasu retransmisji w milisekundach (podwajany po każdym przekroczeniu limitu). Domyślnie 500.

lock_interval_ms to minimalny interwał blokady w milisekundach. Domyślnie 10.

poll_ms to interwał odpytywania w milisekundach. 0 (wartość domyślna) wyłącza odpytywanie za pomocą timera.

protocol.is_active() bool

Zwraca True, jeśli host jest aktualnie połączony, a stos protokołu jest aktywny, w przeciwnym razie False.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Rejestruje obiekt Pythona backend jako nowy kanał logiczny i zwraca uchwyt ProtocolChannel. Dostępne metody obiektu backend (zobacz Interfejs backendu poniżej) określają możliwości kanału; flagi protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE i protocol.CHANNEL_FLAG_LOCK są dodawane do flags automatycznie, gdy odpowiednie metody są zaimplementowane.

name to nazwa kanału jako ciąg znaków. Przycinana do rozmiaru bufora nazwy kanału w oprogramowaniu układowym. Wymagane.

backend to obiekt Pythona implementujący interfejs backendu. Wymagane. Zazwyczaj przekazywany jako argument słowa kluczowego (backend=...).

flags to dodatkowe bity flag kanału (zobacz stałe CHANNEL_FLAG_*). Opcjonalne; domyślnie 0.

Zgłasza RuntimeError, jeśli kanału nie można zarejestrować (np. brak wolnych gniazd kanałów).

Klasy

class protocol.ProtocolChannel

Uchwyt zwracany przez protocol.register. Instancje nie są tworzone bezpośrednio.

send_event(event: int, wait_ack: bool = False) None

Wysyła do hosta powiadomienie o zdarzeniu kanału.

event to identyfikator zdarzenia (liczba całkowita).

wait_ack jeśli True, blokuje do momentu potwierdzenia zdarzenia przez hosta.

Zgłasza RuntimeError, jeśli wysłanie zdarzenia się nie powiedzie.

Interfejs backendu

Obiekt backendu przekazywany do protocol.register może implementować dowolny podzbiór poniższych metod. Tylko metody obecne w obiekcie są podłączane do warstwy protokołu w C; brakujące metody pozostawiają odpowiednią funkcjonalność wyłączoną.

class protocol.backend

Obiekt backendu kanału przekazywany do protocol.register. Poniższe metody opisują opcjonalny interfejs, który może zaimplementować backend w Pythonie.

init() object

Wywoływana jednorazowo podczas inicjalizacji kanału. Zwraca dowolną wartość różną od None w przypadku powodzenia; wyjątek lub brak zwracanej wartości jest traktowany jako błąd.

poll() bool

Zwraca True, jeśli kanał ma dane gotowe do odczytu przez hosta.

lock() bool

Przejmuje kanał na potrzeby transferu. Zwraca True w przypadku powodzenia.

unlock() bool

Zwalnia kanał po transferze. Zwraca True w przypadku powodzenia.

size() int

Zwraca liczbę bajtów aktualnie możliwych do odczytania z kanału.

shape() tuple

Zwraca krotkę składającą się z maksymalnie czterech liczb całkowitych opisujących kształt danych (np. wymiary obrazu). Warstwa protokołu zużywa maksymalnie cztery elementy.

flush() object

Wypróżnia wszelkie oczekujące dane. Zwraca dowolną wartość różną od None w przypadku powodzenia.

read(offset: int, size: int) bytes

Zwraca maksymalnie size bajtów począwszy od pozycji offset jako obiekt typu bytes obsługujący protokół bufora.

readp(offset: int, size: int) bytes

Wariant read bez kopiowania (zero-copy). Zwraca bufor, którego pamięć jest odczytywana bezpośrednio przez warstwę protokołu; bufor musi pozostać ważny przez cały czas trwania transferu.

write(offset: int, data: bytearray) int

Zapisuje data na pozycji offset. data to bytearray odwołujący się bezpośrednio do bufora C. Zwraca liczbę zapisanych bajtów lub 0 w przypadku domyślnego powodzenia.

ioctl(cmd: int, length: int, arg: bytearray | None) int

Obsługuje ioctl. arg to None, jeśli length wynosi zero, w przeciwnym razie bytearray odwołujący się do bufora C. Zwraca 0 lub None w przypadku powodzenia albo ujemną liczbę całkowitą w przypadku błędu.

is_active() bool

Dla kanałów transportowych zwraca True, jeśli leżący u podstaw transport jest aktualnie połączony.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

Wyższego poziomu backend w Pythonie (dostarczany przez zamrożony pakiet protocol), który serializuje nazwane pola do CBOR przy użyciu kluczy całkowitych zgodnych z SenML. Obsługuje widżety wyświetlania (label, depth) oraz elementy interaktywne (toggle, slider, select) z wywołaniami zwrotnymi on_read/on_write.

on_read to opcjonalny obiekt wywoływalny on_read(channel) wywoływany przed serializacją kanału dla hosta. Użyj go do odświeżenia wartości pól.

on_write to opcjonalny obiekt wywoływalny on_write(channel, name, value) wywoływany, gdy host zapisuje nową wartość dla nazwanego pola.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

Dodaje nazwane pole do kanału.

name to nazwa wyświetlana; musi być unikalna w obrębie tego kanału.

type to typ widżetu: "label", "toggle", "slider", "select" lub "depth".

value to wartość początkowa. Domyślna zależy od type.

unit to ciąg jednostki dla label/slider (np. "Cel", "%RH").

min to wartość minimalna (zakres suwaka lub zakres głębi).

max to wartość maksymalna (zakres suwaka lub zakres głębi).

step to wielkość kroku (suwak).

options to lista ciągów opcji (select).

width to szerokość w pikselach (depth).

height to wysokość w pikselach (depth).

__getitem__(name: str) object

Zwraca bieżącą wartość nazwanego pola. Dla pól depth zwracany jest binarny bufor danych, w przeciwnym razie wartość skalarna.

__setitem__(name: str, value: Any) None

Ustawia wartość nazwanego pola. Dla pól slider krotka (min, max, value) jednocześnie aktualizuje zakres i bieżącą wartość. Dla pól depth value to binarny bufor danych.

poll() bool

Metoda interfejsu backendu. Zwraca True, gdy zserializowane dane są dostępne dla hosta.

size() int

Metoda interfejsu backendu. Wywołuje on_read (jeśli ustawione) i zwraca rozmiar zserializowanego bufora.

read(offset: int, size: int) bytes

Metoda interfejsu backendu. Zwraca fragment zserializowanego bufora.

write(offset: int, data: bytearray) int

Metoda interfejsu backendu. Dekoduje listę aktualizacji CBOR i stosuje wartości do pasujących nazwanych pól, wywołując on_write dla każdego z nich.

Stałe

Bity flag kanału (łączone bitowo; przekazywane do protocol.register poprzez flags lub ustawiane automatycznie na podstawie metod backendu).

protocol.CHANNEL_FLAG_READ: int

Kanał obsługuje odczyty.

protocol.CHANNEL_FLAG_WRITE: int

Kanał obsługuje zapisy.

protocol.CHANNEL_FLAG_LOCK: int

Kanał implementuje lock/unlock.

protocol.CHANNEL_FLAG_PHYSICAL: int

Kanał reprezentuje fizyczny transport (w przeciwieństwie do logicznego kanału danych).

Wbudowane identyfikatory kanałów.

protocol.CHANNEL_ID_TRANSPORT: int

Zarezerwowany identyfikator kanału dla aktywnego transportu.

protocol.CHANNEL_ID_STDIN: int

Identyfikator wbudowanego kanału stdin.

protocol.CHANNEL_ID_STDOUT: int

Identyfikator wbudowanego kanału stdout.

protocol.CHANNEL_ID_STREAM: int

Identyfikator wbudowanego kanału stream.

protocol.CHANNEL_ID_PROFILE: int

Identyfikator wbudowanego kanału profilera (obecny tylko wtedy, gdy oprogramowanie układowe zostało zbudowane z włączonym profilerem).