10.4. Compartilhando um loop de captura entre espectadores

Cada cliente conectado chamando csi0.snapshot() de forma independente é um desperdício, e, uma vez que dois streams estão abertos ao mesmo tempo, fica pior: o sensor entrega quadros em sua própria taxa, e cada captura duplicada deixa todos mais lentos. A abordagem correta é uma corrotina de captura que publica “o quadro mais recente” em um slot compartilhado, mais iteradores por cliente que leem do slot.

Uma tarefa de captura escreve bytes JPEG em um único slot latest_jpeg; três iteradores cliente-de-stream leem do slot e cada um aguarda no evento compartilhado new_frame.

10.4.1. A tarefa de captura

Uma corrotina em segundo plano captura quadros tão rápido quanto o sensor os entrega, comprime cada um em JPEG dentro de um bytes compartilhado e pulsa um evento para que qualquer cliente em espera desperte:

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

O par set() / clear() é o padrão de pulso. set() desbloqueia de uma só vez todas as corrotinas que estão aguardando no evento; clear() reseta imediatamente o evento, de modo que o próximo wait() bloqueie novamente. Com múltiplos consumidores (um espectador, outro espectador, qualquer outra corrotina que precise reagir a um novo quadro), nenhum consumidor isolado é responsável por resetar o evento, e ninguém rouba o despertar de ninguém.

Nota

O envoltório bytes(...) ao redor do JPEG é fundamental aqui. bytearray() retorna uma view para dentro do buffer de imagem da câmera; a próxima chamada de snapshot() reescreve esse buffer no lugar com o quadro seguinte. latest_jpeg sobrevive ao img local, então, sem a cópia, cada leitor veria o slot mudar sob seus pés a cada captura.

10.4.2. Iteradores por cliente leem do slot

O handler de stream MJPEG para de chamar csi0.snapshot() por conta própria. Em vez disso, cada instância FrameStream aguarda no evento compartilhado e lê dos bytes compartilhados:

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

A rota de snapshot também muda: ela não dispara mais uma captura, ela retorna o que quer que latest_jpeg esteja segurando no momento:

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

A tupla (body, status) é a abreviação do microdot para definir um código de status HTTP sem construir um microdot.Response. 503 diz estou aqui mas não estou pronto – o código certo para “pergunte novamente em um instante”.

10.4.3. Rodando a captura junto com o servidor

main agora tem duas corrotinas de nível superior: o loop de captura e o servidor HTTP. asyncio.gather() roda ambas, e, se qualquer uma falhar, a outra é cancelada:

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

Agora o sensor lê um quadro por ciclo, não importa quantos espectadores estejam conectados. O primeiro navegador a acessar /stream.jpg vê quadros; o segundo também, o terceiro, o décimo – todos compartilham a mesma captura, e a câmera permanece igualmente responsiva em suas outras rotas.