microdot.sse — Server-Sent Events

Server-Sent Events (SSE) es un protocolo de push unidireccional simple: el cliente abre una solicitud HTTP normal a Content-Type: text/event-stream, y el servidor mantiene la conexión abierta y escribe eventos formateados a medida que ocurren. El navegador expone el stream como un objeto JavaScript EventSource. Comparado con WebSockets, SSE es más simple cuando solo el servidor hace push; el cliente siempre solicita.

El uso típico en cámara es publicar detecciones / lecturas de sensores / actualizaciones de estado a un teléfono o panel de control al ritmo en que ocurren, sin que el panel tenga que sondear.

class SSE

class microdot.sse.SSE

El manejador que recibe la ruta. Se pasa a los manejadores como segundo argumento cuando se usa with_sse().

async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)

Envía un único evento al cliente.

data

La carga útil del evento. Las cadenas y los bytes se envían tal cual. Los dicts y las listas se serializan a JSON. Cualquier otra cosa se convierte con str().

event

Nombre de evento opcional – el EventSource del navegador despacha a un listener de JavaScript del mismo nombre cuando se establece.

event_id

Id de evento opcional – el navegador lo devuelve como Last-Event-ID si la conexión se cae y se reconecta, lo que permite al servidor reanudar desde donde lo dejó.

retry

Retardo de reconexión en segundos; el navegador lo usa si la conexión se cae.

comment

Si es True, la carga útil se envía como una línea de comentario SSE (ignorada por el navegador). Útil como latido keep-alive para evitar que los tiempos de espera de NAT cierren un stream que de otro modo estaría inactivo.

Decoradores a nivel de módulo

microdot.sse.with_sse(f)

Decorador que convierte una ruta en un endpoint SSE. El manejador recibe la solicitud y un objeto SSE:

from microdot import Microdot
from microdot.sse import with_sse

app = Microdot()

@app.get('/events')
@with_sse
async def events(request, sse):
    import asyncio
    while True:
        await sse.send({'temp': read_sensor()}, event='reading')
        await asyncio.sleep(1)

El tiempo de vida del manejador es igual al tiempo de vida del stream – se ejecuta mientras el cliente permanezca conectado. La cancelación (el cliente se desconecta, el servidor se apaga) se propaga como asyncio.CancelledError, que el framework absorbe.

microdot.sse.sse_response(request, event_function, *args, **kwargs)

Punto de entrada de bajo nivel: devuelve la tupla de respuesta que with_sse() devuelve internamente. Útil cuando un endpoint SSE necesita cabeceras o códigos de estado personalizados que el decorador no permite:

from microdot.sse import sse_response

@app.get('/events')
async def events(request):
    if not request.g.current_user:
        return 401
    async def emit(req, sse):
        await sse.send('hello')
    return sse_response(request, emit)

event_function toma (request, sse, *args, **kwargs) y usa await sse.send(...) para enviar eventos.

La respuesta SSE establece Content-Type: text/event-stream y mantiene la conexión abierta hasta que event_function retorna o el cliente se desconecta. Usa un bucle while True con un await asyncio.sleep entre envíos para evitar el bucle activo.