microdot.sse — Server-Sent Events¶
Os Server-Sent Events (SSE) são um protocolo de envio unidirecional simples: o cliente abre um pedido HTTP normal para Content-Type: text/event-stream, e o servidor mantém a ligação aberta e escreve eventos formatados à medida que acontecem. O browser expõe o stream como um objeto JavaScript EventSource. Comparando com WebSockets, o SSE é mais simples quando apenas o servidor envia; o cliente solicita sempre.
O caso de uso típico de câmara é publicar deteções / leituras de sensor / atualizações de estado para um telemóvel ou painel de controlo à medida que acontecem, sem que o painel tenha de fazer polling.
class SSE¶
- class microdot.sse.SSE¶
O handle que a rota recebe. Passado aos handlers como segundo argumento quando
with_sse()está em uso.- async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)¶
Envia um único evento ao cliente.
- data
O payload do evento. Strings e bytes são enviados tal como estão. Dicts e listas são serializados para JSON. Qualquer outra coisa é convertida com
str().- event
Nome de evento opcional – o
EventSourcedo browser despacha para um ouvinte JavaScript com o mesmo nome quando este está definido.- event_id
ID de evento opcional – o browser envia-o de volta como
Last-Event-IDse a ligação cair e reconectar, o que permite ao servidor retomar a partir do ponto onde parou.- retry
Atraso de reconexão em segundos; o browser utiliza este valor se a ligação cair.
- comment
Se
True, o payload é enviado como uma linha de comentário SSE (ignorada pelo browser). Útil como heartbeat de keep-alive para evitar que timeouts de NAT fechem um stream que de outra forma estaria inativo.
Decoradores ao nível do módulo¶
- microdot.sse.with_sse(f)¶
Decorador que transforma uma rota num endpoint SSE. O handler recebe o pedido e um objeto
SSEfrom microdot import Microdot from microdot.sse import with_sse app = Microdot() @app.get('/events') @with_sse async def events(request, sse): import asyncio while True: await sse.send({'temp': read_sensor()}, event='reading') await asyncio.sleep(1)
O tempo de vida do handler é igual ao tempo de vida do stream – é executado enquanto o cliente permanecer ligado. O cancelamento (o cliente desliga-se, o servidor encerra) propaga-se como
asyncio.CancelledError, que o framework absorve.
- microdot.sse.sse_response(request, event_function, *args, **kwargs)¶
Ponto de entrada de baixo nível: devolve o tuplo de resposta que
with_sse()devolve internamente. Útil quando um endpoint SSE precisa de cabeçalhos personalizados ou códigos de estado que o decorador não permite:from microdot.sse import sse_response @app.get('/events') async def events(request): if not request.g.current_user: return 401 async def emit(req, sse): await sse.send('hello') return sse_response(request, emit)
event_function recebe
(request, sse, *args, **kwargs)e utilizaawait sse.send(...)para enviar eventos.
A resposta SSE define Content-Type: text/event-stream e mantém a ligação aberta até event_function retornar ou o cliente desligar. Use um ciclo while True com await asyncio.sleep entre envios para evitar hot-looping.