microdot.sse — Server-Sent Events¶
Server-Sent Events (SSE) — це простий однонаправлений протокол push: клієнт відкриває звичайний HTTP-запит до Content-Type: text/event-stream, а сервер тримає з’єднання відкритим і записує відформатовані події в міру їх появи. Браузер надає потік як JavaScript-об’єкт EventSource. Порівняно з WebSockets, SSE простіший, коли лише сервер відправляє; клієнт завжди запитує.
Типове використання на камері – публікація результатів виявлення / показань датчиків / оновлень статусу на телефон або панель управління з тією частотою, з якою вони відбуваються, без необхідності опитування з боку панелі.
class SSE¶
- class microdot.sse.SSE¶
Дескриптор, який отримує маршрут. Передається обробникам як другий аргумент, коли використовується
with_sse().- async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)¶
Надіслати одну подію клієнту.
- data
Payload події. Рядки та байти надсилаються як є. Dicts і списки серіалізуються в JSON. Все інше перетворюється через
str().- event
Необов’язкова назва події –
EventSourceбраузера направляє до JavaScript-слухача з такою самою назвою, коли це встановлено.- event_id
Необов’язковий ідентифікатор події – браузер надсилає його назад як
Last-Event-ID, якщо з’єднання обривається і відновлюється, що дозволяє серверу продовжити з того місця, де він зупинився.- retry
Затримка перепідключення у секундах; браузер використовує це, якщо з’єднання обривається.
- comment
Якщо
True, payload надсилається як коментар SSE (ігнорується браузером). Корисно як keepalive-heartbeat для запобігання закриттю NAT-таймаутами інакше неактивного потоку.
Декоратори рівня модуля¶
- microdot.sse.with_sse(f)¶
Декоратор, що перетворює маршрут на SSE-кінцеву точку. Обробник отримує запит та об’єкт
SSEfrom microdot import Microdot from microdot.sse import with_sse app = Microdot() @app.get('/events') @with_sse async def events(request, sse): import asyncio while True: await sse.send({'temp': read_sensor()}, event='reading') await asyncio.sleep(1)
Час життя обробника дорівнює часу життя потоку – він виконується протягом усього часу, поки клієнт залишається підключеним. Скасування (клієнт відключається, сервер вимикається) поширюється як
asyncio.CancelledError, який фреймворк поглинає.
- microdot.sse.sse_response(request, event_function, *args, **kwargs)¶
Низькорівнева точка входу: повертає кортеж відповіді, який
with_sse()повертає під капотом. Корисно, коли SSE-кінцева точка потребує спеціальних заголовків або кодів статусу, які декоратор не дозволяє:from microdot.sse import sse_response @app.get('/events') async def events(request): if not request.g.current_user: return 401 async def emit(req, sse): await sse.send('hello') return sse_response(request, emit)
event_function приймає
(request, sse, *args, **kwargs)і використовуєawait sse.send(...)для відправлення подій.
SSE-відповідь встановлює Content-Type: text/event-stream і тримає з’єднання відкритим до повернення event_function або відключення клієнта. Використовуйте цикл while True з await asyncio.sleep між надсиланнями, щоб уникнути активного циклу.