protocol — OpenMV protokolni kanali

Modul protocol izlaže OpenMV host protokol Pythonu. Omogućuje inicijalizaciju i konfiguraciju protokolnog stoga na strani ugrađenog programa te dopušta korisničkom kodu da registrira prilagođene logičke kanale poduprte Python objektom koji implementira sučelje kanala (read, write, size, poll itd.). To je ono s čime razgovaraju popratni alati na računalu kada prenose podatke slike ili izlažu interaktivne widgete povezanoj kameri.

Primjeri

Prijenos RGB565 slike host alatu pomoću prilagođenog backenda koji implementira sirovo sučelje kanala (backend.size(), backend.shape(), backend.poll(), backend.read()):

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

Odgovarajuća skripta na strani hosta, koja koristi openmv Python paket (pip install openmv) za povezivanje, slanje skripte na kameru i dohvaćanje svake sličice:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

Zamijenite /dev/ttyACM0 serijskim priključkom kamere (npr. COM3 na Windowsu). Konstruktor openmv.camera.Camera prihvaća iste parametre protokola kao i init (crc / seq / ack / events / max_payload / max_retry / timeout) kada je stog na strani kamere ponovno konfiguriran tako da im odgovara.

Funkcije

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

Inicijalizira (ili ponovno konfigurira) protokolni stog i registrira zadane logičke podatkovne kanale (stdin, stdout, stream te, ako je ugrađen, profile). Podiže RuntimeError ako inicijalizacija ne uspije. Ugrađeni program se pokreće sa zadanim USB protokolnim stogom koji već radi, pa je pozivanje ovoga potrebno samo za promjenu transporta ili nadjačavanje zadanih parametara uokvirivanja.

crc omogućuje CRC provjeru na okvirima protokola.

seq omogućuje praćenje rednih brojeva.

ack omogućuje potvrde po okviru.

events omogućuje obavijesti o događajima kanala.

max_payload je maksimalna veličina korisnog tereta u bajtovima. Ako se izostavi, koristi se zadana vrijednost po kameri navedena u nastavku; izvodi se iz veličine međuspremnika protokola svake pločice kao buffer - 10 (header) - 4 (CRC).

Kamera

Veličina međuspremnika

Maks. korisni teret

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries je broj pokušaja ponovnog prijenosa. Zadano 3.

rtx_timeout_ms je vremensko ograničenje ponovnog prijenosa u milisekundama (udvostručuje se nakon svakog isteka). Zadano 500.

lock_interval_ms je minimalni interval zaključavanja u milisekundama. Zadano 10.

poll_ms je interval prozivanja u milisekundama. 0 (zadano) onemogućuje prozivanje pomoću mjerača vremena.

protocol.is_active() bool

Vraća True ako je host trenutno povezan i protokolni stog aktivan, inače False.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Registrira Python backend objekt kao novi logički kanal i vraća ProtocolChannel rukovatelj. Dostupne metode objekta backend (vidi Backend sučelje u nastavku) određuju sposobnosti kanala; protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE i protocol.CHANNEL_FLAG_LOCK automatski se dodaju u flags kada su odgovarajuće metode implementirane.

name je naziv kanala kao niz znakova. Skraćuje se na veličinu međuspremnika naziva kanala u ugrađenom programu. Obavezno.

backend je Python objekt koji implementira backend sučelje. Obavezno. Obično se prosljeđuje ključnom riječi (backend=...).

flags su dodatni bitovi zastavica kanala (vidi konstante CHANNEL_FLAG_*). Neobavezno; zadano je 0.

Podiže RuntimeError ako se kanal ne može registrirati (npr. nema slobodnih mjesta za kanale).

Klase

class protocol.ProtocolChannel

Rukovatelj koji vraća protocol.register. Instance se ne konstruiraju izravno.

send_event(event: int, wait_ack: bool = False) None

Šalje obavijest o događaju kanala hostu.

event je identifikator događaja (cijeli broj).

wait_ack ako je True blokira dok host ne potvrdi događaj.

Podiže RuntimeError ako slanje događaja ne uspije.

Backend sučelje

Backend objekt proslijeđen funkciji protocol.register može implementirati bilo koji podskup sljedećih metoda. Samo su metode prisutne na objektu povezane s C protokolnim slojem; nedostajuće metode ostavljaju odgovarajuću sposobnost onemogućenom.

class protocol.backend

Backend objekt kanala proslijeđen funkciji protocol.register. Metode u nastavku opisuju neobavezno sučelje koje Python backend može implementirati.

init() object

Poziva se jednom kada se kanal inicijalizira. Vraća bilo koju vrijednost različitu od None u slučaju uspjeha; iznimka ili nedostajuća povratna vrijednost tretira se kao pogreška.

poll() bool

Vraća True ako kanal ima podatke spremne za čitanje od strane hosta.

lock() bool

Zauzima kanal za prijenos. Vraća True u slučaju uspjeha.

unlock() bool

