9.14. Sockets com asyncio

Uma chamada bloqueante a recv() congela todo o script até que um byte chegue. Uma chamada bloqueante a accept() atende apenas um cliente por vez. Ambas são exatamente o tipo de situação de “esperar por E/S” que o asyncio existe para lidar. O capítulo sobre asyncio aborda o loop de eventos, as corrotinas e as primitivas de sincronização; esta página aborda as partes específicas de rede.

O módulo asyncio expõe a rede por meio de um pequeno número de auxiliares que recebem e retornam streams – objetos de alto nível que envolvem um socket e oferecem versões com await de leitura e escrita. O socket subjacente continua lá; a aplicação simplesmente não o toca diretamente.

9.14.1. Um cliente com asyncio

asyncio.open_connection() é a contraparte do asyncio para socket.socket.connect(). Ela abre uma conexão TCP e retorna dois objetos de stream: um reader e um writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Três pontos a observar:

  • A configuração da conexão é um único await em vez de uma chamada bloqueante. Enquanto o handshake está em andamento, o loop de eventos fica livre para executar outras corrotinas.

  • write() coloca bytes em um buffer de saída; drain() é o await que devolve o controle ao loop até que esses bytes tenham de fato sido enviados pela rede.

  • readline() lê bytes até que uma quebra de linha chegue. A classe stream também inclui read() (lê até N bytes) e readexactly() (lê exatamente N bytes), que resolvem o problema de fronteira de mensagens do TCP sem escrever os loops de enquadramento à mão.

9.14.2. Um servidor com asyncio

asyncio.start_server() é a contraparte do asyncio para a sequência bind/listen/accept. Ela recebe um callback que será executado uma vez por conexão de entrada, com o mesmo par reader/writer que o lado do cliente usa:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Cada conexão aceita se torna sua própria task executando handle. O loop de eventos distribui o trabalho entre elas naturalmente – um cliente lento não pode bloquear os outros, porque enquanto ele está esperando em await reader.read(...) o loop fica livre para avançar em todas as outras conexões. Adicionar dez clientes concorrentes não requer dez threads; o mesmo loop de eventos de thread única conduz todos eles.

Esta é a razão prática pela qual as aplicações de rede de câmera escritas para asyncio escalam muito melhor do que o código bloqueante equivalente: o cenário do servidor em Sockets TCP era um-cliente-por-vez; este é muitos-clientes-de-uma-vez sem esforço extra.

9.14.3. Trabalho concorrente junto com a rede

O grande benefício é misturar a rede com o restante do trabalho da câmera no mesmo loop. A câmera pode capturar um quadro, executar processamento de imagem e atender a um protocolo de rede, tudo intercalado:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() executa as duas corrotinas no mesmo loop de eventos. Enquanto a câmera está dormindo em sleep_ms() entre os quadros, o servidor pode despachar o tráfego de rede. Enquanto o servidor está esperando o próximo byte, a câmera pode capturar. Ambos avançam em uma única thread do MicroPython.

9.14.4. UDP com asyncio

O módulo asyncio não oferece os mesmos streams de alto nível para UDP – os datagramas não se encaixam na forma de leitura/escrita de um stream. A abordagem prática na câmera é colocar o trabalho UDP em sua própria corrotina, mudar o socket para o modo não bloqueante e devolver o controle ao loop de eventos entre as tentativas de leitura:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

O socket é definido como não bloqueante com s.setblocking(False), de modo que recvfrom() lança OSError imediatamente quando nenhum datagrama está aguardando, em vez de bloquear todo o loop de eventos. O await asyncio.sleep_ms(10) no ramo vazio devolve o controle ao loop de eventos até a próxima sondagem.

O envio segue a mesma forma: sendto() em um socket não bloqueante ou tem sucesso imediatamente ou lança uma exceção. Não existe sendallto – os datagramas UDP são atômicos, então cada envio é um datagrama inteiro ou nenhum. Se o buffer de envio estiver cheio, a atitude certa para o UDP geralmente é descartar o datagrama e deixar o próximo sair na passagem seguinte pelo loop:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

O ramo de falha é raro na prática. O UDP não tem controle de fluxo, então sendto() quase sempre tem sucesso na primeira tentativa; o except existe principalmente para que uma breve falha de rede não derrube a corrotina.

A seção Asyncio aborda os padrões mais amplos para misturar E/S bloqueante em um programa asyncio; os mesmos padrões se aplicam diretamente a um socket UDP.

9.14.5. Timeouts e cancelamento

Envolver uma chamada de rede em asyncio.wait_for() impõe um prazo a ela:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Uma corrotina que está demorando demais também pode ser cancelada de outro lugar com cancel(). Ambos os mecanismos são abordados em detalhe no capítulo sobre coordenação; eles se aplicam sem alteração aos streams retornados por asyncio.open_connection() e asyncio.start_server().

Para a referência completa de Stream (a classe por trás dos readers e writers, além dos auxiliares que esta página usou de passagem), consulte asyncio — agendador de E/S assíncrona.