microdot.sse — Server-Sent Events

Server-Sent Events (SSE) — це простий однонаправлений протокол push: клієнт відкриває звичайний HTTP-запит до Content-Type: text/event-stream, а сервер тримає з’єднання відкритим і записує відформатовані події в міру їх появи. Браузер надає потік як JavaScript-об’єкт EventSource. Порівняно з WebSockets, SSE простіший, коли лише сервер відправляє; клієнт завжди запитує.

Типове використання на камері – публікація результатів виявлення / показань датчиків / оновлень статусу на телефон або панель управління з тією частотою, з якою вони відбуваються, без необхідності опитування з боку панелі.

class SSE

class microdot.sse.SSE

Дескриптор, який отримує маршрут. Передається обробникам як другий аргумент, коли використовується with_sse().

async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)

Надіслати одну подію клієнту.

data

Payload події. Рядки та байти надсилаються як є. Dicts і списки серіалізуються в JSON. Все інше перетворюється через str().

event

Необов’язкова назва події – EventSource браузера направляє до JavaScript-слухача з такою самою назвою, коли це встановлено.

event_id

Необов’язковий ідентифікатор події – браузер надсилає його назад як Last-Event-ID, якщо з’єднання обривається і відновлюється, що дозволяє серверу продовжити з того місця, де він зупинився.

retry

Затримка перепідключення у секундах; браузер використовує це, якщо з’єднання обривається.

comment

Якщо True, payload надсилається як коментар SSE (ігнорується браузером). Корисно як keepalive-heartbeat для запобігання закриттю NAT-таймаутами інакше неактивного потоку.

Декоратори рівня модуля

microdot.sse.with_sse(f)

Декоратор, що перетворює маршрут на SSE-кінцеву точку. Обробник отримує запит та об’єкт SSE

from microdot import Microdot
from microdot.sse import with_sse

app = Microdot()

@app.get('/events')
@with_sse
async def events(request, sse):
    import asyncio
    while True:
        await sse.send({'temp': read_sensor()}, event='reading')
        await asyncio.sleep(1)

Час життя обробника дорівнює часу життя потоку – він виконується протягом усього часу, поки клієнт залишається підключеним. Скасування (клієнт відключається, сервер вимикається) поширюється як asyncio.CancelledError, який фреймворк поглинає.

microdot.sse.sse_response(request, event_function, *args, **kwargs)

Низькорівнева точка входу: повертає кортеж відповіді, який with_sse() повертає під капотом. Корисно, коли SSE-кінцева точка потребує спеціальних заголовків або кодів статусу, які декоратор не дозволяє:

from microdot.sse import sse_response

@app.get('/events')
async def events(request):
    if not request.g.current_user:
        return 401
    async def emit(req, sse):
        await sse.send('hello')
    return sse_response(request, emit)

event_function приймає (request, sse, *args, **kwargs) і використовує await sse.send(...) для відправлення подій.

SSE-відповідь встановлює Content-Type: text/event-stream і тримає з’єднання відкритим до повернення event_function або відключення клієнта. Використовуйте цикл while True з await asyncio.sleep між надсиланнями, щоб уникнути активного циклу.