protocol — Canale de protocol OpenMV¶
Modulul protocol expune protocolul gazdă OpenMV către Python. Permite inițializarea și configurarea stivei de protocol din partea firmware-ului și permite codului utilizatorului să înregistreze canale logice personalizate susținute de un obiect Python care implementează interfața canalului (read, write, size, poll etc.). Acesta este punctul cu care comunică instrumentele companion de pe desktop atunci când transmit date de imagine sau expun widget-uri interactive către o cameră conectată.
Exemple¶
Transmite o imagine RGB565 către un instrument gazdă folosind un backend personalizat care implementează interfața de canal brut (backend.size(), backend.shape(), backend.poll(), backend.read()):
import csi
import protocol
csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
class FrameChannel:
def size(self):
return len(img_mv)
def shape(self):
return (img.height(), img.width(), len(img_mv))
def poll(self):
return frame_ready
def read(self, offset, size):
global frame_ready
end = offset + size
chunk = img_mv[offset:end]
if end >= len(img_mv):
frame_ready = False
return chunk
protocol.register(name="frame", backend=FrameChannel())
while True:
if not frame_ready:
img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True
Scriptul corespondent de pe partea gazdei, folosind pachetul Python openmv (pip install openmv) pentru a se conecta, a încărca scriptul de pe cameră și a prelua fiecare cadru:
import cv2
import numpy as np
from openmv.camera import Camera
# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()
with Camera("/dev/ttyACM0", baudrate=921600) as cam:
cam.stop()
cam.exec(SCRIPT)
while True:
status = cam.read_status()
if not cam.has_channel("frame") or not status.get("frame"):
continue
h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
if cam.channel_size("frame") < size:
continue
data = cam.channel_read("frame", size)
rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)
# Unpack RGB565 to an HxWx3 uint8 RGB image.
r = ((rgb565 >> 11) & 0x1F) << 3
g = ((rgb565 >> 5) & 0x3F) << 2
b = ( rgb565 & 0x1F) << 3
frame = np.dstack([r, g, b]).astype(np.uint8)
# Display with OpenCV (cv2 expects BGR, not RGB).
cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
if cv2.waitKey(1) == ord("q"):
break
cv2.destroyAllWindows()
Înlocuiește /dev/ttyACM0 cu portul serial al camerei (de exemplu COM3 pe Windows). Constructorul openmv.camera.Camera acceptă aceiași parametri de protocol ca init (crc / seq / ack / events / max_payload / max_retry / timeout) atunci când stiva de pe partea camerei a fost reconfigurată pentru a se potrivi.
Funcții¶
- protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None¶
Inițializează (sau reconfigurează) stiva de protocol și înregistrează canalele logice de date implicite (
stdin,stdout,streamși, dacă este compilat,profile). RidicăRuntimeErrordacă inițializarea eșuează. Firmware-ul pornește cu o stivă de protocol USB implicită deja activă, așa că apelarea acestei funcții este necesară doar pentru a schimba transportul sau pentru a suprascrie parametrii de încadrare impliciți.crcactivează validarea CRC pe cadrele de protocol.seqactivează urmărirea numerelor de secvență.ackactivează confirmările pentru fiecare cadru.eventsactivează notificările de evenimente ale canalelor.max_payloadeste dimensiunea maximă a încărcăturii utile în octeți. Dacă este omis, se folosește valoarea implicită per cameră de mai jos; aceasta este derivată din dimensiunea tamponului (buffer) de protocol al fiecărei plăci cabuffer - 10 (header) - 4 (CRC).Cameră
Dimensiune tampon (buffer)
Încărcătură utilă maximă
OpenMV Cam M4 (
OPENMV2)512
498
OpenMV Cam M7 (
OPENMV3)512
498
OpenMV Cam H7 (
OPENMV4)512
498
OpenMV Cam H7 Plus (
OPENMV4P)4096
4082
OpenMV Pure Thermal (
OPENMVPT)4096
4082
OpenMV Cam RT1062 (
OPENMV_RT1060)4096
4082
OpenMV Cam N6 (
OPENMV_N6)8192
8178
OpenMV AE3 (
OPENMV_AE3)8192
8178
Arduino Portenta H7 (
ARDUINO_PORTENTA_H7)4096
4082
Arduino Giga (
ARDUINO_GIGA)4096
4082
Arduino Nicla Vision (
ARDUINO_NICLA_VISION)4096
4082
rtx_retrieseste numărul de încercări de retransmisie. Implicit3.rtx_timeout_mseste timpul de expirare al retransmisiei în milisecunde (dublat după fiecare expirare). Implicit500.lock_interval_mseste intervalul minim de blocare în milisecunde. Implicit10.poll_mseste intervalul de interogare (polling) în milisecunde.0(valoarea implicită) dezactivează interogarea pe temporizator.
- protocol.is_active() bool¶
Returnează
Truedacă o gazdă este conectată în prezent și stiva de protocol este activă, altfelFalse.
- protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel¶
Înregistrează un obiect
backendPython ca un nou canal logic și returnează un descriptorProtocolChannel. Metodele disponibile ale obiectuluibackend(vezi Interfața backend mai jos) determină capacitățile canalului;protocol.CHANNEL_FLAG_READ,protocol.CHANNEL_FLAG_WRITEșiprotocol.CHANNEL_FLAG_LOCKsunt adăugate automat laflagsatunci când metodele corespunzătoare sunt implementate.nameeste numele canalului sub formă de șir de caractere. Trunchiat la dimensiunea tamponului de nume de canal al firmware-ului. Obligatoriu.backendeste obiectul Python care implementează interfața backend. Obligatoriu. De obicei transmis prin cuvânt-cheie (backend=...).flagsreprezintă biți suplimentari de marcaj ai canalului (vezi constanteleCHANNEL_FLAG_*). Opțional; implicit0.Ridică
RuntimeErrordacă canalul nu poate fi înregistrat (de exemplu, nu există sloturi de canal libere).
Clase¶
- class protocol.ProtocolChannel¶
Descriptor returnat de
protocol.register. Instanțele nu sunt construite direct.
Interfața backend¶
Un obiect backend transmis către protocol.register poate implementa orice submulțime a metodelor de mai jos. Doar metodele prezente pe obiect sunt conectate la stratul de protocol C; metodele lipsă lasă capacitatea corespunzătoare dezactivată.
- class protocol.backend¶
Obiect backend de canal transmis către
protocol.register. Metodele de mai jos descriu interfața opțională pe care un backend Python o poate implementa.- init() object¶
Apelat o singură dată atunci când canalul este inițializat. Returnează orice valoare diferită de
Nonela succes; o excepție sau lipsa unei valori returnate este tratată ca eroare.
- shape() tuple¶
Returnează un tuplu de până la patru numere întregi care descriu forma datelor (de exemplu, dimensiunile imaginii). Stratul de protocol consumă până la patru elemente.
- flush() object¶
Golește orice date în așteptare. Returnează orice valoare diferită de
Nonela succes.
- read(offset: int, size: int) bytes¶
Returnează până la
sizeocteți începând de laoffsetca un obiect de tipbytescare suportă protocolul de tampon (buffer).
- readp(offset: int, size: int) bytes¶
Variantă fără copiere (zero-copy) a
read. Returnează un tampon (buffer) a cărui memorie subiacentă este citită direct de stratul de protocol; tamponul trebuie să rămână valid pe durata transferului.
- write(offset: int, data: bytearray) int¶
Scrie
datalaoffset.dataeste unbytearraycare referențiază direct tamponul (buffer) C. Returnează numărul de octeți scriși, sau0la succesul implicit.
- class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)¶
Un backend Python de nivel mai înalt (furnizat de pachetul
protocolînghețat) care serializează câmpuri denumite în CBOR folosind chei întregi compatibile cu SenML. Suportă widget-uri de afișare (label,depth) și controale interactive (toggle,slider,select) cu funcții de retroapelare (callback)on_read/on_write.on_readeste un apelabil opționalon_read(channel)invocat înainte ca canalul să fie serializat pentru gazdă. Folosește-l pentru a reîmprospăta valorile câmpurilor.on_writeeste un apelabil opționalon_write(channel, name, value)invocat atunci când gazda scrie o nouă valoare pentru un câmp denumit.- add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None¶
Adaugă un câmp denumit la canal.
nameeste numele afișat; trebuie să fie unic în cadrul acestui canal.typeeste tipul de widget:"label","toggle","slider","select"sau"depth".valueeste valoarea inițială. Valoarea implicită depinde detype.uniteste șirul de unitate pentrulabel/slider(de exemplu"Cel","%RH").mineste valoarea minimă (intervalul slider-ului sau intervalul de adâncime).maxeste valoarea maximă (intervalul slider-ului sau intervalul de adâncime).stepeste dimensiunea pasului (slider).optionseste lista de șiruri de opțiuni (select).widtheste lățimea în pixeli (depth).heighteste înălțimea în pixeli (depth).
- __getitem__(name: str) object¶
Returnează valoarea curentă a câmpului denumit. Pentru câmpurile
deptheste returnat tamponul de date binare, altfel valoarea scalară.
- __setitem__(name: str, value: Any) None¶
Setează valoarea câmpului denumit. Pentru câmpurile
slider, un tuplu(min, max, value)actualizează simultan intervalul și valoarea curentă. Pentru câmpuriledepth,valueeste tamponul de date binare.
- poll() bool¶
Metodă a interfeței backend. Returnează
Trueatunci când există date serializate disponibile pentru gazdă.
- size() int¶
Metodă a interfeței backend. Invocă
on_read(dacă este setat) și returnează dimensiunea tamponului serializat.
Constante¶
Biți de marcaj ai canalului (combinați la nivel de bit; transmiși către protocol.register prin flags sau setați automat în funcție de metodele backend-ului).
- protocol.CHANNEL_FLAG_PHYSICAL: int¶
Canalul reprezintă un transport fizic (spre deosebire de un canal logic de date).
Identificatori de canal încorporați.