10.4. Compartilhando um loop de captura entre espectadores

Cada cliente conectado chamando csi0.snapshot() de forma independente é um desperdício, e, uma vez que dois streams estão abertos ao mesmo tempo, fica pior: o sensor entrega quadros em sua própria taxa, e cada captura duplicada deixa todos mais lentos. A abordagem correta é uma corrotina de captura que publica “o quadro mais recente” em um slot compartilhado, mais iteradores por cliente que leem do slot.

One capture task writes JPEG bytes to a single latest_jpeg slot; three stream-client iterators read from the slot and each wait on the shared new_frame event.

10.4.1. A tarefa de captura

Uma corrotina em segundo plano captura quadros tão rápido quanto o sensor os entrega, comprime cada um em JPEG dentro de um bytes compartilhado e pulsa um evento para que qualquer cliente em espera desperte:

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

O par set() / clear() é o padrão de pulso. set() desbloqueia de uma só vez todas as corrotinas que estão aguardando no evento; clear() reseta imediatamente o evento, de modo que o próximo wait() bloqueie novamente. Com múltiplos consumidores (um espectador, outro espectador, qualquer outra corrotina que precise reagir a um novo quadro), nenhum consumidor isolado é responsável por resetar o evento, e ninguém rouba o despertar de ninguém.

Nota

O envoltório bytes(...) ao redor do JPEG é fundamental aqui. bytearray() retorna uma view para dentro do buffer de imagem da câmera; a próxima chamada de snapshot() reescreve esse buffer no lugar com o quadro seguinte. latest_jpeg sobrevive ao img local, então, sem a cópia, cada leitor veria o slot mudar sob seus pés a cada captura.

10.4.2. Iteradores por cliente leem do slot

O handler de stream MJPEG para de chamar csi0.snapshot() por conta própria. Em vez disso, cada instância FrameStream aguarda no evento compartilhado e lê dos bytes compartilhados:

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

A rota de snapshot também muda: ela não dispara mais uma captura, ela retorna o que quer que latest_jpeg esteja segurando no momento:

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

A tupla (body, status) é a abreviação do microdot para definir um código de status HTTP sem construir um microdot.Response. 503 diz estou aqui mas não estou pronto – o código certo para “pergunte novamente em um instante”.

10.4.3. Rodando a captura junto com o servidor

main agora tem duas corrotinas de nível superior: o loop de captura e o servidor HTTP. asyncio.gather() roda ambas, e, se qualquer uma falhar, a outra é cancelada:

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

Agora o sensor lê um quadro por ciclo, não importa quantos espectadores estejam conectados. O primeiro navegador a acessar /stream.jpg vê quadros; o segundo também, o terceiro, o décimo – todos compartilham a mesma captura, e a câmera permanece igualmente responsiva em suas outras rotas.