protocol — Canale de protocol OpenMV

Modulul protocol expune protocolul gazdă OpenMV către Python. Permite inițializarea și configurarea stivei de protocol din partea firmware-ului și permite codului utilizatorului să înregistreze canale logice personalizate susținute de un obiect Python care implementează interfața canalului (read, write, size, poll etc.). Acesta este punctul cu care comunică instrumentele companion de pe desktop atunci când transmit date de imagine sau expun widget-uri interactive către o cameră conectată.

Exemple

Transmite o imagine RGB565 către un instrument gazdă folosind un backend personalizat care implementează interfața de canal brut (backend.size(), backend.shape(), backend.poll(), backend.read()):

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

Scriptul corespondent de pe partea gazdei, folosind pachetul Python openmv (pip install openmv) pentru a se conecta, a încărca scriptul de pe cameră și a prelua fiecare cadru:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

Înlocuiește /dev/ttyACM0 cu portul serial al camerei (de exemplu COM3 pe Windows). Constructorul openmv.camera.Camera acceptă aceiași parametri de protocol ca init (crc / seq / ack / events / max_payload / max_retry / timeout) atunci când stiva de pe partea camerei a fost reconfigurată pentru a se potrivi.

Funcții

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

Inițializează (sau reconfigurează) stiva de protocol și înregistrează canalele logice de date implicite (stdin, stdout, stream și, dacă este compilat, profile). Ridică RuntimeError dacă inițializarea eșuează. Firmware-ul pornește cu o stivă de protocol USB implicită deja activă, așa că apelarea acestei funcții este necesară doar pentru a schimba transportul sau pentru a suprascrie parametrii de încadrare impliciți.

crc activează validarea CRC pe cadrele de protocol.

seq activează urmărirea numerelor de secvență.

ack activează confirmările pentru fiecare cadru.

events activează notificările de evenimente ale canalelor.

max_payload este dimensiunea maximă a încărcăturii utile în octeți. Dacă este omis, se folosește valoarea implicită per cameră de mai jos; aceasta este derivată din dimensiunea tamponului (buffer) de protocol al fiecărei plăci ca buffer - 10 (header) - 4 (CRC).

Cameră

Dimensiune tampon (buffer)

Încărcătură utilă maximă

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries este numărul de încercări de retransmisie. Implicit 3.

rtx_timeout_ms este timpul de expirare al retransmisiei în milisecunde (dublat după fiecare expirare). Implicit 500.

lock_interval_ms este intervalul minim de blocare în milisecunde. Implicit 10.

poll_ms este intervalul de interogare (polling) în milisecunde. 0 (valoarea implicită) dezactivează interogarea pe temporizator.

protocol.is_active() bool

Returnează True dacă o gazdă este conectată în prezent și stiva de protocol este activă, altfel False.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Înregistrează un obiect backend Python ca un nou canal logic și returnează un descriptor ProtocolChannel. Metodele disponibile ale obiectului backend (vezi Interfața backend mai jos) determină capacitățile canalului; protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE și protocol.CHANNEL_FLAG_LOCK sunt adăugate automat la flags atunci când metodele corespunzătoare sunt implementate.

name este numele canalului sub formă de șir de caractere. Trunchiat la dimensiunea tamponului de nume de canal al firmware-ului. Obligatoriu.

backend este obiectul Python care implementează interfața backend. Obligatoriu. De obicei transmis prin cuvânt-cheie (backend=...).

flags reprezintă biți suplimentari de marcaj ai canalului (vezi constantele CHANNEL_FLAG_*). Opțional; implicit 0.

Ridică RuntimeError dacă canalul nu poate fi înregistrat (de exemplu, nu există sloturi de canal libere).

Clase

class protocol.ProtocolChannel

Descriptor returnat de protocol.register. Instanțele nu sunt construite direct.

send_event(event: int, wait_ack: bool = False) None

Trimite o notificare de eveniment de canal către gazdă.

event este identificatorul evenimentului (număr întreg).

wait_ack dacă este True, blochează până când gazda confirmă evenimentul.

Ridică RuntimeError dacă trimiterea evenimentului eșuează.

Interfața backend

Un obiect backend transmis către protocol.register poate implementa orice submulțime a metodelor de mai jos. Doar metodele prezente pe obiect sunt conectate la stratul de protocol C; metodele lipsă lasă capacitatea corespunzătoare dezactivată.

class protocol.backend

Obiect backend de canal transmis către protocol.register. Metodele de mai jos descriu interfața opțională pe care un backend Python o poate implementa.

init() object

Apelat o singură dată atunci când canalul este inițializat. Returnează orice valoare diferită de None la succes; o excepție sau lipsa unei valori returnate este tratată ca eroare.

poll() bool

Returnează True dacă canalul are date gata de a fi citite de gazdă.

lock() bool

Achiziționează canalul pentru un transfer. Returnează True la succes.

