microdot.sse — Server-Sent Events¶
I Server-Sent Events (SSE) sono un semplice protocollo di push unidirezionale: il client apre una normale richiesta HTTP verso Content-Type: text/event-stream, e il server mantiene la connessione aperta e scrive eventi formattati man mano che si verificano. Il browser espone lo stream come oggetto JavaScript EventSource. Rispetto ai WebSocket, gli SSE sono più semplici quando solo il server effettua il push; il client effettua sempre la richiesta.
L’uso tipico con una camera è la pubblicazione di rilevamenti / letture del sensore / aggiornamenti di stato verso un telefono o una dashboard alla velocità con cui si verificano, senza che la dashboard debba effettuare polling.
class SSE¶
- class microdot.sse.SSE¶
L’handle che la route riceve. Passato agli handler come secondo argomento quando
with_sse()è in uso.- async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)¶
Invia un singolo evento al client.
- data
Il payload dell’evento. Stringhe e byte vengono inviati così come sono. I dict e le liste vengono serializzati in JSON. Qualsiasi altra cosa viene convertita con
str().- event
Nome dell’evento opzionale – l”
EventSourcedel browser invia l’evento a un listener JavaScript con lo stesso nome quando è impostato.- event_id
ID dell’evento opzionale – il browser lo rispedisce come
Last-Event-IDse la connessione cade e si riconnette, il che consente al server di riprendere dal punto in cui si era interrotto.- retry
Ritardo di riconnessione in secondi; il browser lo usa se la connessione cade.
- comment
Se
True, il payload viene inviato come riga di commento SSE (ignorata dal browser). Utile come heartbeat keep-alive per impedire ai timeout NAT di chiudere uno stream altrimenti inattivo.
Decoratori a livello di modulo¶
- microdot.sse.with_sse(f)¶
Decoratore che trasforma una route in un endpoint SSE. L’handler riceve la request e un oggetto
SSEfrom microdot import Microdot from microdot.sse import with_sse app = Microdot() @app.get('/events') @with_sse async def events(request, sse): import asyncio while True: await sse.send({'temp': read_sensor()}, event='reading') await asyncio.sleep(1)
La durata di vita dell’handler è pari a quella dello stream – viene eseguito finché il client rimane connesso. La cancellazione (il client si disconnette, il server si arresta) si propaga come
asyncio.CancelledError, che il framework assorbe.
- microdot.sse.sse_response(request, event_function, *args, **kwargs)¶
Punto di ingresso a basso livello: restituisce la tupla di risposta che
with_sse()restituisce internamente. Utile quando un endpoint SSE necessita di header o codici di stato personalizzati che il decoratore non consente:from microdot.sse import sse_response @app.get('/events') async def events(request): if not request.g.current_user: return 401 async def emit(req, sse): await sse.send('hello') return sse_response(request, emit)
event_function riceve
(request, sse, *args, **kwargs)e usaawait sse.send(...)per effettuare il push degli eventi.
La risposta SSE imposta Content-Type: text/event-stream e mantiene la connessione aperta finché event_function non ritorna o il client non si disconnette. Usare un loop while True con un await asyncio.sleep tra gli invii per evitare l’hot-looping.