protocol — Kanály protokolu OpenMV

Modul protocol zpřístupňuje hostitelský protokol OpenMV jazyku Python. Umožňuje inicializaci a konfiguraci protokolového zásobníku na straně firmwaru a dovoluje uživatelskému kódu registrovat vlastní logické kanály podpořené objektem Python, který implementuje rozhraní kanálu (read, write, size, poll atd.). S tím komunikují doprovodné nástroje na počítači, když streamují obrazová data nebo zpřístupňují interaktivní widgety připojené kameře.

Příklady

Stream obrazu RGB565 do hostitelského nástroje pomocí vlastního backendu, který implementuje rozhraní surového kanálu (backend.size(), backend.shape(), backend.poll(), backend.read()):

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

Odpovídající skript na straně hostitele, který používá balíček Python openmv (pip install openmv) k připojení, nahrání skriptu do kamery a stažení každého snímku:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

Nahraďte /dev/ttyACM0 sériovým portem kamery (např. COM3 ve Windows). Konstruktor openmv.camera.Camera přijímá stejné parametry protokolu jako init (crc / seq / ack / events / max_payload / max_retry / timeout), pokud byl zásobník na straně kamery překonfigurován tak, aby jim odpovídal.

Funkce

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

Inicializuje (nebo překonfiguruje) protokolový zásobník a registruje výchozí logické datové kanály (stdin, stdout, stream a, pokud je zkompilován, profile). Vyvolá RuntimeError, pokud inicializace selže. Firmware startuje s již běžícím výchozím protokolovým zásobníkem USB, takže toto volání je potřeba pouze ke změně přenosu nebo přepsání výchozích parametrů rámcování.

crc povoluje ověřování CRC u rámců protokolu.

seq povoluje sledování sekvenčních čísel.

ack povoluje potvrzování jednotlivých rámců.

events povoluje oznámení událostí kanálu.

max_payload je maximální velikost užitečného zatížení v bajtech. Pokud je vynechána, použije se níže uvedená výchozí hodnota pro danou kameru; ta je odvozena z velikosti protokolového bufferu každé desky jako buffer - 10 (header) - 4 (CRC).

Kamera

Velikost bufferu

Max. užitečné zatížení

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries je počet pokusů o opětovné odeslání. Výchozí hodnota 3.

rtx_timeout_ms je časový limit opětovného odeslání v milisekundách (po každém vypršení se zdvojnásobí). Výchozí hodnota 500.

lock_interval_ms je minimální interval zámku v milisekundách. Výchozí hodnota 10.

poll_ms je interval dotazování v milisekundách. 0 (výchozí) zakazuje dotazování pomocí časovače.

protocol.is_active() bool

Vrací True, pokud je aktuálně připojen hostitel a protokolový zásobník je aktivní, jinak False.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Registruje objekt Python backend jako nový logický kanál a vrátí úchyt ProtocolChannel. Dostupné metody objektu backend (viz Rozhraní backendu níže) určují schopnosti kanálu; protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE a protocol.CHANNEL_FLAG_LOCK jsou do flags přidány automaticky, když jsou odpovídající metody implementovány.

name je název kanálu jako řetězec. Zkrácen na velikost bufferu pro názvy kanálů ve firmwaru. Povinné.

backend je objekt Python implementující rozhraní backendu. Povinné. Obvykle předáván klíčovým slovem (backend=...).

flags jsou další příznakové bity kanálu (viz konstanty CHANNEL_FLAG_*). Nepovinné; výchozí hodnota 0.

Vyvolá RuntimeError, pokud kanál nelze zaregistrovat (např. žádné volné sloty kanálů).

Třídy

class protocol.ProtocolChannel

Úchyt vrácený funkcí protocol.register. Instance se nevytvářejí přímo.

send_event(event: int, wait_ack: bool = False) None

Odešle hostiteli oznámení o události kanálu.

event je identifikátor události (celé číslo).

wait_ack, pokud je True, blokuje, dokud hostitel událost nepotvrdí.

Vyvolá RuntimeError, pokud odeslání události selže.

Rozhraní backendu

Objekt backendu předaný funkci protocol.register může implementovat libovolnou podmnožinu následujících metod. K protokolové vrstvě C jsou napojeny pouze metody přítomné na objektu; chybějící metody ponechávají odpovídající schopnost vypnutou.

class protocol.backend

Objekt backendu kanálu předaný funkci protocol.register. Níže uvedené metody popisují volitelné rozhraní, které může backend Python implementovat.

init() object

Voláno jednou při inicializaci kanálu. Při úspěchu vrací libovolnou hodnotu různou od None; výjimka nebo chybějící návratová hodnota je považována za chybu.

poll() bool

Vrací True, pokud má kanál data připravená ke čtení hostitelem.

lock() bool

Získá kanál pro přenos. Při úspěchu vrací True.

unlock() bool

