microdot.sse — Server-Sent Events¶
Server-Sent Events (SSE) – это простой однонаправленный протокол отправки: клиент открывает обычный HTTP-запрос к Content-Type: text/event-stream, а сервер держит соединение открытым и записывает отформатированные события по мере их возникновения. Браузер предоставляет поток в виде JavaScript-объекта EventSource. По сравнению с WebSockets, SSE проще, когда данные отправляет только сервер; клиент всегда запрашивает.
Типичное применение на камере – публикация обнаружений / показаний датчиков / обновлений состояния на телефон или панель мониторинга по мере их возникновения, без необходимости опроса со стороны панели.
class SSE¶
- class microdot.sse.SSE¶
Дескриптор, который получает маршрут. Передаётся обработчикам в качестве второго аргумента, когда используется
with_sse().- async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)¶
Отправляет клиенту одно событие.
- data
Полезная нагрузка события. Строки и байты отправляются как есть. Словари и списки сериализуются в JSON. Всё остальное преобразуется через
str().- event
Необязательное имя события –
EventSourceбраузера направляет его JavaScript-слушателю с тем же именем, когда оно задано.- event_id
Необязательный идентификатор события – браузер отправляет его обратно как
Last-Event-ID, если соединение обрывается и восстанавливается, что позволяет серверу продолжить с того места, где он остановился.- retry
Задержка переподключения в секундах; браузер использует её, если соединение обрывается.
- comment
Если
True, полезная нагрузка отправляется как строка SSE-комментария (игнорируется браузером). Полезно как контрольное сообщение keep-alive, чтобы тайм-ауты NAT не закрывали в остальном простаивающий поток.
Декораторы уровня модуля¶
- microdot.sse.with_sse(f)¶
Декоратор, превращающий маршрут в конечную точку SSE. Обработчик получает запрос и объект
SSE:from microdot import Microdot from microdot.sse import with_sse app = Microdot() @app.get('/events') @with_sse async def events(request, sse): import asyncio while True: await sse.send({'temp': read_sensor()}, event='reading') await asyncio.sleep(1)
Время жизни обработчика равно времени жизни потока – он выполняется, пока клиент остаётся подключённым. Отмена (клиент отключается, сервер завершает работу) распространяется как
asyncio.CancelledError, которую фреймворк поглощает.
- microdot.sse.sse_response(request, event_function, *args, **kwargs)¶
Низкоуровневая точка входа: возвращает кортеж ответа, который
with_sse()возвращает внутри. Полезно, когда конечной точке SSE нужны пользовательские заголовки или коды состояния, которые декоратор не допускает:from microdot.sse import sse_response @app.get('/events') async def events(request): if not request.g.current_user: return 401 async def emit(req, sse): await sse.send('hello') return sse_response(request, emit)
event_function принимает
(request, sse, *args, **kwargs)и используетawait sse.send(...)для отправки событий.
Ответ SSE устанавливает Content-Type: text/event-stream и держит соединение открытым, пока event_function не вернётся или клиент не отключится. Используйте цикл while True с await asyncio.sleep между отправками, чтобы избежать зацикливания без задержки.