Otpušta kanal nakon prijenosa. Vraća True u slučaju uspjeha.

size() int

Vraća broj bajtova koji se trenutno mogu pročitati s kanala.

shape() tuple

Vraća n-torku do četiri cijela broja koji opisuju oblik podataka (npr. dimenzije slike). Protokolni sloj koristi do četiri elementa.

flush() object

Prazni sve podatke na čekanju. Vraća bilo koju vrijednost različitu od None u slučaju uspjeha.

read(offset: int, size: int) bytes

Vraća do size bajtova počevši od offset kao bytes-sličan objekt koji podržava protokol međuspremnika.

readp(offset: int, size: int) bytes

Varijanta read bez kopiranja. Vraća međuspremnik čiju temeljnu memoriju protokolni sloj izravno čita; međuspremnik mora ostati valjan tijekom trajanja prijenosa.

write(offset: int, data: bytearray) int

Zapisuje data na offset. data je bytearray koji izravno referencira C međuspremnik. Vraća broj zapisanih bajtova ili 0 pri zadanom uspjehu.

ioctl(cmd: int, length: int, arg: bytearray | None) int

Obrađuje ioctl. arg je None ako je length nula, inače bytearray koji referencira C međuspremnik. Vraća 0 ili None u slučaju uspjeha ili negativan cijeli broj u slučaju pogreške.

is_active() bool

Za transportne kanale, vraća True ako je temeljni transport trenutno povezan.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

Python backend više razine (koji pruža ugrađeni protocol paket) koji serijalizira imenovana polja u CBOR pomoću cjelobrojnih ključeva kompatibilnih sa SenML-om. Podržava prikazne widgete (label, depth) i interaktivne kontrole (toggle, slider, select) s on_read/on_write povratnim pozivima.

on_read je neobavezni pozivni objekt on_read(channel) koji se poziva prije serijalizacije kanala za host. Koristite ga za osvježavanje vrijednosti polja.

on_write je neobavezni pozivni objekt on_write(channel, name, value) koji se poziva kada host upiše novu vrijednost za imenovano polje.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

Dodaje imenovano polje u kanal.

name je prikazni naziv; mora biti jedinstven unutar ovog kanala.

type je tip widgeta: "label", "toggle", "slider", "select" ili "depth".

value je početna vrijednost. Zadana vrijednost ovisi o type.

unit je niz jedinice za label/slider (npr. "Cel", "%RH").

min je minimalna vrijednost (raspon klizača ili raspon dubine).

max je maksimalna vrijednost (raspon klizača ili raspon dubine).

step je veličina koraka (klizač).

options je popis nizova opcija (select).

width je širina u pikselima (depth).

height je visina u pikselima (depth).

__getitem__(name: str) object

Vraća trenutnu vrijednost imenovanog polja. Za depth polja vraća se binarni podatkovni međuspremnik, inače skalarna vrijednost.

__setitem__(name: str, value: Any) None

Postavlja vrijednost imenovanog polja. Za slider polja, n-torka (min, max, value) istovremeno ažurira raspon i trenutnu vrijednost. Za depth polja, value je binarni podatkovni međuspremnik.

poll() bool

Metoda backend sučelja. Vraća True kada su serijalizirani podaci dostupni hostu.

size() int

Metoda backend sučelja. Poziva on_read (ako je postavljen) i vraća veličinu serijaliziranog međuspremnika.

read(offset: int, size: int) bytes

Metoda backend sučelja. Vraća isječak serijaliziranog međuspremnika.

write(offset: int, data: bytearray) int

Metoda backend sučelja. Dekodira CBOR popis ažuriranja i primjenjuje vrijednosti na odgovarajuća imenovana polja, pozivajući on_write za svako.

Konstante

Bitovi zastavica kanala (kombiniraju se bitovno; prosljeđuju se funkciji protocol.register putem flags ili se postavljaju automatski na temelju metoda backenda).

protocol.CHANNEL_FLAG_READ: int

Kanal podržava čitanja.

protocol.CHANNEL_FLAG_WRITE: int

Kanal podržava zapisivanja.

protocol.CHANNEL_FLAG_LOCK: int

Kanal implementira lock/unlock.

protocol.CHANNEL_FLAG_PHYSICAL: int

Kanal predstavlja fizički transport (za razliku od logičkog podatkovnog kanala).

Ugrađeni identifikatori kanala.

protocol.CHANNEL_ID_TRANSPORT: int

Rezervirani ID kanala za aktivni transport.

protocol.CHANNEL_ID_STDIN: int

ID kanala ugrađenog stdin kanala.

protocol.CHANNEL_ID_STDOUT: int

ID kanala ugrađenog stdout kanala.

protocol.CHANNEL_ID_STREAM: int

ID kanala ugrađenog stream kanala.

protocol.CHANNEL_ID_PROFILE: int

ID kanala ugrađenog profiler kanala (prisutan samo kada je ugrađeni program izgrađen s omogućenim profilerom).