9.14. Utičnice s asyncio

Blokirajući poziv recv() zamrzava cijelu skriptu dok ne stigne bajt. Blokirajući poziv accept() poslužuje samo jednog klijenta odjednom. Oboje je upravo onaj tip situacije „čekanja na U/I” za koju asyncio i postoji. Poglavlje o asyncio pokriva petlju događaja, korutine i primitive sinkronizacije; ova stranica pokriva dijelove specifične za mreže.

Modul asyncio izlaže umrežavanje kroz mali broj pomoćnih funkcija koje primaju i vraćaju tokove (streams) – objekte visoke razine koji omataju utičnicu i nude await-abilne verzije čitanja i pisanja. Temeljna utičnica i dalje je tu; aplikacija je samo ne dira izravno.

9.14.1. Klijent s asyncio

asyncio.open_connection() je asyncio pandan za socket.socket.connect(). Otvara TCP vezu i vraća dva objekta toka: čitač i pisač

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Tri stvari koje treba primijetiti:

  • Postavljanje veze jedan je await umjesto blokirajućeg poziva. Dok je rukovanje u tijeku, petlja događaja slobodna je izvoditi druge korutine.

  • write() stavlja bajtove u odlazni međuspremnik; drain() je await koji prepušta kontrolu petlji dok ti bajtovi zaista ne budu poslani preko mreže.

  • readline() čita bajtove dok ne stigne znak novog retka. Klasa toka uključuje i read() (čita do N bajtova) i readexactly() (čita točno N bajtova), koje rješavaju TCP-ov problem granica poruka bez ručnog pisanja petlji za uokvirivanje.

9.14.2. Poslužitelj s asyncio

asyncio.start_server() je asyncio pandan plesu bind/listen/accept. Prima povratni poziv koji će se izvesti jednom po dolaznoj vezi, s istim parom čitač/pisač koji koristi klijentska strana:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Svaka prihvaćena veza postaje vlastiti zadatak koji izvodi handle. Petlja događaja prirodno raspoređuje između njih – jedan spori klijent ne može blokirati ostale, jer dok čeka na await reader.read(...) petlja je slobodna napredovati na svakoj drugoj vezi. Dodavanje deset istovremenih klijenata ne zahtijeva deset dretvi; ista jednodretvena petlja događaja pokreće ih sve.

Ovo je praktičan razlog zašto se aplikacije za umrežavanje kamera pisane za asyncio skaliraju mnogo bolje od ekvivalentnog blokirajućeg koda: slika poslužitelja na TCP utičnice bila je jedan-klijent-odjednom; ova je mnogo-klijenata-istovremeno bez dodatnog napora.

9.14.3. Istovremeni rad uz umrežavanje

Velika dobit je miješanje umrežavanja s ostatkom posla kamere u istoj petlji. Kamera može snimiti sličicu, obraditi sliku, i posluživati mrežni protokol, sve isprepleteno:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() izvodi dvije korutine na istoj petlji događaja. Dok kamera spava u sleep_ms() između sličica, poslužitelj može raspoređivati mrežni promet. Dok poslužitelj čeka sljedeći bajt, kamera može snimati. Oboje napreduje na jednoj MicroPython dretvi.

9.14.4. UDP s asyncio

Modul asyncio ne nudi iste tokove visoke razine za UDP – datagrami ne odgovaraju obliku čitanja/pisanja toka. Praktičan pristup na kameri jest staviti UDP posao u vlastitu korutinu, prebaciti utičnicu u neblokirajući način rada i prepustiti kontrolu petlji događaja između pokušaja čitanja:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Utičnica se postavlja kao neblokirajuća pomoću s.setblocking(False), pa recvfrom() odmah baca OSError kad nijedan datagram ne čeka umjesto da blokira cijelu petlju događaja. await asyncio.sleep_ms(10) u praznoj grani vraća kontrolu petlji događaja do sljedeće provjere.

Slanje slijedi isti oblik: sendto() na neblokirajućoj utičnici ili odmah uspijeva ili baca iznimku. Ne postoji sendallto – UDP datagrami su atomski, pa je svako slanje jedan cijeli datagram ili nijedan. Ako je međuspremnik za slanje pun, ispravan potez za UDP obično je odbaciti datagram i pustiti sljedeći da izađe pri sljedećem prolazu kroz petlju:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Grana s neuspjehom rijetka je u praksi. UDP nema kontrolu protoka, pa sendto() gotovo uvijek uspijeva iz prvog pokušaja; except uglavnom postoji kako kratak mrežni zastoj ne bi srušio korutinu.

Odjeljak Asyncio pokriva šire obrasce za miješanje blokirajućeg U/I u asyncio program; isti obrasci izravno se primjenjuju na UDP utičnicu.

9.14.5. Vremenska ograničenja i otkazivanje

Omatanje mrežnog poziva u asyncio.wait_for() postavlja mu rok:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Korutina koja predugo traje također može biti otkazana (cancel()) s drugog mjesta. Oba mehanizma detaljno su obrađena u poglavlju o koordinaciji; primjenjuju se nepromijenjeno na tokove koje vraćaju asyncio.open_connection() i asyncio.start_server().

Za potpunu referencu klase Stream (klase iza čitača i pisača, uz pomoćne funkcije koje je ova stranica usput koristila), pogledajte asyncio — asinkroni I/O planer.