9.14. Socket-uri cu asyncio

Un apel blocant recv() îngheață întregul script până când sosește un octet. Un apel blocant accept() deservește doar un singur client la un moment dat. Ambele sunt exact tipul de situație de „așteptare la I/O” pentru care există asyncio. Capitolul despre asyncio acoperă bucla de evenimente, corutinele și primitivele de sincronizare; această pagină acoperă elementele specifice rețelei.

Modulul asyncio expune rețelistica printr-un număr mic de funcții ajutătoare care primesc și returnează stream-uri – obiecte de nivel înalt care încapsulează un socket și oferă versiuni cu await pentru citire și scriere. Socketul subiacent este în continuare acolo; aplicația pur și simplu nu îl atinge direct.

9.14.1. Un client cu asyncio

asyncio.open_connection() este echivalentul asyncio al lui socket.socket.connect(). Deschide o conexiune TCP și returnează două obiecte stream: un reader și un writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Trei aspecte de reținut:

  • Stabilirea conexiunii este un singur await în loc de un apel blocant. În timp ce handshake-ul este în desfășurare, bucla de evenimente este liberă să ruleze alte corutine.

  • write() introduce octeți într-un tampon de ieșire; drain() este apelul await care cedează controlul buclei până când acei octeți au fost efectiv trimiși prin rețea.

  • readline() citește octeți până când sosește un caracter de linie nouă. Clasa stream include și read() (citește până la N octeți) și readexactly() (citește exact N octeți), care rezolvă problema delimitării mesajelor în TCP fără a fi nevoie să scrieți manual buclele de încadrare.

9.14.2. Un server cu asyncio

asyncio.start_server() este echivalentul asyncio al secvenței bind/listen/accept. Primește o funcție de retroapelare (callback) care va fi rulată o dată pentru fiecare conexiune de intrare, cu aceeași pereche reader/writer pe care o folosește partea de client:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Fiecare conexiune acceptată devine propria sarcină care rulează handle. Bucla de evenimente comută între ele în mod natural – un client lent nu poate bloca ceilalți clienți, deoarece în timp ce acesta așteaptă la await reader.read(...) bucla este liberă să avanseze pe fiecare altă conexiune. Adăugarea a zece clienți concurenți nu necesită zece fire de execuție; aceeași buclă de evenimente cu un singur fir le conduce pe toate.

Acesta este motivul practic pentru care aplicațiile de rețelistică pentru cameră scrise cu asyncio scalează mult mai bine decât codul blocant echivalent: imaginea serverului din Socket-uri TCP era de tip un-client-la-un-moment-dat; aceasta este de tip mulți-clienți-deodată, fără efort suplimentar.

9.14.3. Lucru concurent alături de rețelistică

Marele avantaj este combinarea rețelisticii cu restul activității camerei în aceeași buclă. Camera poate capta un cadru, poate rula procesarea imaginii și poate deservi un protocol de rețea, toate întrețesute:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() rulează cele două corutine pe aceeași buclă de evenimente. În timp ce camera doarme în sleep_ms() între cadre, serverul poate distribui traficul de rețea. În timp ce serverul așteaptă următorul octet, camera poate capta. Ambele avansează pe un singur fir MicroPython.

9.14.4. UDP cu asyncio

Modulul asyncio nu oferă aceleași stream-uri de nivel înalt pentru UDP – datagramele nu se potrivesc formei de citire/scriere a unui stream. Abordarea practică pe cameră este să plasați activitatea UDP în propria sa corutină, să comutați socketul în mod neblocant și să cedați controlul buclei de evenimente între încercările de citire:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Socketul este setat ca neblocant cu s.setblocking(False), astfel încât recvfrom() ridică imediat OSError atunci când nicio datagramă nu așteaptă, în loc să blocheze întreaga buclă de evenimente. Apelul await asyncio.sleep_ms(10) din ramura goală predă controlul înapoi buclei de evenimente până la următoarea verificare.

Trimiterea urmează aceeași formă: sendto() pe un socket neblocant fie reușește imediat, fie ridică o excepție. Nu există sendallto – datagramele UDP sunt atomice, așa că fiecare trimitere este o datagramă întreagă sau niciuna. Dacă tamponul de trimitere este plin, mișcarea corectă pentru UDP este de obicei să se renunțe la datagramă și să se lase următoarea să iasă la următoarea parcurgere a buclei:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Ramura de eșec este rară în practică. UDP nu are control de flux, așa că sendto() reușește aproape întotdeauna din prima încercare; ramura except există în principal pentru ca o scurtă perturbare a rețelei să nu blocheze corutina.

Secțiunea Asyncio acoperă modelele mai generale pentru combinarea I/O blocant într-un program asyncio; aceleași modele se aplică direct unui socket UDP.

9.14.5. Timeout-uri și anulare

Încapsularea unui apel de rețea în asyncio.wait_for() îi impune un termen-limită:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

O corutină care durează prea mult poate fi de asemenea anulată cu cancel() din altă parte. Ambele mecanisme sunt acoperite în detaliu în capitolul despre coordonare; ele se aplică neschimbat stream-urilor returnate de asyncio.open_connection() și asyncio.start_server().

Pentru referința completă Stream (clasa din spatele reader-elor și writer-elor, plus funcțiile ajutătoare folosite în treacăt pe această pagină), consultați asyncio — planificator de I/O asincron.