9.14. Sockets avec asyncio

Un appel bloquant à recv() fige tout le script jusqu’à l’arrivée d’un octet. Un appel bloquant à accept() ne sert qu’un seul client à la fois. Ces deux situations sont exactement le genre de cas d”« attente sur les E/S » pour lequel asyncio existe. Le chapitre asyncio couvre la boucle d’événements, les coroutines et les primitives de synchronisation ; cette page couvre les éléments spécifiques au réseau.

Le module asyncio expose la mise en réseau à travers un petit nombre d’utilitaires qui prennent et retournent des streams – des objets de haut niveau qui encapsulent un socket et offrent des versions await-ables de la lecture et de l’écriture. Le socket sous-jacent est toujours là ; l’application n’y touche simplement pas directement.

9.14.1. Un client avec asyncio

asyncio.open_connection() est l’équivalent asyncio de socket.socket.connect(). Il ouvre une connexion TCP et retourne deux objets stream : un reader et un writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Trois points à noter :

  • La mise en place de la connexion est un seul await au lieu d’un appel bloquant. Pendant que la poignée de main est en cours, la boucle d’événements est libre d’exécuter d’autres coroutines.

  • write() place des octets dans un tampon sortant ; drain() est le await qui rend la main à la boucle jusqu’à ce que ces octets aient effectivement été envoyés sur le réseau.

  • readline() lit des octets jusqu’à l’arrivée d’un saut de ligne. La classe stream inclut également read() (lire jusqu’à N octets) et readexactly() (lire exactement N octets), qui résolvent le problème des limites de message de TCP sans écrire les boucles de découpage à la main.

9.14.2. Un serveur avec asyncio

asyncio.start_server() est l’équivalent asyncio de la séquence bind/listen/accept. Il prend une fonction de rappel qui sera exécutée une fois par connexion entrante, avec la même paire reader/writer que celle utilisée côté client

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Chaque connexion acceptée devient sa propre tâche exécutant handle. La boucle d’événements répartit naturellement entre elles – un client lent ne peut pas bloquer les autres, car pendant qu’il attend sur await reader.read(...), la boucle est libre de progresser sur toutes les autres connexions. Ajouter dix clients concurrents ne nécessite pas dix threads ; la même boucle d’événements monothread les pilote tous.

C’est la raison pratique pour laquelle les applications de mise en réseau de caméra écrites pour asyncio passent à l’échelle bien mieux que le code bloquant équivalent : l’illustration du serveur dans Sockets TCP était un-client-à-la-fois ; celle-ci est plusieurs-clients-à-la-fois sans effort supplémentaire.

9.14.3. Travail concurrent en parallèle de la mise en réseau

Le grand avantage est de mélanger la mise en réseau avec le reste du travail de la caméra dans la même boucle. La caméra peut capturer une trame, exécuter du traitement d’image et servir un protocole réseau, le tout entrelacé

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() exécute les deux coroutines sur la même boucle d’événements. Pendant que la caméra dort dans sleep_ms() entre les trames, le serveur peut répartir le trafic réseau. Pendant que le serveur attend l’octet suivant, la caméra peut capturer. Les deux progressent sur un seul thread MicroPython.

9.14.4. UDP avec asyncio

Le module asyncio n’offre pas les mêmes streams de haut niveau pour UDP – les datagrammes ne correspondent pas à la forme lecture/écriture d’un stream. L’approche pratique sur la caméra consiste à placer le travail UDP dans sa propre coroutine, à basculer le socket en mode non bloquant et à rendre la main à la boucle d’événements entre les tentatives de lecture

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Le socket est mis en mode non bloquant avec s.setblocking(False), de sorte que recvfrom() lève OSError immédiatement lorsqu’aucun datagramme n’est en attente, au lieu de bloquer toute la boucle d’événements. Le await asyncio.sleep_ms(10) dans la branche vide rend le contrôle à la boucle d’événements jusqu’au prochain sondage.

L’envoi suit la même forme : sendto() sur un socket non bloquant réussit immédiatement ou lève une exception. Il n’y a pas de sendallto – les datagrammes UDP sont atomiques, donc chaque envoi est un datagramme entier ou aucun. Si le tampon d’envoi est plein, la bonne décision pour UDP est généralement d’abandonner le datagramme et de laisser le suivant partir au prochain passage dans la boucle

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

La branche d’échec est rare en pratique. UDP n’a pas de contrôle de flux, donc sendto() réussit presque toujours du premier coup ; le except existe surtout pour qu’un bref accroc réseau ne fasse pas planter la coroutine.

La section Asyncio couvre les modèles plus larges pour intégrer des E/S bloquantes dans un programme asyncio ; les mêmes modèles s’appliquent directement à un socket UDP.

9.14.5. Délais d’attente et annulation

Envelopper un appel réseau dans asyncio.wait_for() lui impose une échéance

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Une coroutine qui prend trop de temps peut aussi être annulée depuis ailleurs avec cancel(). Les deux mécanismes sont couverts en détail dans le chapitre sur la coordination ; ils s’appliquent sans changement aux streams retournés par asyncio.open_connection() et asyncio.start_server().

Pour la référence complète de Stream (la classe derrière les readers et les writers, plus les utilitaires que cette page a utilisés au passage), voir asyncio — ordonnanceur d’entrées/sorties asynchrones.