9.14. Socketek asyncióval¶
Egy blokkoló recv() hívás befagyasztja az egész szkriptet, amíg meg nem érkezik egy bájt. Egy blokkoló accept() hívás egyszerre csak egy klienst szolgál ki. Mindkettő pontosan az a fajta „várakozás I/O-ra” helyzet, amelynek kezelésére az asyncio létezik. Az asyncio fejezet lefedi az eseményhurkot, a korutinokat és a szinkronizációs primitíveket; ez az oldal a hálózatspecifikus részeket tárgyalja.
Az asyncio modul a hálózatkezelést néhány segédfüggvényen keresztül teszi elérhetővé, amelyek stream-eket vesznek át és adnak vissza – magas szintű objektumokat, amelyek egy socketet csomagolnak be, és az olvasás és írás await-elhető változatait kínálják. Az alapul szolgáló socket továbbra is ott van; az alkalmazás csak nem nyúl hozzá közvetlenül.
9.14.1. Egy kliens asyncióval¶
Az asyncio.open_connection() az socket.socket.connect() asyncio megfelelője. TCP-kapcsolatot nyit, és két stream objektumot ad vissza: egy reader-t és egy writer-t:
import asyncio
async def client():
reader, writer = await asyncio.open_connection("192.168.1.20", 9000)
writer.write(b"hello\n")
await writer.drain() # wait until bytes have been sent
reply = await reader.readline()
print("reply:", reply)
writer.close()
await writer.wait_closed()
asyncio.run(client())
Három dolgot érdemes megjegyezni:
A kapcsolat felépítése egyetlen
awaitegy blokkoló hívás helyett. Amíg a kézfogás folyamatban van, az eseményhurok szabadon futtathat más korutinokat.A
write()bájtokat helyez egy kimenő pufferbe; adrain()az azawait, amely átadja a vezérlést a huroknak, amíg ezek a bájtok ténylegesen ki nem mentek a hálózaton.A
readline()addig olvas bájtokat, amíg egy újsor nem érkezik. A stream osztály tartalmazza aread()(legfeljebb N bájt olvasása) és areadexactly()(pontosan N bájt olvasása) metódusokat is, amelyek megoldják a TCP üzenethatár-problémáját anélkül, hogy kézzel kellene megírni a keretező hurkokat.
9.14.2. Egy szerver asyncióval¶
Az asyncio.start_server() a bind/listen/accept tánc asyncio megfelelője. Egy visszahívást vesz át, amely beérkező kapcsolatonként egyszer fut le, ugyanazzal a reader/writer párral, amelyet a kliensoldal használ:
import asyncio
async def handle(reader, writer):
addr = writer.get_extra_info("peername")
print("connection from", addr)
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data) # echo back
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
print("listening on", server.sockets[0].getsockname())
async with server:
await server.serve_forever()
asyncio.run(main())
Minden elfogadott kapcsolat saját, handle függvényt futtató feladattá válik. Az eseményhurok természetesen váltogat közöttük – egy lassú kliens nem tudja blokkolni a többit, mert amíg az az await reader.read(...) hívásra vár, a hurok szabadon haladhat előre minden más kapcsolaton. Tíz egyidejű kliens hozzáadása nem igényel tíz szálat; ugyanaz az egyszálú eseményhurok hajtja mindet.
Ez a gyakorlati oka annak, hogy az asyncióra írt kamerás hálózati alkalmazások sokkal jobban skálázódnak, mint az ezzel egyenértékű blokkoló kód: a TCP socketek oldalon a szerverkép egyszerre egy klienses volt; ez egyszerre sok klienses, mindenféle többletmunka nélkül.
9.14.3. Párhuzamos munka a hálózatkezelés mellett¶
A nagy nyereség az, hogy a hálózatkezelést a kamera többi munkájával keverhetjük ugyanabban a hurokban. A kamera képkockát rögzíthet, képfeldolgozást futtathat, és kiszolgálhat egy hálózati protokollt, mindezt egymásba fűzve:
import asyncio
async def capture_loop():
while True:
img = await camera.snapshot()
# process img ...
await asyncio.sleep_ms(100)
async def handle(reader, writer):
...
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
await asyncio.gather(
server.serve_forever(),
capture_loop(),
)
asyncio.run(main())
Az asyncio.gather() a két korutint ugyanazon az eseményhurkon futtatja. Amíg a kamera a sleep_ms() hívásban alszik két képkocka között, a szerver dolgozhatja fel a hálózati forgalmat. Amíg a szerver a következő bájtra vár, a kamera rögzíthet. Mindkettő egyetlen MicroPython szálon halad előre.
9.14.4. UDP asyncióval¶
Az asyncio modul nem kínálja ugyanazokat a magas szintű streameket UDP-hez – a datagramok nem illeszkednek egy stream olvasás/írás formájába. A gyakorlati megközelítés a kamerán az, hogy az UDP-munkát saját korutinba tesszük, a socketet nem blokkoló módba kapcsoljuk, és az olvasási kísérletek között átadjuk a vezérlést az eseményhuroknak:
import asyncio
import socket
async def udp_listener(port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(("0.0.0.0", port))
while True:
try:
data, src = s.recvfrom(1024)
except OSError:
await asyncio.sleep_ms(10)
continue
print("got", data, "from", src)
A socketet az s.setblocking(False) állítja nem blokkolóra, így a recvfrom() azonnal OSError kivételt vált ki, amikor nincs datagram várakozóban, ahelyett hogy blokkolná az egész eseményhurkot. Az üres ágban lévő await asyncio.sleep_ms(10) visszaadja a vezérlést az eseményhuroknak a következő lekérdezésig.
A küldés ugyanezt a formát követi: a sendto() egy nem blokkoló socketen vagy azonnal sikerül, vagy kivételt vált ki. Nincs sendallto – az UDP-datagramok atomiak, így minden küldés egy teljes datagram, vagy egy sem. Ha a küldési puffer megtelt, UDP esetén általában az a helyes lépés, hogy eldobjuk a datagramot, és hagyjuk, hogy a következő a hurok következő körében menjen ki:
async def udp_telemetry(target_addr, period_ms):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
while True:
payload = collect_telemetry()
try:
s.sendto(payload, target_addr)
except OSError:
pass # buffer full -- skip this one
await asyncio.sleep_ms(period_ms)
A hibázó ág a gyakorlatban ritka. Az UDP-nek nincs folyamszabályozása, így a sendto() szinte mindig sikerül az első próbálkozásra; az except többnyire azért létezik, hogy egy rövid hálózati zökkenő ne omlasztsa össze a korutint.
Az Asyncio szakasz a blokkoló I/O asyncio programba keverésének tágabb mintáit tárgyalja; ugyanezek a minták közvetlenül alkalmazhatók egy UDP socketre.
9.14.5. Időtúllépések és megszakítás¶
Egy hálózati hívás asyncio.wait_for() függvénybe csomagolása határidőt szab rá:
try:
reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
print("server is slow")
Egy túl sokáig tartó korutint máshonnan is cancel()-elni lehet. Mindkét mechanizmust részletesen tárgyalja a koordinációs fejezet; változatlanul alkalmazhatók az asyncio.open_connection() és az asyncio.start_server() által visszaadott streamekre.
A teljes Stream referenciáért (a readerek és writerek mögött álló osztályért, valamint az ezen az oldalon mellékesen használt segédfüggvényekért) lásd: asyncio — aszinkron I/O ütemező.