9.14. Socketek asyncióval

Egy blokkoló recv() hívás befagyasztja az egész szkriptet, amíg meg nem érkezik egy bájt. Egy blokkoló accept() hívás egyszerre csak egy klienst szolgál ki. Mindkettő pontosan az a fajta „várakozás I/O-ra” helyzet, amelynek kezelésére az asyncio létezik. Az asyncio fejezet lefedi az eseményhurkot, a korutinokat és a szinkronizációs primitíveket; ez az oldal a hálózatspecifikus részeket tárgyalja.

Az asyncio modul a hálózatkezelést néhány segédfüggvényen keresztül teszi elérhetővé, amelyek stream-eket vesznek át és adnak vissza – magas szintű objektumokat, amelyek egy socketet csomagolnak be, és az olvasás és írás await-elhető változatait kínálják. Az alapul szolgáló socket továbbra is ott van; az alkalmazás csak nem nyúl hozzá közvetlenül.

9.14.1. Egy kliens asyncióval

Az asyncio.open_connection() az socket.socket.connect() asyncio megfelelője. TCP-kapcsolatot nyit, és két stream objektumot ad vissza: egy reader-t és egy writer-t:

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Három dolgot érdemes megjegyezni:

  • A kapcsolat felépítése egyetlen await egy blokkoló hívás helyett. Amíg a kézfogás folyamatban van, az eseményhurok szabadon futtathat más korutinokat.

  • A write() bájtokat helyez egy kimenő pufferbe; a drain() az az await, amely átadja a vezérlést a huroknak, amíg ezek a bájtok ténylegesen ki nem mentek a hálózaton.

  • A readline() addig olvas bájtokat, amíg egy újsor nem érkezik. A stream osztály tartalmazza a read() (legfeljebb N bájt olvasása) és a readexactly() (pontosan N bájt olvasása) metódusokat is, amelyek megoldják a TCP üzenethatár-problémáját anélkül, hogy kézzel kellene megírni a keretező hurkokat.

9.14.2. Egy szerver asyncióval

Az asyncio.start_server() a bind/listen/accept tánc asyncio megfelelője. Egy visszahívást vesz át, amely beérkező kapcsolatonként egyszer fut le, ugyanazzal a reader/writer párral, amelyet a kliensoldal használ:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Minden elfogadott kapcsolat saját, handle függvényt futtató feladattá válik. Az eseményhurok természetesen váltogat közöttük – egy lassú kliens nem tudja blokkolni a többit, mert amíg az az await reader.read(...) hívásra vár, a hurok szabadon haladhat előre minden más kapcsolaton. Tíz egyidejű kliens hozzáadása nem igényel tíz szálat; ugyanaz az egyszálú eseményhurok hajtja mindet.

Ez a gyakorlati oka annak, hogy az asyncióra írt kamerás hálózati alkalmazások sokkal jobban skálázódnak, mint az ezzel egyenértékű blokkoló kód: a TCP socketek oldalon a szerverkép egyszerre egy klienses volt; ez egyszerre sok klienses, mindenféle többletmunka nélkül.

9.14.3. Párhuzamos munka a hálózatkezelés mellett

A nagy nyereség az, hogy a hálózatkezelést a kamera többi munkájával keverhetjük ugyanabban a hurokban. A kamera képkockát rögzíthet, képfeldolgozást futtathat, és kiszolgálhat egy hálózati protokollt, mindezt egymásba fűzve:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

Az asyncio.gather() a két korutint ugyanazon az eseményhurkon futtatja. Amíg a kamera a sleep_ms() hívásban alszik két képkocka között, a szerver dolgozhatja fel a hálózati forgalmat. Amíg a szerver a következő bájtra vár, a kamera rögzíthet. Mindkettő egyetlen MicroPython szálon halad előre.

9.14.4. UDP asyncióval

Az asyncio modul nem kínálja ugyanazokat a magas szintű streameket UDP-hez – a datagramok nem illeszkednek egy stream olvasás/írás formájába. A gyakorlati megközelítés a kamerán az, hogy az UDP-munkát saját korutinba tesszük, a socketet nem blokkoló módba kapcsoljuk, és az olvasási kísérletek között átadjuk a vezérlést az eseményhuroknak:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

A socketet az s.setblocking(False) állítja nem blokkolóra, így a recvfrom() azonnal OSError kivételt vált ki, amikor nincs datagram várakozóban, ahelyett hogy blokkolná az egész eseményhurkot. Az üres ágban lévő await asyncio.sleep_ms(10) visszaadja a vezérlést az eseményhuroknak a következő lekérdezésig.

A küldés ugyanezt a formát követi: a sendto() egy nem blokkoló socketen vagy azonnal sikerül, vagy kivételt vált ki. Nincs sendallto – az UDP-datagramok atomiak, így minden küldés egy teljes datagram, vagy egy sem. Ha a küldési puffer megtelt, UDP esetén általában az a helyes lépés, hogy eldobjuk a datagramot, és hagyjuk, hogy a következő a hurok következő körében menjen ki:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

A hibázó ág a gyakorlatban ritka. Az UDP-nek nincs folyamszabályozása, így a sendto() szinte mindig sikerül az első próbálkozásra; az except többnyire azért létezik, hogy egy rövid hálózati zökkenő ne omlasztsa össze a korutint.

Az Asyncio szakasz a blokkoló I/O asyncio programba keverésének tágabb mintáit tárgyalja; ugyanezek a minták közvetlenül alkalmazhatók egy UDP socketre.

9.14.5. Időtúllépések és megszakítás

Egy hálózati hívás asyncio.wait_for() függvénybe csomagolása határidőt szab rá:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Egy túl sokáig tartó korutint máshonnan is cancel()-elni lehet. Mindkét mechanizmust részletesen tárgyalja a koordinációs fejezet; változatlanul alkalmazhatók az asyncio.open_connection() és az asyncio.start_server() által visszaadott streamekre.

A teljes Stream referenciáért (a readerek és writerek mögött álló osztályért, valamint az ezen az oldalon mellékesen használt segédfüggvényekért) lásd: asyncio — aszinkron I/O ütemező.