protocol — Canais do Protocolo OpenMV

O módulo protocol expõe o protocolo de anfitrião OpenMV ao Python. Permite que a pilha de protocolo do lado do firmware seja inicializada e configurada, e permite que o código do utilizador registe canais lógicos personalizados suportados por um objeto Python que implementa a interface do canal (read, write, size, poll, etc.). É isto que as ferramentas de acompanhamento de ambiente de trabalho utilizam quando transmitem dados de imagem ou expõem widgets interativos a uma câmara ligada.

Exemplos

Transmitir uma imagem RGB565 para uma ferramenta de anfitrião usando um backend personalizado que implementa a interface de canal bruto (backend.size(), backend.shape(), backend.poll(), backend.read()):

import csi
import protocol

csi0 = csi.CSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.HD)

img = csi0.snapshot()
img_mv = memoryview(img.bytearray())
frame_ready = True


class FrameChannel:
    def size(self):
        return len(img_mv)

    def shape(self):
        return (img.height(), img.width(), len(img_mv))

    def poll(self):
        return frame_ready

    def read(self, offset, size):
        global frame_ready
        end = offset + size
        chunk = img_mv[offset:end]
        if end >= len(img_mv):
            frame_ready = False
        return chunk


protocol.register(name="frame", backend=FrameChannel())

while True:
    if not frame_ready:
        img = csi0.snapshot()
        img_mv = memoryview(img.bytearray())
        frame_ready = True

O script correspondente do lado do anfitrião, usando o pacote Python openmv (pip install openmv) para ligar, enviar o script para a câmara e obter cada fotograma:

import cv2
import numpy as np
from openmv.camera import Camera

# The on-cam script above, stored as a string (or read from a file).
SCRIPT = open("frame_streamer_on_cam.py").read()