Uvolní kanál po přenosu. Při úspěchu vrací True.

size() int

Vrací počet bajtů aktuálně čitelných z kanálu.

shape() tuple

Vrací n-tici až čtyř celých čísel popisujících tvar dat (např. rozměry obrazu). Protokolová vrstva spotřebuje až čtyři prvky.

flush() object

Vyprázdní veškerá čekající data. Při úspěchu vrací libovolnou hodnotu různou od None.

read(offset: int, size: int) bytes

Vrací až size bajtů počínaje od offset jako objekt podobný bytes, který podporuje buffer protocol.

readp(offset: int, size: int) bytes

Varianta read bez kopírování (zero-copy). Vrací buffer, jehož podkladovou paměť čte protokolová vrstva přímo; buffer musí zůstat platný po celou dobu trvání přenosu.

write(offset: int, data: bytearray) int

Zapíše data na offset. data je bytearray odkazující přímo na buffer C. Vrací počet zapsaných bajtů, nebo 0 při výchozím úspěchu.

ioctl(cmd: int, length: int, arg: bytearray | None) int

Zpracuje ioctl. arg je None, pokud je length nula, jinak bytearray odkazující na buffer C. Při úspěchu vrací 0 nebo None, nebo záporné celé číslo při chybě.

is_active() bool

U přenosových kanálů vrací True, pokud je podkladový přenos aktuálně připojen.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

Backend Python vyšší úrovně (poskytovaný zamrazeným balíčkem protocol), který serializuje pojmenovaná pole do CBOR pomocí celočíselných klíčů kompatibilních se SenML. Podporuje zobrazovací widgety (label, depth) a interaktivní ovládací prvky (toggle, slider, select) s callbacky on_read/on_write.

on_read je nepovinný volatelný objekt on_read(channel) vyvolaný před serializací kanálu pro hostitele. Použijte jej k obnovení hodnot polí.

on_write je nepovinný volatelný objekt on_write(channel, name, value) vyvolaný, když hostitel zapíše novou hodnotu pro pojmenované pole.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

Přidá do kanálu pojmenované pole.

name je zobrazovaný název; musí být v rámci tohoto kanálu jedinečný.

type je typ widgetu: "label", "toggle", "slider", "select" nebo "depth".

value je počáteční hodnota. Výchozí hodnota závisí na type.

unit je řetězec jednotky pro label/slider (např. "Cel", "%RH").

min je minimální hodnota (rozsah slideru nebo rozsah hloubky).

max je maximální hodnota (rozsah slideru nebo rozsah hloubky).

step je velikost kroku (slider).

options je seznam řetězců možností (select).

width je šířka v pixelech (depth).

height je výška v pixelech (depth).

__getitem__(name: str) object

Vrací aktuální hodnotu pojmenovaného pole. U polí depth je vrácen buffer binárních dat, jinak skalární hodnota.

__setitem__(name: str, value: Any) None

Nastaví hodnotu pojmenovaného pole. U polí slider n-tice (min, max, value) současně aktualizuje rozsah i aktuální hodnotu. U polí depth je value buffer binárních dat.

poll() bool

Metoda rozhraní backendu. Vrací True, když jsou pro hostitele k dispozici serializovaná data.

size() int

Metoda rozhraní backendu. Vyvolá on_read (pokud je nastaveno) a vrátí velikost serializovaného bufferu.

read(offset: int, size: int) bytes

Metoda rozhraní backendu. Vrací výřez serializovaného bufferu.

write(offset: int, data: bytearray) int

Metoda rozhraní backendu. Dekóduje seznam aktualizací CBOR a aplikuje hodnoty na odpovídající pojmenovaná pole, přičemž pro každé vyvolá on_write.

Konstanty

Příznakové bity kanálu (kombinované bitově; předávané funkci protocol.register přes flags nebo nastavované automaticky na základě metod backendu).

protocol.CHANNEL_FLAG_READ: int

Kanál podporuje čtení.

protocol.CHANNEL_FLAG_WRITE: int

Kanál podporuje zápisy.

protocol.CHANNEL_FLAG_LOCK: int

Kanál implementuje lock/unlock.

protocol.CHANNEL_FLAG_PHYSICAL: int

Kanál představuje fyzický přenos (na rozdíl od logického datového kanálu).

Vestavěné identifikátory kanálů.

protocol.CHANNEL_ID_TRANSPORT: int

Vyhrazené ID kanálu pro aktivní přenos.

protocol.CHANNEL_ID_STDIN: int

ID kanálu vestavěného kanálu stdin.

protocol.CHANNEL_ID_STDOUT: int

ID kanálu vestavěného kanálu stdout.

protocol.CHANNEL_ID_STREAM: int

ID kanálu vestavěného kanálu stream.

protocol.CHANNEL_ID_PROFILE: int

ID kanálu vestavěného kanálu profileru (přítomen pouze tehdy, je-li firmware sestaven s povoleným profilerem).