9.14. 使用 asyncio 的 Socket

一個阻塞式的 recv() 呼叫會凍結整個指令碼,直到有位元組抵達。一個阻塞式的 accept() 呼叫一次只能服務一個用戶端。這兩者都正是 asyncio 為了處理而存在的那種「等待 I/O」情境。asyncio 章節 涵蓋了事件迴圈、協程以及同步原語;本頁則涵蓋與網路相關的部分。

asyncio 模組透過少數幾個輔助函式來公開網路功能,這些函式接受並回傳 串流——一種包裝 socket 並提供可 await 版本讀取與寫入的高階物件。底層的 socket 仍然存在;只是應用不會直接接觸它。

9.14.1. 使用 asyncio 的用戶端

asyncio.open_connection()socket.socket.connect() 的 asyncio 對應物。它會開啟一個 TCP 連線並回傳兩個串流物件:一個 reader 與一個 writer::

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

有三點需要注意:

  • 連線的建立是一次 await,而非阻塞式呼叫。在交握進行的同時,事件迴圈可以自由地執行其他協程。

  • write() 會將位元組放入外送緩衝區;drain() 則是會讓出給迴圈的 await,直到那些位元組確實已透過網路送出為止。

  • readline() 會持續讀取位元組,直到換行符號抵達。該串流類別也包含 read()(讀取最多 N 個位元組)與 readexactly()(恰好讀取 N 個位元組),它們可以解決 TCP 的訊息邊界問題,而無需手動撰寫成幀迴圈。

9.14.2. 使用 asyncio 的伺服器

asyncio.start_server() 是 bind/listen/accept 這一連串操作的 asyncio 對應物。它接受一個回呼函式,該函式會在每個進來的連線時執行一次,並使用與用戶端相同的 reader/writer 配對::

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

每個被接受的連線都會成為它自己執行 handle 的任務。事件迴圈會自然地在它們之間調度——一個緩慢的用戶端無法阻塞其他用戶端,因為當它在 await reader.read(...) 上等待時,迴圈可以自由地在其他每一個連線上取得進展。新增十個並行的用戶端並不需要十個執行緒;同一個單執行緒的事件迴圈就能驅動它們全部。

這正是為 asyncio 撰寫的相機網路應用比等效的阻塞式程式碼擴充性好得多的實際原因:TCP socket 上的伺服器情境是一次一個用戶端;而這個情境則是一次多個用戶端,且無需任何額外工夫。

9.14.3. 在進行網路工作的同時並行處理其他工作

最大的回報在於將網路功能與相機其餘的工作混合在同一個迴圈中。相機可以擷取一張影格、執行影像處理,並且 服務一個網路協定,這些全都交錯進行::

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() 會在同一個事件迴圈上執行這兩個協程。當相機在影格之間於 sleep_ms() 中休眠時,伺服器便有機會調度網路流量。當伺服器在等待下一個位元組時,相機便有機會擷取影像。兩者都在單一個 MicroPython 執行緒上取得進展。

9.14.4. 使用 asyncio 的 UDP

asyncio 模組 並未 為 UDP 提供相同的高階串流——資料報並不符合串流的讀取/寫入形式。在相機上的實際做法是將 UDP 工作放入它自己的協程中,將 socket 切換為非阻塞模式,並在每次讀取嘗試之間讓出給事件迴圈::

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

透過 s.setblocking(False) 將 socket 設為非阻塞,因此當沒有資料報在等待時,recvfrom() 會立即引發 OSError,而不會阻塞整個事件迴圈。空分支中的 await asyncio.sleep_ms(10) 會將控制權交還給事件迴圈,直到下一次輪詢為止。

傳送遵循相同的形式:在非阻塞 socket 上呼叫 sendto(),要不是立即成功,就是引發錯誤。並沒有 sendallto——UDP 資料報是不可分割的,因此每次傳送要不是一整個資料報,就是完全沒有。如果傳送緩衝區已滿,對 UDP 而言正確的做法通常是丟棄該資料報,並讓下一個資料報在下一次迴圈時送出::

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

失敗的分支在實務中很少見。UDP 沒有流量控制,因此 sendto() 幾乎總是在第一次嘗試時就成功;那個 except 之所以存在,主要是為了讓短暫的網路中斷不會讓協程崩潰。

Asyncio 一節涵蓋了將阻塞式 I/O 混入 asyncio 程式的更廣泛模式;同樣的模式可直接套用於 UDP socket。

9.14.5. 逾時與取消

將網路呼叫包裝在 asyncio.wait_for() 中,便會為它設定一個期限::

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

一個耗時過久的協程也可以從別處被 cancel() 取消。這兩種機制都在 協調章節 中有詳細的說明;它們可原封不動地套用於 asyncio.open_connection()asyncio.start_server() 所回傳的串流。

關於完整的 Stream 參考(reader 與 writer 背後的類別,以及本頁順帶用到的輔助函式),請參閱 asyncio --- 非同步 I/O 排程器