9.14. Sockety s asyncio

Blokující volání recv() zmrazí celý skript, dokud nedorazí byte. Blokující volání accept() obsluhuje vždy jen jednoho klienta. Obě tyto situace jsou přesně tím druhem „čekání na I/O“, k jehož řešení asyncio existuje. Kapitola o asyncio pokrývá smyčku událostí, korutiny a synchronizační primitiva; tato stránka pokrývá části specifické pro síť.

Modul asyncio zpřístupňuje síťování prostřednictvím malého počtu pomocníků, kteří přijímají a vracejí streamy – objekty vysoké úrovně, které obalují socket a nabízejí await-ovatelné verze čtení a zápisu. Podkladový socket tam stále je; aplikace se ho jen přímo nedotýká.

9.14.1. Klient s asyncio

asyncio.open_connection() je protějškem socket.socket.connect() v asyncio. Otevře TCP spojení a vrátí dva objekty streamu: reader a writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Tři věci, kterých si všimnout:

  • Navázání spojení je jediné await místo blokujícího volání. Zatímco handshake probíhá, smyčka událostí může spouštět jiné korutiny.

  • write() vkládá byty do odchozího bufferu; drain() je ono await, které předá řízení smyčce, dokud tyto byty nejsou skutečně odeslány po síti.

  • readline() čte byty, dokud nedorazí znak nového řádku. Třída streamu zahrnuje také read() (přečte až N bytů) a readexactly() (přečte přesně N bytů), které řeší problém hranic zpráv v TCP, aniž byste museli rámovací smyčky psát ručně.

9.14.2. Server s asyncio

asyncio.start_server() je protějškem tance bind/listen/accept v asyncio. Přijímá callback, který se spustí jednou pro každé příchozí spojení, se stejnou dvojicí reader/writer, jakou používá klientská strana:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Každé přijaté spojení se stává vlastní úlohou spouštějící handle. Smyčka událostí mezi nimi přirozeně přepíná – jeden pomalý klient nemůže blokovat ostatní, protože zatímco čeká na await reader.read(...), smyčka může pokračovat v práci na každém dalším spojení. Přidání deseti souběžných klientů nevyžaduje deset vláken; všechny pohání tatáž jednovláknová smyčka událostí.

To je praktický důvod, proč se kamerové síťové aplikace napsané pro asyncio škálují mnohem lépe než ekvivalentní blokující kód: obrázek serveru na TCP sokety zvládal jednoho klienta naráz; tento zvládá mnoho klientů naráz bez jakéhokoli úsilí navíc.

9.14.3. Souběžná práce vedle síťování

Velkou výhodou je míchání síťování se zbytkem práce kamery ve stejné smyčce. Kamera může pořizovat snímek, provádět zpracování obrazu a obsluhovat síťový protokol, vše prokládaně:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() spouští obě korutiny na stejné smyčce událostí. Zatímco kamera spí v sleep_ms() mezi snímky, server dostává prostor odbavovat síťový provoz. Zatímco server čeká na další byte, kamera dostává prostor pořizovat snímek. Oba postupují vpřed na jediném vláknu MicroPythonu.

9.14.4. UDP s asyncio

Modul asyncio nenabízí stejné streamy vysoké úrovně pro UDP – datagramy se nehodí do podoby čtení/zápisu streamu. Praktický přístup na kameře je umístit práci s UDP do vlastní korutiny, přepnout socket do neblokujícího režimu a mezi pokusy o čtení předávat řízení smyčce událostí:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Socket je nastaven jako neblokující pomocí s.setblocking(False), takže recvfrom() vyvolá OSError okamžitě, když žádný datagram nečeká, místo aby blokoval celou smyčku událostí. await asyncio.sleep_ms(10) v prázdné větvi předá řízení zpět smyčce událostí až do dalšího dotazu.

Odesílání má stejnou podobu: sendto() na neblokujícím socketu buď okamžitě uspěje, nebo vyvolá výjimku. Neexistuje žádné sendallto – UDP datagramy jsou atomické, takže každé odeslání je buď celý datagram, nebo žádný. Pokud je odesílací buffer plný, správným krokem u UDP je obvykle datagram zahodit a nechat ten další odejít při příštím průchodu smyčkou:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Selhávající větev je v praxi vzácná. UDP nemá řízení toku, takže sendto() téměř vždy uspěje na první pokus; except existuje hlavně proto, aby krátkodobá síťová porucha korutinu neshodila.

Sekce Asyncio pokrývá širší vzory pro míchání blokujícího I/O do programu s asyncio; stejné vzory platí přímo i pro UDP socket.

9.14.5. Časové limity a zrušení

Obalení síťového volání do asyncio.wait_for() mu nastaví časový limit:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Korutinu, která trvá příliš dlouho, lze také cancel()-novat odjinud. Oba mechanismy jsou podrobně popsány v kapitole o koordinaci; platí beze změny pro streamy vracené asyncio.open_connection() a asyncio.start_server().

Úplnou referenci Stream (třída za readery a writery plus pomocníci, které tato stránka mimochodem použila) najdete v asyncio — asynchronní I/O plánovač.