microdot.sse — Server-Sent Events¶
Server-Sent Events (SSE) es un protocolo de push unidireccional simple: el cliente abre una solicitud HTTP normal a Content-Type: text/event-stream, y el servidor mantiene la conexión abierta y escribe eventos formateados a medida que ocurren. El navegador expone el stream como un objeto JavaScript EventSource. Comparado con WebSockets, SSE es más simple cuando solo el servidor hace push; el cliente siempre solicita.
El uso típico en cámara es publicar detecciones / lecturas de sensores / actualizaciones de estado a un teléfono o panel de control al ritmo en que ocurren, sin que el panel tenga que sondear.
class SSE¶
- class microdot.sse.SSE¶
El manejador que recibe la ruta. Se pasa a los manejadores como segundo argumento cuando se usa
with_sse().- async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)¶
Envía un único evento al cliente.
- data
La carga útil del evento. Las cadenas y los bytes se envían tal cual. Los dicts y las listas se serializan a JSON. Cualquier otra cosa se convierte con
str().- event
Nombre de evento opcional – el
EventSourcedel navegador despacha a un listener de JavaScript del mismo nombre cuando se establece.- event_id
Id de evento opcional – el navegador lo devuelve como
Last-Event-IDsi la conexión se cae y se reconecta, lo que permite al servidor reanudar desde donde lo dejó.- retry
Retardo de reconexión en segundos; el navegador lo usa si la conexión se cae.
- comment
Si es
True, la carga útil se envía como una línea de comentario SSE (ignorada por el navegador). Útil como latido keep-alive para evitar que los tiempos de espera de NAT cierren un stream que de otro modo estaría inactivo.
Decoradores a nivel de módulo¶
- microdot.sse.with_sse(f)¶
Decorador que convierte una ruta en un endpoint SSE. El manejador recibe la solicitud y un objeto
SSE:from microdot import Microdot from microdot.sse import with_sse app = Microdot() @app.get('/events') @with_sse async def events(request, sse): import asyncio while True: await sse.send({'temp': read_sensor()}, event='reading') await asyncio.sleep(1)
El tiempo de vida del manejador es igual al tiempo de vida del stream – se ejecuta mientras el cliente permanezca conectado. La cancelación (el cliente se desconecta, el servidor se apaga) se propaga como
asyncio.CancelledError, que el framework absorbe.
- microdot.sse.sse_response(request, event_function, *args, **kwargs)¶
Punto de entrada de bajo nivel: devuelve la tupla de respuesta que
with_sse()devuelve internamente. Útil cuando un endpoint SSE necesita cabeceras o códigos de estado personalizados que el decorador no permite:from microdot.sse import sse_response @app.get('/events') async def events(request): if not request.g.current_user: return 401 async def emit(req, sse): await sse.send('hello') return sse_response(request, emit)
event_function toma
(request, sse, *args, **kwargs)y usaawait sse.send(...)para enviar eventos.
La respuesta SSE establece Content-Type: text/event-stream y mantiene la conexión abierta hasta que event_function retorna o el cliente se desconecta. Usa un bucle while True con un await asyncio.sleep entre envíos para evitar el bucle activo.