microdot.sse — Server-Sent Events¶
Server-Sent Events (SSE) é um protocolo de push unidirecional simples: o cliente abre uma requisição HTTP normal para Content-Type: text/event-stream, e o servidor mantém a conexão aberta e escreve eventos formatados à medida que ocorrem. O navegador expõe o stream como um objeto JavaScript EventSource. Em comparação com WebSockets, o SSE é mais simples quando apenas o servidor faz push; o cliente sempre faz a requisição.
O uso típico em câmera é publicar detecções / leituras de sensores / atualizações de status para um telefone ou painel à taxa em que ocorrem, sem que o painel precise fazer polling.
class SSE¶
- class microdot.sse.SSE¶
O handle que a rota recebe. Passado aos manipuladores como o segundo argumento quando
with_sse()está em uso.- async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)¶
Envia um único evento ao cliente.
- data
O payload do evento. Strings e bytes são enviados como estão. Dicts e lists são serializados em JSON. Qualquer outra coisa é convertida com
str().- event
Nome de evento opcional – o
EventSourcedo navegador despacha para um listener JavaScript de mesmo nome quando isto é definido.- event_id
Id de evento opcional – o navegador o envia de volta como
Last-Event-IDse a conexão cair e reconectar, o que permite ao servidor retomar de onde parou.- retry
Atraso de reconexão em segundos; o navegador usa isto se a conexão cair.
- comment
Se
True, o payload é enviado como uma linha de comentário SSE (ignorada pelo navegador). Útil como um heartbeat de keep-alive para impedir que timeouts de NAT fechem um stream que de outra forma estaria ocioso.
Decoradores de nível de módulo¶
- microdot.sse.with_sse(f)¶
Decorador que transforma uma rota em um endpoint SSE. O manipulador recebe a requisição e um objeto
SSEfrom microdot import Microdot from microdot.sse import with_sse app = Microdot() @app.get('/events') @with_sse async def events(request, sse): import asyncio while True: await sse.send({'temp': read_sensor()}, event='reading') await asyncio.sleep(1)
O tempo de vida do manipulador é igual ao tempo de vida do stream – ele é executado enquanto o cliente permanecer conectado. O cancelamento (o cliente se desconecta, o servidor é encerrado) propaga-se como
asyncio.CancelledError, que o framework absorve.
- microdot.sse.sse_response(request, event_function, *args, **kwargs)¶
Ponto de entrada de baixo nível: retorna a tupla de resposta que
with_sse()retorna por baixo dos panos. Útil quando um endpoint SSE precisa de cabeçalhos personalizados ou códigos de status que o decorador não permite:from microdot.sse import sse_response @app.get('/events') async def events(request): if not request.g.current_user: return 401 async def emit(req, sse): await sse.send('hello') return sse_response(request, emit)
event_function recebe
(request, sse, *args, **kwargs)e usaawait sse.send(...)para enviar eventos.
A resposta SSE define Content-Type: text/event-stream e mantém a conexão aberta até que event_function retorne ou o cliente se desconecte. Use um laço while True com um await asyncio.sleep entre os envios para evitar laços em alta frequência.