9.14. asyncio를 이용한 소켓

블로킹 recv() 호출은 바이트가 도착할 때까지 스크립트 전체를 멈춥니다. 블로킹 accept() 호출은 한 번에 클라이언트 하나만 처리합니다. 이 둘은 모두 정확히 asyncio 가 처리하기 위해 존재하는 “I/O를 기다리는” 상황입니다. asyncio 챕터 에서 이벤트 루프, 코루틴, 동기화 프리미티브를 다룹니다. 이 페이지에서는 네트워크에 특화된 부분을 다룹니다.

asyncio 모듈은 스트림(streams) – 소켓을 감싸고 await 가능한 버전의 읽기와 쓰기를 제공하는 고수준 객체 – 을 받고 반환하는 소수의 헬퍼를 통해 네트워킹을 노출합니다. 기반이 되는 소켓은 여전히 존재하지만, 응용은 그것을 직접 건드리지 않습니다.

9.14.1. asyncio를 이용한 클라이언트

asyncio.open_connection()socket.socket.connect() 의 asyncio 대응물입니다. TCP 연결을 열고 두 개의 스트림 객체, 즉 리더(reader)라이터(writer) 를 반환합니다:

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

주목할 세 가지:

  • 연결 설정은 블로킹 호출 대신 하나의 await 입니다. 핸드셰이크가 진행되는 동안 이벤트 루프는 다른 코루틴을 자유롭게 실행할 수 있습니다.

  • write() 는 바이트를 송신 버퍼에 넣습니다. drain() 은 그 바이트가 실제로 네트워크를 통해 전송될 때까지 루프에 제어를 양보하는 await 입니다.

  • readline() 은 개행이 도착할 때까지 바이트를 읽습니다. 스트림 클래스에는 read() (최대 N바이트 읽기)와 readexactly() (정확히 N바이트 읽기)도 포함되어 있으며, 이는 프레이밍 루프를 손으로 작성하지 않고도 TCP의 메시지 경계 문제를 해결합니다.

9.14.2. asyncio를 이용한 서버

asyncio.start_server() 는 bind/listen/accept 과정의 asyncio 대응물입니다. 들어오는 연결마다 한 번씩 실행될 콜백을 받으며, 클라이언트 쪽이 사용하는 것과 동일한 리더/라이터 쌍을 받습니다:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

수락된 모든 연결은 handle 을 실행하는 자체 태스크가 됩니다. 이벤트 루프는 자연스럽게 이들 사이를 디스패치합니다 – 느린 클라이언트 하나가 다른 것들을 막을 수 없는데, 그것이 await reader.read(...) 에서 기다리는 동안 루프가 다른 모든 연결에서 진행을 만들어낼 자유가 있기 때문입니다. 동시 클라이언트 10개를 추가하는 데 스레드 10개가 필요하지 않습니다. 동일한 단일 스레드 이벤트 루프가 그들 모두를 구동합니다.

이것이 asyncio용으로 작성된 카메라 네트워킹 응용이 그에 상응하는 블로킹 코드보다 훨씬 더 잘 확장되는 실질적인 이유입니다. TCP 소켓 의 서버 그림은 한 번에 한 클라이언트였지만, 이것은 추가 노력 없이 한 번에 여러 클라이언트입니다.

9.14.3. 네트워킹과 병행하는 동시 작업

큰 이점은 동일한 루프 안에서 네트워킹을 카메라의 나머지 작업과 섞는 것입니다. 카메라는 프레임을 캡처하고, 이미지 처리를 실행하고, 동시에 네트워크 프로토콜을 처리할 수 있으며, 이 모든 것이 서로 교차되어 진행됩니다:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() 는 두 코루틴을 동일한 이벤트 루프에서 실행합니다. 카메라가 프레임 사이에 sleep_ms() 에서 잠자는 동안 서버는 네트워크 트래픽을 디스패치할 수 있습니다. 서버가 다음 바이트를 기다리는 동안 카메라는 캡처할 수 있습니다. 둘 다 단일 MicroPython 스레드에서 진행을 만들어냅니다.

9.14.4. asyncio를 이용한 UDP

asyncio 모듈은 UDP에 대해서는 동일한 고수준 스트림을 제공하지 않습니다 – 데이터그램은 스트림의 읽기/쓰기 형태에 맞지 않습니다. 카메라에서의 실용적인 접근법은 UDP 작업을 자체 코루틴에 두고, 소켓을 논블로킹 모드로 전환하고, 읽기 시도 사이에 이벤트 루프에 제어를 양보하는 것입니다:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

소켓은 s.setblocking(False) 로 논블로킹으로 설정되므로, recvfrom() 은 대기 중인 데이터그램이 없을 때 전체 이벤트 루프를 막는 대신 즉시 OSError 를 발생시킵니다. 빈 분기의 await asyncio.sleep_ms(10) 은 다음 폴링까지 이벤트 루프에 제어를 돌려줍니다.

전송도 같은 형태를 따릅니다. 논블로킹 소켓에서의 sendto() 는 즉시 성공하거나 예외를 발생시킵니다. sendallto 는 없습니다 – UDP 데이터그램은 원자적이므로 각 전송은 하나의 완전한 데이터그램이거나 아무것도 아닙니다. 전송 버퍼가 가득 차면 UDP에서 올바른 조치는 보통 그 데이터그램을 버리고 다음 것이 다음 루프 차례에 나가도록 하는 것입니다:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

실패 분기는 실제로는 드뭅니다. UDP에는 흐름 제어가 없으므로 sendto() 는 거의 항상 첫 시도에 성공합니다. except 는 주로 짧은 네트워크 끊김이 코루틴을 중단시키지 않도록 하기 위해 존재합니다.

Asyncio 섹션은 블로킹 I/O를 asyncio 프로그램에 섞는 더 폭넓은 패턴을 다룹니다. 동일한 패턴이 UDP 소켓에도 곧바로 적용됩니다.

9.14.5. 타임아웃과 취소

네트워크 호출을 asyncio.wait_for() 로 감싸면 거기에 데드라인을 부여합니다:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

너무 오래 걸리는 코루틴은 다른 곳에서 cancel() 로 취소될 수도 있습니다. 두 메커니즘 모두 코디네이션 챕터 에서 자세히 다룹니다. 이들은 asyncio.open_connection()asyncio.start_server() 가 반환하는 스트림에 변경 없이 적용됩니다.

전체 Stream 레퍼런스(리더와 라이터 뒤에 있는 클래스와, 이 페이지에서 지나가며 사용한 헬퍼들)는 asyncio — 비동기 I/O 스케줄러 를 참조하세요.