9.14. Sockets con asyncio

Una llamada bloqueante a recv() congela todo el script hasta que llega un byte. Una llamada bloqueante a accept() atiende a un solo cliente a la vez. Ambas son exactamente el tipo de situación de «esperar a la E/S» para la que existe asyncio. El capítulo sobre asyncio cubre el bucle de eventos, las corrutinas y las primitivas de sincronización; esta página cubre las piezas específicas de red.

El módulo asyncio expone la red a través de un pequeño conjunto de ayudantes que toman y devuelven streams – objetos de alto nivel que envuelven un socket y ofrecen versiones con await de la lectura y la escritura. El socket subyacente sigue ahí; la aplicación simplemente no lo toca directamente.

9.14.1. Un cliente con asyncio

asyncio.open_connection() es la contraparte de asyncio de socket.socket.connect(). Abre una conexión TCP y devuelve dos objetos stream: un lector y un escritor:

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Tres cosas a tener en cuenta:

  • El establecimiento de la conexión es un solo await en lugar de una llamada bloqueante. Mientras el handshake está en curso, el bucle de eventos queda libre para ejecutar otras corrutinas.

  • write() coloca bytes en un búfer de salida; drain() es el await que cede el control al bucle hasta que esos bytes se han enviado realmente por la red.

  • readline() lee bytes hasta que llega un salto de línea. La clase stream incluye también read() (leer hasta N bytes) y readexactly() (leer exactamente N bytes), que resuelven el problema de los límites de mensaje de TCP sin escribir a mano los bucles de delimitación.

9.14.2. Un servidor con asyncio

asyncio.start_server() es la contraparte de asyncio de la secuencia bind/listen/accept. Toma una función de retorno (callback) que se ejecutará una vez por cada conexión entrante, con el mismo par lector/escritor que usa el lado del cliente:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Cada conexión aceptada se convierte en su propia tarea ejecutando handle. El bucle de eventos las despacha de forma natural – un cliente lento no puede bloquear a los demás, porque mientras está esperando en await reader.read(...) el bucle queda libre para avanzar en todas las demás conexiones. Añadir diez clientes simultáneos no requiere diez hilos; el mismo bucle de eventos de un solo hilo los gestiona todos.

Esta es la razón práctica por la que las aplicaciones de red para cámaras escritas con asyncio escalan mucho mejor que el código bloqueante equivalente: la imagen del servidor en Sockets TCP era de un cliente a la vez; esta es de muchos clientes a la vez sin esfuerzo adicional.

9.14.3. Trabajo concurrente junto con la red

La gran ventaja es combinar la red con el resto del trabajo de la cámara en el mismo bucle. La cámara puede capturar un fotograma, ejecutar procesamiento de imágenes y servir un protocolo de red, todo entrelazado:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() ejecuta las dos corrutinas en el mismo bucle de eventos. Mientras la cámara duerme en sleep_ms() entre fotogramas, el servidor puede despachar el tráfico de red. Mientras el servidor espera el siguiente byte, la cámara puede capturar. Ambos avanzan en un único hilo de MicroPython.

9.14.4. UDP con asyncio

El módulo asyncio no ofrece los mismos streams de alto nivel para UDP – los datagramas no encajan en la forma de lectura/escritura de un stream. El enfoque práctico en la cámara es colocar el trabajo de UDP en su propia corrutina, poner el socket en modo no bloqueante y ceder el control al bucle de eventos entre intentos de lectura:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

El socket se pone en modo no bloqueante con s.setblocking(False), de modo que recvfrom() lanza OSError de inmediato cuando no hay ningún datagrama esperando, en lugar de bloquear todo el bucle de eventos. El await asyncio.sleep_ms(10) de la rama vacía devuelve el control al bucle de eventos hasta el siguiente sondeo.

El envío sigue la misma forma: sendto() en un socket no bloqueante o bien tiene éxito de inmediato o bien lanza una excepción. No existe sendallto – los datagramas UDP son atómicos, así que cada envío es un datagrama completo o ninguno. Si el búfer de envío está lleno, lo correcto en UDP suele ser descartar el datagrama y dejar que el siguiente salga en la próxima vuelta del bucle:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

La rama de fallo es poco frecuente en la práctica. UDP no tiene control de flujo, así que sendto() casi siempre tiene éxito al primer intento; el except existe sobre todo para que un breve contratiempo de la red no haga fallar la corrutina.

La sección Asyncio cubre los patrones más generales para combinar E/S bloqueante en un programa asyncio; los mismos patrones se aplican directamente a un socket UDP.

9.14.5. Tiempos de espera y cancelación

Envolver una llamada de red en asyncio.wait_for() le impone un plazo límite:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Una corrutina que está tardando demasiado también puede cancelarse desde otro lugar con cancel(). Ambos mecanismos se cubren en detalle en el capítulo sobre coordinación; se aplican sin cambios a los streams devueltos por asyncio.open_connection() y asyncio.start_server().

Para la referencia completa de Stream (la clase detrás de los lectores y escritores, además de los ayudantes que esta página usó de pasada), consulta asyncio — planificador de E/S asíncrona.