9.14. asyncioを使ったソケット¶
ブロッキングする recv() 呼び出しは、バイトが到着するまでスクリプト全体を凍結させます。ブロッキングする accept() 呼び出しは、一度に1クライアントしか処理できません。これらはどちらも、まさに asyncio が処理するために存在する「I/O待ち」の状況です。asyncioの章 ではイベントループ、コルーチン、同期プリミティブを扱います。このページではネットワーク特有の部分を扱います。
asyncioモジュールは、ストリーム(ソケットをラップし、読み書きの await 可能なバージョンを提供する高レベルオブジェクト)を受け取り返す少数のヘルパーを通じてネットワーク機能を公開します。基盤となるソケットは依然として存在しますが、アプリケーションはそれを直接触らないだけです。
9.14.1. asyncioを使ったクライアント¶
asyncio.open_connection() は socket.socket.connect() のasyncio版に相当します。TCP接続を開き、2つのストリームオブジェクト、リーダー と ライター を返します:
import asyncio
async def client():
reader, writer = await asyncio.open_connection("192.168.1.20", 9000)
writer.write(b"hello\n")
await writer.drain() # wait until bytes have been sent
reply = await reader.readline()
print("reply:", reply)
writer.close()
await writer.wait_closed()
asyncio.run(client())
注目すべき点は3つあります。
接続のセットアップはブロッキング呼び出しではなく1つの
awaitです。ハンドシェイクが進行している間、イベントループは他のコルーチンを自由に実行できます。write()はバイトを送信バッファに入れます。drain()は、それらのバイトが実際にネットワーク上に送信されるまでループに制御を譲るawaitです。readline()は改行が到着するまでバイトを読み取ります。ストリームクラスにはread()(最大Nバイトを読む)やreadexactly()(ちょうどNバイトを読む)も含まれており、フレーミングのループを手で書くことなくTCPのメッセージ境界問題を解決します。
9.14.2. asyncioを使ったサーバー¶
asyncio.start_server() は、bind/listen/acceptの一連の処理のasyncio版に相当します。これは、クライアント側が使うのと同じリーダー/ライターのペアを引数として、受信接続ごとに1回実行されるコールバックを受け取ります:
import asyncio
async def handle(reader, writer):
addr = writer.get_extra_info("peername")
print("connection from", addr)
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data) # echo back
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
print("listening on", server.sockets[0].getsockname())
async with server:
await server.serve_forever()
asyncio.run(main())
受け付けられた各接続は、handle を実行する独自のタスクになります。イベントループはそれらの間を自然にディスパッチします。遅いクライアントが1つあっても他をブロックすることはありません。なぜなら、それが await reader.read(...) で待っている間、ループは他のすべての接続で処理を進めることができるからです。10個の同時クライアントを追加するのに10個のスレッドは必要ありません。同じシングルスレッドのイベントループがそれらすべてを駆動します。
これが、asyncio向けに書かれたカメラネットワークアプリケーションが、同等のブロッキングコードよりもはるかにうまくスケールする実用的な理由です。TCP ソケット のサーバーの図は一度に1クライアントでしたが、こちらは追加の手間なしで一度に多数のクライアントを扱います。
9.14.3. ネットワーク処理と並行する作業¶
大きな利点は、ネットワーク処理をカメラの他の作業と同じループ内で組み合わせられることです。カメラはフレームをキャプチャし、画像処理を実行し、さらに ネットワークプロトコルを処理することを、すべてインターリーブして行えます:
import asyncio
async def capture_loop():
while True:
img = await camera.snapshot()
# process img ...
await asyncio.sleep_ms(100)
async def handle(reader, writer):
...
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
await asyncio.gather(
server.serve_forever(),
capture_loop(),
)
asyncio.run(main())
asyncio.gather() は2つのコルーチンを同じイベントループ上で実行します。カメラがフレーム間で sleep_ms() でスリープしている間に、サーバーはネットワークトラフィックをディスパッチできます。サーバーが次のバイトを待っている間に、カメラはキャプチャを行えます。どちらも単一のMicroPythonスレッド上で処理を進めます。
9.14.4. asyncioを使ったUDP¶
asyncioモジュールはUDPに対しては同じ高レベルストリームを 提供しません。データグラムはストリームの読み書きの形に合わないからです。カメラ上での実用的なアプローチは、UDP処理を独自のコルーチンに入れ、ソケットを非ブロッキングモードに切り替え、読み取りの試行と試行の間にイベントループへ制御を譲ることです:
import asyncio
import socket
async def udp_listener(port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(("0.0.0.0", port))
while True:
try:
data, src = s.recvfrom(1024)
except OSError:
await asyncio.sleep_ms(10)
continue
print("got", data, "from", src)
ソケットは s.setblocking(False) で非ブロッキングに設定されているため、データグラムが待機していないとき recvfrom() はイベントループ全体をブロックする代わりに即座に OSError を送出します。空の分岐にある await asyncio.sleep_ms(10) は、次のポーリングまで制御をイベントループに返します。
送信も同じ形です。非ブロッキングソケット上の sendto() は即座に成功するか送出するかのどちらかです。sendallto は存在しません。UDPデータグラムはアトミックなので、各送信は1つの完全なデータグラムか何もないかのどちらかです。送信バッファが満杯の場合、UDPでの正しい対処は通常、そのデータグラムを破棄し、次回ループを回るときに次のものを送出させることです:
async def udp_telemetry(target_addr, period_ms):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
while True:
payload = collect_telemetry()
try:
s.sendto(payload, target_addr)
except OSError:
pass # buffer full -- skip this one
await asyncio.sleep_ms(period_ms)
失敗する分岐は実際にはまれです。UDPにはフロー制御がないため、sendto() はほぼ常に初回の試行で成功します。except は主に、一時的なネットワークの不具合でコルーチンがクラッシュしないようにするために存在します。
Asyncio のセクションでは、ブロッキングI/Oをasyncioプログラムに組み込むためのより広範なパターンを扱います。同じパターンはUDPソケットにそのまま適用できます。
9.14.5. タイムアウトとキャンセル¶
ネットワーク呼び出しを asyncio.wait_for() でラップすると、それに期限を設定できます:
try:
reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
print("server is slow")
時間がかかりすぎているコルーチンは、他の場所から cancel() でキャンセルすることもできます。どちらの仕組みも 協調動作の章 で詳しく扱われています。これらは asyncio.open_connection() や asyncio.start_server() が返すストリームにもそのまま適用できます。
Stream の完全なリファレンス(リーダーとライターの背後にあるクラス、およびこのページがついでに使ったヘルパー)については、asyncio --- 非同期I/Oスケジューラ を参照してください。