microdot.sse --- 服务器发送事件

服务器发送事件(Server-Sent Events,SSE)是一种简单的单向推送协议:客户端向 Content-Type: text/event-stream 打开一个普通的 HTTP 请求,服务器保持连接打开,并在事件发生时写出格式化的事件。浏览器将该流以 JavaScript EventSource 对象的形式暴露出来。与 WebSocket 相比,当只有服务器进行推送时,SSE 更简单;客户端始终是发起请求的一方。

典型的摄像头用途是按事件发生的速率向手机或仪表盘发布检测结果 / 传感器读数 / 状态更新,而无需仪表盘进行轮询。

class SSE

class microdot.sse.SSE

路由所接收的句柄。当使用 with_sse() 时,作为第二个参数传递给处理器。

async send(data, event: str | None = None, event_id: str | None = None, retry: int | None = None, comment: bool = False)

向客户端推送单个事件。

data

事件载荷。字符串和字节原样发送。Dict 和 list 序列化为 JSON。其他任何内容都会经 str() 转换。

event

可选的事件名称 -- 设置此项时,浏览器的 EventSource 会将其分发给同名的 JavaScript 监听器。

event_id

可选的事件 id -- 如果连接断开并重连,浏览器会以 Last-Event-ID 形式将其发回,这使服务器能够从中断处恢复。

retry

重连延迟,以秒为单位;如果连接断开,浏览器会使用此值。

comment

若为 True,则载荷作为一个 SSE 注释行发送(被浏览器忽略)。可用作保活心跳,以阻止 NAT 超时关闭一个原本空闲的流。

模块级装饰器

microdot.sse.with_sse(f)

装饰器,将一个路由变成 SSE 端点。处理器接收请求和一个 SSE 对象:

from microdot import Microdot
from microdot.sse import with_sse

app = Microdot()

@app.get('/events')
@with_sse
async def events(request, sse):
    import asyncio
    while True:
        await sse.send({'temp': read_sensor()}, event='reading')
        await asyncio.sleep(1)

处理器的生命周期等同于流的生命周期 -- 只要客户端保持连接,它就会一直运行。取消(客户端断开连接、服务器关闭)会以 asyncio.CancelledError 形式传播,框架会将其吞掉。

microdot.sse.sse_response(request, event_function, *args, **kwargs)

底层入口点:返回 with_sse() 在底层所返回的响应元组。当一个 SSE 端点需要装饰器不允许的自定义头部或状态码时很有用:

from microdot.sse import sse_response

@app.get('/events')
async def events(request):
    if not request.g.current_user:
        return 401
    async def emit(req, sse):
        await sse.send('hello')
    return sse_response(request, emit)

event_function 接收 (request, sse, *args, **kwargs),并使用 await sse.send(...) 来推送事件。

SSE 响应设置 Content-Type: text/event-stream 并保持连接打开,直到 event_function 返回或客户端断开连接。请使用 while True 循环,并在两次发送之间加入 await asyncio.sleep,以避免热循环。