microdot.sse — Server-Sent Events

Server-Sent Events (SSE) – это простой однонаправленный протокол отправки: клиент открывает обычный HTTP-запрос к Content-Type: text/event-stream, а сервер держит соединение открытым и записывает отформатированные события по мере их возникновения. Браузер предоставляет поток в виде JavaScript-объекта EventSource. По сравнению с WebSockets, SSE проще, когда данные отправляет только сервер; клиент всегда запрашивает.

Типичное применение на камере – публикация обнаружений / показаний датчиков / обновлений состояния на телефон или панель мониторинга по мере их возникновения, без необходимости опроса со стороны панели.

class SSE

class microdot.sse.SSE

Дескриптор, который получает маршрут. Передаётся обработчикам в качестве второго аргумента, когда используется with_sse().

async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)

Отправляет клиенту одно событие.

data

Полезная нагрузка события. Строки и байты отправляются как есть. Словари и списки сериализуются в JSON. Всё остальное преобразуется через str().

event

Необязательное имя события – EventSource браузера направляет его JavaScript-слушателю с тем же именем, когда оно задано.

event_id

Необязательный идентификатор события – браузер отправляет его обратно как Last-Event-ID, если соединение обрывается и восстанавливается, что позволяет серверу продолжить с того места, где он остановился.

retry

Задержка переподключения в секундах; браузер использует её, если соединение обрывается.

comment

Если True, полезная нагрузка отправляется как строка SSE-комментария (игнорируется браузером). Полезно как контрольное сообщение keep-alive, чтобы тайм-ауты NAT не закрывали в остальном простаивающий поток.

Декораторы уровня модуля

microdot.sse.with_sse(f)

Декоратор, превращающий маршрут в конечную точку SSE. Обработчик получает запрос и объект SSE:

from microdot import Microdot
from microdot.sse import with_sse

app = Microdot()

@app.get('/events')
@with_sse
async def events(request, sse):
    import asyncio
    while True:
        await sse.send({'temp': read_sensor()}, event='reading')
        await asyncio.sleep(1)

Время жизни обработчика равно времени жизни потока – он выполняется, пока клиент остаётся подключённым. Отмена (клиент отключается, сервер завершает работу) распространяется как asyncio.CancelledError, которую фреймворк поглощает.

microdot.sse.sse_response(request, event_function, *args, **kwargs)

Низкоуровневая точка входа: возвращает кортеж ответа, который with_sse() возвращает внутри. Полезно, когда конечной точке SSE нужны пользовательские заголовки или коды состояния, которые декоратор не допускает:

from microdot.sse import sse_response

@app.get('/events')
async def events(request):
    if not request.g.current_user:
        return 401
    async def emit(req, sse):
        await sse.send('hello')
    return sse_response(request, emit)

event_function принимает (request, sse, *args, **kwargs) и использует await sse.send(...) для отправки событий.

Ответ SSE устанавливает Content-Type: text/event-stream и держит соединение открытым, пока event_function не вернётся или клиент не отключится. Используйте цикл while True с await asyncio.sleep между отправками, чтобы избежать зацикливания без задержки.