with Camera("/dev/ttyACM0", baudrate=921600) as cam:
    cam.stop()
    cam.exec(SCRIPT)

    while True:
        status = cam.read_status()
        if not cam.has_channel("frame") or not status.get("frame"):
            continue

        h, w, size = cam._channel_shape(cam.get_channel(name="frame"))
        if cam.channel_size("frame") < size:
            continue

        data = cam.channel_read("frame", size)
        rgb565 = np.frombuffer(data, dtype="<u2").reshape(h, w)

        # Unpack RGB565 to an HxWx3 uint8 RGB image.
        r = ((rgb565 >> 11) & 0x1F) << 3
        g = ((rgb565 >>  5) & 0x3F) << 2
        b = ( rgb565        & 0x1F) << 3
        frame = np.dstack([r, g, b]).astype(np.uint8)

        # Display with OpenCV (cv2 expects BGR, not RGB).
        cv2.imshow("OpenMV", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if cv2.waitKey(1) == ord("q"):
            break

cv2.destroyAllWindows()

Substitua /dev/ttyACM0 pela porta série da câmara (por exemplo, COM3 no Windows). O construtor openmv.camera.Camera aceita os mesmos parâmetros de protocolo que init (crc / seq / ack / events / max_payload / max_retry / timeout) quando a pilha do lado da câmara foi reconfigurada para corresponder.

Funções

protocol.init(crc: bool = True, seq: bool = True, ack: bool = True, events: bool = True, max_payload: int = ..., rtx_retries: int = 3, rtx_timeout_ms: int = 500, lock_interval_ms: int = 10, poll_ms: int = 0) None

Inicializa (ou reconfigura) a pilha de protocolo e regista os canais de dados lógicos predefinidos (stdin, stdout, stream e, se compilado, profile). Lança RuntimeError se a inicialização falhar. O firmware arranca com uma pilha de protocolo USB predefinida já em execução, pelo que chamar esta função só é necessário para alterar o transporte ou substituir os parâmetros de enquadramento predefinidos.

crc ativa a validação CRC nos fotogramas do protocolo.

seq ativa o rastreamento do número de sequência.

ack ativa os reconhecimentos por fotograma.

events ativa as notificações de eventos do canal.

max_payload é o tamanho máximo do payload em bytes. Se omitido, é usado o valor predefinido por câmara abaixo indicado; é derivado do tamanho do buffer de protocolo de cada placa como buffer - 10 (header) - 4 (CRC).

Câmara

Tamanho do buffer

Payload máximo

OpenMV Cam M4 (OPENMV2)

512

498

OpenMV Cam M7 (OPENMV3)

512

498

OpenMV Cam H7 (OPENMV4)

512

498

OpenMV Cam H7 Plus (OPENMV4P)

4096

4082

OpenMV Pure Thermal (OPENMVPT)

4096

4082

OpenMV Cam RT1062 (OPENMV_RT1060)

4096

4082

OpenMV Cam N6 (OPENMV_N6)

8192

8178

OpenMV AE3 (OPENMV_AE3)

8192

8178

Arduino Portenta H7 (ARDUINO_PORTENTA_H7)

4096

4082

Arduino Giga (ARDUINO_GIGA)

4096

4082

Arduino Nicla Vision (ARDUINO_NICLA_VISION)

4096

4082

rtx_retries é o número de tentativas de retransmissão. Predefinição: 3.

rtx_timeout_ms é o tempo limite de retransmissão em milissegundos (duplicado após cada tempo limite esgotado). Predefinição: 500.

lock_interval_ms é o intervalo mínimo de bloqueio em milissegundos. Predefinição: 10.

poll_ms é o intervalo de sondagem em milissegundos. 0 (o valor predefinido) desativa a sondagem por temporizador.

protocol.is_active() bool

Devolve True se um anfitrião estiver atualmente ligado e a pilha de protocolo estiver ativa, caso contrário False.

protocol.register(name: str, *, backend: object, flags: int = 0) ProtocolChannel

Regista um objeto Python backend como um novo canal lógico e devolve um identificador ProtocolChannel. Os métodos disponíveis do objeto backend (consulte Interface de Backend abaixo) determinam as capacidades do canal; protocol.CHANNEL_FLAG_READ, protocol.CHANNEL_FLAG_WRITE e protocol.CHANNEL_FLAG_LOCK são adicionados automaticamente a flags quando os métodos correspondentes estiverem implementados.

name é o nome do canal como uma string. Truncado ao tamanho do buffer de nome do canal do firmware. Obrigatório.

backend é o objeto Python que implementa a interface de backend. Obrigatório. Normalmente passado por palavra-chave (backend=...).

flags são bits de sinalizadores de canal adicionais (consulte as constantes CHANNEL_FLAG_*). Opcional; predefinição é 0.

Lança RuntimeError se o canal não puder ser registado (por exemplo, sem slots de canal livres).

Classes

class protocol.ProtocolChannel

Identificador devolvido por protocol.register. As instâncias não são construídas diretamente.

send_event(event: int, wait_ack: bool = False) None

Envia uma notificação de evento de canal para o anfitrião.

event é o identificador do evento (inteiro).

wait_ack se True bloqueia até o anfitrião reconhecer o evento.

Lança RuntimeError se o envio do evento falhar.

Interface de Backend

Um objeto backend passado a protocol.register pode implementar qualquer subconjunto dos seguintes métodos. Apenas os métodos presentes no objeto são ligados à camada de protocolo C; os métodos em falta deixam a capacidade correspondente desativada.

class protocol.backend

Objeto backend do canal passado a protocol.register. Os métodos abaixo descrevem a interface opcional que um backend Python pode implementar.

init() object

Chamado uma vez quando o canal é inicializado. Devolva qualquer valor não None em caso de sucesso; uma exceção ou retorno em falta é tratado como erro.

poll() bool

Devolve True se o canal tiver dados prontos para serem lidos pelo anfitrião.

lock() bool

Adquire o canal para uma transferência. Devolve True em caso de sucesso.

unlock() bool

Liberta o canal após uma transferência. Devolve True em caso de sucesso.

size() int

Devolve o número de bytes atualmente legíveis do canal.

shape() tuple

Devolve um tuplo de até quatro inteiros descrevendo a forma dos dados (por exemplo, dimensões da imagem). Até quatro elementos são consumidos pela camada de protocolo.

flush() object

Liberta quaisquer dados pendentes. Devolve qualquer valor não None em caso de sucesso.

read(offset: int, size: int) bytes

Devolve até size bytes começando em offset como um objeto semelhante a bytes que suporta o protocolo de buffer.

readp(offset: int, size: int) bytes

Variante de cópia zero de read. Devolve um buffer cuja memória subjacente é lida diretamente pela camada de protocolo; o buffer deve permanecer válido durante a transferência.

write(offset: int, data: bytearray) int

Escreve data em offset. data é um bytearray referenciando o buffer C diretamente. Devolve o número de bytes escritos, ou 0 em caso de sucesso predefinido.

ioctl(cmd: int, length: int, arg: bytearray | None) int

Trata um ioctl. arg é None se length for zero, caso contrário um bytearray referenciando o buffer C. Devolve 0 ou None em caso de sucesso, ou um inteiro negativo em caso de erro.

is_active() bool

Para canais de transporte, devolve True se o transporte subjacente estiver atualmente ligado.

class protocol.CBORChannel(on_read: Callable | None = None, on_write: Callable | None = None)

Um backend Python de nível superior (fornecido pelo pacote protocol congelado) que serializa campos nomeados para CBOR usando chaves inteiras compatíveis com SenML. Suporta widgets de apresentação (label, depth) e controlos interativos (toggle, slider, select) com callbacks on_read/on_write.

on_read é um callable opcional on_read(channel) invocado antes de o canal ser serializado para o anfitrião. Utilize-o para atualizar os valores dos campos.

on_write é um callable opcional on_write(channel, name, value) invocado quando o anfitrião escreve um novo valor para um campo nomeado.

add(name: str, type: str, value: Any = None, unit: str | None = None, min: int | float | None = None, max: int | float | None = None, step: int | float | None = None, options: list | None = None, width: int | None = None, height: int | None = None) None

Adiciona um campo nomeado ao canal.

name é o nome de exibição; deve ser único neste canal.

type é o tipo de widget: "label", "toggle", "slider", "select" ou "depth".

value é o valor inicial. O valor predefinido depende de type.

unit é a string de unidade para label/slider (por exemplo, "Cel", "%RH").

min é o valor mínimo (intervalo do slider ou intervalo de profundidade).

max é o valor máximo (intervalo do slider ou intervalo de profundidade).

step é o tamanho do passo (slider).

options é a lista de strings de opções (select).

width é a largura em pixels (depth).

height é a altura em pixels (depth).

__getitem__(name: str) object

Devolve o valor atual do campo nomeado. Para campos depth, o buffer de dados binários é devolvido; caso contrário, o valor escalar.

__setitem__(name: str, value: Any) None

Define o valor do campo nomeado. Para campos slider, um tuplo (min, max, value) atualiza simultaneamente o intervalo e o valor atual. Para campos depth, value é o buffer de dados binários.

poll() bool

Método da interface de backend. Devolve True quando os dados serializados estão disponíveis para o anfitrião.

size() int

Método da interface de backend. Invoca on_read (se definido) e devolve o tamanho do buffer serializado.

read(offset: int, size: int) bytes

Método da interface de backend. Devolve um fragmento do buffer serializado.

write(offset: int, data: bytearray) int

Método da interface de backend. Descodifica uma lista de atualização CBOR e aplica os valores aos campos nomeados correspondentes, invocando on_write para cada um.

Constantes

Bits de sinalizadores de canal (combinados por bitwise; passados a protocol.register via flags ou definidos automaticamente com base nos métodos do backend).

protocol.CHANNEL_FLAG_READ: int

O canal suporta leituras.

protocol.CHANNEL_FLAG_WRITE: int

O canal suporta escritas.

protocol.CHANNEL_FLAG_LOCK: int

O canal implementa lock/unlock.

protocol.CHANNEL_FLAG_PHYSICAL: int

O canal representa um transporte físico (em oposição a um canal de dados lógico).

Identificadores de canal incorporados.

protocol.CHANNEL_ID_TRANSPORT: int

ID de canal reservado para o transporte ativo.

protocol.CHANNEL_ID_STDIN: int

ID de canal do canal stdin incorporado.

protocol.CHANNEL_ID_STDOUT: int

ID de canal do canal stdout incorporado.

protocol.CHANNEL_ID_STREAM: int

ID de canal do canal stream incorporado.

protocol.CHANNEL_ID_PROFILE: int

ID de canal do canal de criação de perfis incorporado (apenas presente quando o firmware é compilado com o criador de perfis ativado).