9.14. 使用 asyncio 的套接字

阻塞式的 recv() 调用会让整个脚本冻结,直到有字节到达为止。阻塞式的 accept() 调用一次只能服务一个客户端。这两种情况正是 asyncio 为之而生的“等待 I/O”场景。asyncio 章节 涵盖了事件循环、协程以及同步原语;本页介绍与网络相关的部分。

asyncio 模块通过少数几个辅助函数来暴露网络功能,这些辅助函数接收并返回——一种封装套接字并提供可 await 的读写版本的高层对象。底层套接字依然存在;应用程序只是不直接接触它而已。

9.14.1. 使用 asyncio 的客户端

asyncio.open_connection()socket.socket.connect() 在 asyncio 中的对应物。它打开一个 TCP 连接并返回两个流对象:一个读取器(reader)和一个写入器(writer):

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

有三点需要注意:

  • 建立连接只需一次 await,而不是一次阻塞调用。在握手进行期间,事件循环可以自由地运行其他协程。

  • write() 将字节放入出站缓冲区;drain() 则是那个让出控制权给循环的 await,直到这些字节真正通过网络发送出去为止。

  • readline() 持续读取字节直到出现换行符为止。流类还包含 read()(最多读取 N 个字节)和 readexactly()(恰好读取 N 个字节),它们无需手写组帧循环即可解决 TCP 的消息边界问题。

9.14.2. 使用 asyncio 的服务器

asyncio.start_server() 是 bind/listen/accept 这套流程在 asyncio 中的对应物。它接收一个回调,该回调会针对每个传入连接运行一次,使用的正是客户端一侧所用的同一对读取器/写入器:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

每个被接受的连接都会成为运行 handle 的独立任务。事件循环会在它们之间自然地分派——一个缓慢的客户端无法阻塞其他客户端,因为当它在 await reader.read(...) 上等待时,循环可以自由地推进其他所有连接的进度。增加十个并发客户端并不需要十个线程;同一个单线程事件循环就能驱动它们全部。

这正是为什么用 asyncio 编写的摄像头网络应用比等价的阻塞式代码扩展性好得多的实际原因:TCP 套接字 中的服务器画面是一次只服务一个客户端;而这里则是一次服务多个客户端,且无需任何额外的工作。

9.14.3. 在网络之外同时进行并发工作

最大的回报在于,能在同一个循环里将网络处理与摄像头的其余工作混合在一起。摄像头可以采集一帧、运行图像处理,并且服务一个网络协议,所有这些都交错进行:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() 在同一个事件循环上运行这两个协程。当摄像头在两帧之间于 sleep_ms() 中休眠时,服务器得以分派网络流量。当服务器在等待下一个字节时,摄像头得以进行采集。两者都在单个 MicroPython 线程上推进进度。

9.14.4. 使用 asyncio 的 UDP

asyncio 模块没有为 UDP 提供同样的高层流——数据报并不契合流的读/写形态。在摄像头上的实用做法是将 UDP 工作放进它自己的协程,把套接字切换为非阻塞模式,并在两次读取尝试之间让出控制权给事件循环:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

套接字通过 s.setblocking(False) 被设为非阻塞,因此当没有数据报在等待时,recvfrom() 会立即抛出 OSError,而不是阻塞整个事件循环。空分支中的 await asyncio.sleep_ms(10) 会把控制权交还给事件循环,直到下一次轮询。

发送遵循同样的形态:在非阻塞套接字上的 sendto() 要么立即成功,要么抛出异常。没有 sendallto——UDP 数据报是原子的,所以每次发送要么是一整个数据报,要么什么都没发。如果发送缓冲区已满,对 UDP 而言通常正确的做法是丢弃该数据报,让下一个数据报在下一次循环时发出去:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

失败分支在实践中很少见。UDP 没有流量控制,所以 sendto() 几乎总是在第一次尝试就成功;那个 except 主要是为了让短暂的网络小故障不致使协程崩溃。

Asyncio 一节涵盖了将阻塞式 I/O 混入 asyncio 程序的更广泛模式;同样的模式可直接应用于 UDP 套接字。

9.14.5. 超时与取消

把一个网络调用包装进 asyncio.wait_for() 即可为它设定一个截止期限:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

一个耗时过长的协程也可以从别处被 cancel() 取消。这两种机制都在 协调章节 中有详细介绍;它们原封不动地适用于 asyncio.open_connection()asyncio.start_server() 所返回的流。

如需 Stream 的完整参考(即读取器和写入器背后的那个类,以及本页顺带用到的辅助函数),请参阅 asyncio --- 异步 I/O 调度器