10.3. Transmissão ao vivo – um espectador

Os navegadores conseguem renderizar streams Motion JPEG (MJPEG) multipartes diretamente dentro de uma tag <img>. Entregue ao navegador uma única resposta HTTP que nunca termina, escreva JPEGs separados por um limite multipart, e o navegador exibe cada quadro à medida que ele chega.

O navegador envia GET /stream.jpg; a câmera responde com Content-Type multipart/x-mixed-replace e escreve uma parte com corpo JPEG por quadro até o navegador desconectar.

O tráfego é direto: um cabeçalho de resposta, Content-Type: multipart/x-mixed-replace; boundary=frame, depois uma linha --frame, Content-Type: image/jpeg, uma linha em branco, os bytes JPEG, \r\n, e repete. O navegador fecha a conexão quando a <img> é removida ou a aba é fechada.

10.3.1. Capturando sem bloquear

A csi0.snapshot() bloqueante usada até agora trava todo o event loop até que o sensor entregue um quadro. Isso era aceitável quando uma requisição disparava um snapshot e nada mais estava rodando. Uma vez que um stream está aberto, o servidor precisa continuar tratando outras requisições enquanto o próximo quadro está sendo capturado – a chamada de captura precisa ceder ao event loop enquanto está esperando pelo sensor.

O padrão é um wrapper fino AsyncCSI que consulta csi.CSI.snapshot() em modo não bloqueante e coloca a corrotina para dormir entre as consultas. O capítulo sobre asyncio percorreu esse padrão em AsyncCSI; por ora, inclua-o diretamente no script:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Todos os demais métodos de CSI (reset(), pixformat(), framesize(), gain_db(), …) são encaminhados através de __getattr__; apenas snapshot() é substituído por uma versão aguardável que permite ao event loop agendar outras corrotinas entre as consultas.

Troque o csi.CSI() simples da rota de snapshot por um AsyncCSI():

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. Corpos de stream são iteradores baseados em classe

Um corpo de resposta em stream é apenas um objeto que o microdot itera com async for, enviando cada bloco produzido pelo socket. No CPython, isso normalmente é uma função geradora assíncronaasync def com yield. O MicroPython não suporta isso:

Nota

O asyncio do MicroPython não suporta funções geradoras assíncronas (async def name(): ... yield ...). Os corpos de resposta em stream precisam ser iteradores assíncronos baseados em classe, com __aiter__ retornando self e __anext__ definido como async def.

Para um stream MJPEG, isso significa uma classe cujo __anext__ aguarda um quadro e o retorna emoldurado no wrapper multipart:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

A instância é nova a cada requisição, de modo que cada cliente conectado recebe seu próprio iterador. Quando o navegador desconecta, o microdot para de aguardar __anext__ e o iterador é coletado pelo garbage collector.

Nota

O envoltório bytes(...) ao redor do JPEG é defensivo. bytearray() retorna uma view para dentro do buffer de imagem da câmera, e a próxima chamada de snapshot() reescreve esse buffer no lugar. Envolver em bytes copia o JPEG para fora, de modo que o bloco que o microdot está no meio da escrita permaneça estável mesmo que o flush do escritor não tenha terminado quando __anext__ rodar novamente.

10.3.3. Rodando o servidor dentro do asyncio

A chamada anterior app.run(host=..., port=...) é bloqueante. O handler MJPEG precisa compartilhar o loop com as consultas de snapshot do AsyncCSI, então troque app.run por start_server() dentro de um asyncio.run():

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

O wrapper asyncio.run() permite que o servidor seja uma tarefa entre várias – a corrotina main é então o lugar natural para gerar a captura, a detecção de movimento e qualquer outra coisa que precise compartilhar o loop com o servidor HTTP.

10.3.4. Um espectador por vez

Cada cliente conectado roda seu próprio iterador FrameStream, o que significa que cada cliente dispara sua própria chamada csi0.snapshot(). Dois navegadores significam duas leituras do sensor por intervalo de quadro, três significam três, e assim por diante. O sensor não pode, na verdade, entregar quadros mais rápido do que sua própria taxa de quadros, então as requisições se enfileiram umas atrás das outras e o stream de todos fica mais lento.

A solução é um único loop de captura compartilhado publicando um quadro para muitos leitores.