9.14. asyncioを使ったソケット

ブロッキングする recv() 呼び出しは、バイトが到着するまでスクリプト全体を凍結させます。ブロッキングする accept() 呼び出しは、一度に1クライアントしか処理できません。これらはどちらも、まさに asyncio が処理するために存在する「I/O待ち」の状況です。asyncioの章 ではイベントループ、コルーチン、同期プリミティブを扱います。このページではネットワーク特有の部分を扱います。

asyncioモジュールは、ストリーム(ソケットをラップし、読み書きの await 可能なバージョンを提供する高レベルオブジェクト)を受け取り返す少数のヘルパーを通じてネットワーク機能を公開します。基盤となるソケットは依然として存在しますが、アプリケーションはそれを直接触らないだけです。

9.14.1. asyncioを使ったクライアント

asyncio.open_connection()socket.socket.connect() のasyncio版に相当します。TCP接続を開き、2つのストリームオブジェクト、リーダーライター を返します:

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

注目すべき点は3つあります。

  • 接続のセットアップはブロッキング呼び出しではなく1つの await です。ハンドシェイクが進行している間、イベントループは他のコルーチンを自由に実行できます。

  • write() はバイトを送信バッファに入れます。drain() は、それらのバイトが実際にネットワーク上に送信されるまでループに制御を譲る await です。

  • readline() は改行が到着するまでバイトを読み取ります。ストリームクラスには read()(最大Nバイトを読む)や readexactly()(ちょうどNバイトを読む)も含まれており、フレーミングのループを手で書くことなくTCPのメッセージ境界問題を解決します。

9.14.2. asyncioを使ったサーバー

asyncio.start_server() は、bind/listen/acceptの一連の処理のasyncio版に相当します。これは、クライアント側が使うのと同じリーダー/ライターのペアを引数として、受信接続ごとに1回実行されるコールバックを受け取ります:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

受け付けられた各接続は、handle を実行する独自のタスクになります。イベントループはそれらの間を自然にディスパッチします。遅いクライアントが1つあっても他をブロックすることはありません。なぜなら、それが await reader.read(...) で待っている間、ループは他のすべての接続で処理を進めることができるからです。10個の同時クライアントを追加するのに10個のスレッドは必要ありません。同じシングルスレッドのイベントループがそれらすべてを駆動します。

これが、asyncio向けに書かれたカメラネットワークアプリケーションが、同等のブロッキングコードよりもはるかにうまくスケールする実用的な理由です。TCP ソケット のサーバーの図は一度に1クライアントでしたが、こちらは追加の手間なしで一度に多数のクライアントを扱います。

9.14.3. ネットワーク処理と並行する作業

大きな利点は、ネットワーク処理をカメラの他の作業と同じループ内で組み合わせられることです。カメラはフレームをキャプチャし、画像処理を実行し、さらに ネットワークプロトコルを処理することを、すべてインターリーブして行えます:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() は2つのコルーチンを同じイベントループ上で実行します。カメラがフレーム間で sleep_ms() でスリープしている間に、サーバーはネットワークトラフィックをディスパッチできます。サーバーが次のバイトを待っている間に、カメラはキャプチャを行えます。どちらも単一のMicroPythonスレッド上で処理を進めます。

9.14.4. asyncioを使ったUDP

asyncioモジュールはUDPに対しては同じ高レベルストリームを 提供しません。データグラムはストリームの読み書きの形に合わないからです。カメラ上での実用的なアプローチは、UDP処理を独自のコルーチンに入れ、ソケットを非ブロッキングモードに切り替え、読み取りの試行と試行の間にイベントループへ制御を譲ることです:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

ソケットは s.setblocking(False) で非ブロッキングに設定されているため、データグラムが待機していないとき recvfrom() はイベントループ全体をブロックする代わりに即座に OSError を送出します。空の分岐にある await asyncio.sleep_ms(10) は、次のポーリングまで制御をイベントループに返します。

送信も同じ形です。非ブロッキングソケット上の sendto() は即座に成功するか送出するかのどちらかです。sendallto は存在しません。UDPデータグラムはアトミックなので、各送信は1つの完全なデータグラムか何もないかのどちらかです。送信バッファが満杯の場合、UDPでの正しい対処は通常、そのデータグラムを破棄し、次回ループを回るときに次のものを送出させることです:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

失敗する分岐は実際にはまれです。UDPにはフロー制御がないため、sendto() はほぼ常に初回の試行で成功します。except は主に、一時的なネットワークの不具合でコルーチンがクラッシュしないようにするために存在します。

Asyncio のセクションでは、ブロッキングI/Oをasyncioプログラムに組み込むためのより広範なパターンを扱います。同じパターンはUDPソケットにそのまま適用できます。

9.14.5. タイムアウトとキャンセル

ネットワーク呼び出しを asyncio.wait_for() でラップすると、それに期限を設定できます:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

時間がかかりすぎているコルーチンは、他の場所から cancel() でキャンセルすることもできます。どちらの仕組みも 協調動作の章 で詳しく扱われています。これらは asyncio.open_connection()asyncio.start_server() が返すストリームにもそのまま適用できます。

Stream の完全なリファレンス(リーダーとライターの背後にあるクラス、およびこのページがついでに使ったヘルパー)については、asyncio --- 非同期I/Oスケジューラ を参照してください。