microdot.sse — Server-Sent Events¶
Server-Sent Events (SSE) is een eenvoudig eenrichtings-pushprotocol: de client opent een normaal HTTP-request naar Content-Type: text/event-stream, en de server houdt de verbinding open en schrijft geformatteerde events zodra ze plaatsvinden. De browser stelt de stream beschikbaar als een JavaScript EventSource-object. Vergeleken met WebSockets is SSE eenvoudiger wanneer alleen de server pusht; de client doet altijd het verzoek.
Het typische cameragebruik is het publiceren van detecties / sensoraflezingen / statusupdates naar een telefoon of dashboard met de snelheid waarmee ze plaatsvinden, zonder dat het dashboard hoeft te pollen.
class SSE¶
- class microdot.sse.SSE¶
De handle die de route ontvangt. Wordt als tweede argument aan handlers doorgegeven wanneer
with_sse()in gebruik is.- async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)¶
Push een enkel event naar de client.
- data
De event-payload. Strings en bytes worden as-is verzonden. Dicts en lists worden geserialiseerd naar JSON. Al het andere wordt geconverteerd met
str().- event
Optionele eventnaam – de
EventSourcevan de browser dispatcht naar een JavaScript-listener met dezelfde naam wanneer dit is ingesteld.- event_id
Optionele event-id – de browser stuurt deze terug als
Last-Event-IDals de verbinding wegvalt en opnieuw verbinding maakt, waardoor de server kan hervatten waar hij was gebleven.- retry
Vertraging voor opnieuw verbinden in seconden; de browser gebruikt dit als de verbinding wegvalt.
- comment
Indien
Truewordt de payload verzonden als een SSE-commentaarregel (genegeerd door de browser). Handig als keep-alive-hartslag om te voorkomen dat NAT-timeouts een verder inactieve stream sluiten.
Decorators op moduleniveau¶
- microdot.sse.with_sse(f)¶
Decorator die een route omzet in een SSE-endpoint. De handler ontvangt het request en een
SSE-object:from microdot import Microdot from microdot.sse import with_sse app = Microdot() @app.get('/events') @with_sse async def events(request, sse): import asyncio while True: await sse.send({'temp': read_sensor()}, event='reading') await asyncio.sleep(1)
De levensduur van de handler is gelijk aan de levensduur van de stream – hij draait zolang de client verbonden blijft. Annulering (de client verbreekt de verbinding, de server sluit af) propageert als
asyncio.CancelledError, die het framework opvangt.
- microdot.sse.sse_response(request, event_function, *args, **kwargs)¶
Laag-niveau-ingang: geeft de respons-tuple terug die
with_sse()onder de motorkap teruggeeft. Handig wanneer een SSE-endpoint aangepaste headers of statuscodes nodig heeft die de decorator niet toestaat:from microdot.sse import sse_response @app.get('/events') async def events(request): if not request.g.current_user: return 401 async def emit(req, sse): await sse.send('hello') return sse_response(request, emit)
event_function neemt
(request, sse, *args, **kwargs)en gebruiktawait sse.send(...)om events te pushen.
De SSE-respons stelt Content-Type: text/event-stream in en houdt de verbinding open totdat event_function terugkeert of de client de verbinding verbreekt. Gebruik een while True-lus met een await asyncio.sleep tussen verzendingen om hot-looping te voorkomen.