unlock() bool

Eliberează canalul după un transfer. Returnează True la succes.

size() int

Returnează numărul de octeți care pot fi citiți în prezent din canal.

shape() tuple

Returnează un tuplu de până la patru numere întregi care descriu forma datelor (de exemplu, dimensiunile imaginii). Stratul de protocol consumă până la patru elemente.

flush() object

Golește orice date în așteptare. Returnează orice valoare diferită de None la succes.

read(offset: int, size: int) bytes

Returnează până la size octeți începând de la offset ca un obiect de tip bytes care suportă protocolul de tampon (buffer).

readp(offset: int, size: int) bytes

Variantă fără copiere (zero-copy) a read. Returnează un tampon (buffer) a cărui memorie subiacentă este citită direct de stratul de protocol; tamponul trebuie să rămână valid pe durata transferului.

write(offset: int, data: bytearray) int

Scrie data la offset. data este un bytearray care referențiază direct tamponul (buffer) C. Returnează numărul de octeți scriși, sau 0 la succesul implicit.

ioctl(cmd: int, length: int, arg: bytearray | None) int

Gestionează un ioctl. arg este None dacă length este zero, altfel un bytearray care referențiază tamponul (buffer) C. Returnează 0 sau None la succes, sau un număr întreg negativ la eroare.

is_active() bool

Pentru canalele de transport, returnează True dacă transportul subiacent este conectat în prezent.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

Un backend Python de nivel mai înalt (furnizat de pachetul protocol înghețat) care serializează câmpuri denumite în CBOR folosind chei întregi compatibile cu SenML. Suportă widget-uri de afișare (label, depth) și controale interactive (toggle, slider, select) cu funcții de retroapelare (callback) on_read/on_write.

on_read este un apelabil opțional on_read(channel) invocat înainte ca canalul să fie serializat pentru gazdă. Folosește-l pentru a reîmprospăta valorile câmpurilor.

on_write este un apelabil opțional on_write(channel, name, value) invocat atunci când gazda scrie o nouă valoare pentru un câmp denumit.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

Adaugă un câmp denumit la canal.

name este numele afișat; trebuie să fie unic în cadrul acestui canal.

type este tipul de widget: "label", "toggle", "slider", "select" sau "depth".

value este valoarea inițială. Valoarea implicită depinde de type.

unit este șirul de unitate pentru label/slider (de exemplu "Cel", "%RH").

min este valoarea minimă (intervalul slider-ului sau intervalul de adâncime).

max este valoarea maximă (intervalul slider-ului sau intervalul de adâncime).

step este dimensiunea pasului (slider).

options este lista de șiruri de opțiuni (select).

width este lățimea în pixeli (depth).

height este înălțimea în pixeli (depth).

__getitem__(name: str) object

Returnează valoarea curentă a câmpului denumit. Pentru câmpurile depth este returnat tamponul de date binare, altfel valoarea scalară.

__setitem__(name: str, value: Any) None

Setează valoarea câmpului denumit. Pentru câmpurile slider, un tuplu (min, max, value) actualizează simultan intervalul și valoarea curentă. Pentru câmpurile depth, value este tamponul de date binare.

poll() bool

Metodă a interfeței backend. Returnează True atunci când există date serializate disponibile pentru gazdă.

size() int

Metodă a interfeței backend. Invocă on_read (dacă este setat) și returnează dimensiunea tamponului serializat.

read(offset: int, size: int) bytes

Metodă a interfeței backend. Returnează o porțiune a tamponului serializat.

write(offset: int, data: bytearray) int

Metodă a interfeței backend. Decodează o listă de actualizări CBOR și aplică valorile câmpurilor denumite corespunzătoare, invocând on_write pentru fiecare.

Constante

Biți de marcaj ai canalului (combinați la nivel de bit; transmiși către protocol.register prin flags sau setați automat în funcție de metodele backend-ului).

protocol.CHANNEL_FLAG_READ: int

Canalul suportă citiri.

protocol.CHANNEL_FLAG_WRITE: int

Canalul suportă scrieri.

protocol.CHANNEL_FLAG_LOCK: int

Canalul implementează lock/unlock.

protocol.CHANNEL_FLAG_PHYSICAL: int

Canalul reprezintă un transport fizic (spre deosebire de un canal logic de date).

Identificatori de canal încorporați.

protocol.CHANNEL_ID_TRANSPORT: int

ID de canal rezervat pentru transportul activ.

protocol.CHANNEL_ID_STDIN: int

ID-ul canalului stdin încorporat.

protocol.CHANNEL_ID_STDOUT: int

ID-ul canalului stdout încorporat.

protocol.CHANNEL_ID_STREAM: int

ID-ul canalului stream încorporat.

protocol.CHANNEL_ID_PROFILE: int

ID-ul canalului profiler încorporat (prezent doar atunci când firmware-ul este compilat cu profiler-ul activat).