9.14. Gniazda z asyncio

Blokujące wywołanie recv() zamraża cały skrypt do czasu nadejścia bajtu. Blokujące wywołanie accept() obsługuje tylko jednego klienta naraz. Oba te przypadki to dokładnie ten rodzaj sytuacji „czekania na wejście/wyjście”, do której obsługi istnieje asyncio. Rozdział o asyncio omawia pętlę zdarzeń, korutyny i prymitywy synchronizacji; ta strona opisuje elementy charakterystyczne dla sieci.

Moduł asyncio udostępnia obsługę sieci poprzez niewielką liczbę pomocników, które przyjmują i zwracają strumienie – obiekty wysokiego poziomu opakowujące gniazdo i oferujące wersje odczytu i zapisu, na które można wykonać await. Bazowe gniazdo nadal istnieje; aplikacja po prostu nie dotyka go bezpośrednio.

9.14.1. Klient z asyncio

asyncio.open_connection() to asyncio-wy odpowiednik socket.socket.connect(). Otwiera połączenie TCP i zwraca dwa obiekty strumienia: reader i writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Trzy rzeczy warte odnotowania:

  • Konfiguracja połączenia to jedno await zamiast wywołania blokującego. Podczas gdy uzgadnianie jest w toku, pętla zdarzeń może swobodnie wykonywać inne korutyny.

  • write() umieszcza bajty w buforze wychodzącym; drain() to await, które oddaje sterowanie pętli, dopóki te bajty nie zostaną faktycznie wysłane przez sieć.

  • readline() czyta bajty do momentu nadejścia znaku nowej linii. Klasa strumienia zawiera również read() (czyta do N bajtów) oraz readexactly() (czyta dokładnie N bajtów), które rozwiązują problem granic wiadomości w TCP bez ręcznego pisania pętli ramkowania.

9.14.2. Serwer z asyncio

asyncio.start_server() to asyncio-wy odpowiednik sekwencji bind/listen/accept. Przyjmuje wywołanie zwrotne, które zostanie uruchomione raz na każde przychodzące połączenie, z tą samą parą reader/writer, której używa strona klienta:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Każde zaakceptowane połączenie staje się własnym zadaniem uruchamiającym handle. Pętla zdarzeń naturalnie przełącza się między nimi – jeden powolny klient nie może zablokować pozostałych, ponieważ gdy czeka on na await reader.read(...), pętla może swobodnie posuwać naprzód każde inne połączenie. Dodanie dziesięciu jednoczesnych klientów nie wymaga dziesięciu wątków; ta sama jednowątkowa pętla zdarzeń obsługuje je wszystkie.

To praktyczny powód, dla którego aplikacje sieciowe dla kamery napisane z użyciem asyncio skalują się znacznie lepiej niż równoważny kod blokujący: obraz serwera na Gniazda TCP obsługiwał jednego klienta naraz; ten obsługuje wielu klientów jednocześnie bez dodatkowego wysiłku.

9.14.3. Praca równoległa obok obsługi sieci

Największą korzyścią jest łączenie obsługi sieci z resztą pracy kamery w tej samej pętli. Kamera może przechwycić ramkę, wykonać przetwarzanie obrazu oraz obsłużyć protokół sieciowy, wszystko z przeplotem:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() uruchamia obie korutyny w tej samej pętli zdarzeń. Podczas gdy kamera śpi w sleep_ms() między ramkami, serwer może rozdzielać ruch sieciowy. Podczas gdy serwer czeka na kolejny bajt, kamera może przechwytywać. Oba posuwają się naprzód na jednym wątku MicroPython.

9.14.4. UDP z asyncio

Moduł asyncio nie oferuje tych samych wysokopoziomowych strumieni dla UDP – datagramy nie pasują do modelu odczytu/zapisu strumienia. Praktycznym podejściem na kamerze jest umieszczenie pracy z UDP we własnej korutynie, przełączenie gniazda w tryb nieblokujący i oddawanie sterowania pętli zdarzeń między próbami odczytu:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Gniazdo jest ustawione jako nieblokujące za pomocą s.setblocking(False), więc recvfrom() natychmiast zgłasza OSError, gdy nie czeka żaden datagram, zamiast blokować całą pętlę zdarzeń. await asyncio.sleep_ms(10) w pustej gałęzi oddaje sterowanie pętli zdarzeń aż do kolejnego odpytania.

Wysyłanie wygląda tak samo: sendto() na gnieździe nieblokującym albo od razu się powodzi, albo zgłasza wyjątek. Nie ma sendallto – datagramy UDP są atomowe, więc każde wysłanie to jeden cały datagram albo żaden. Jeśli bufor wysyłania jest pełny, właściwym posunięciem dla UDP jest zwykle porzucenie datagramu i wysłanie kolejnego przy następnym przejściu przez pętlę:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Gałąź niepowodzenia jest w praktyce rzadka. UDP nie ma kontroli przepływu, więc sendto() niemal zawsze powodzi się za pierwszym razem; klauzula except istnieje głównie po to, aby krótka usterka sieciowa nie spowodowała awarii korutyny.

Sekcja Asyncio omawia szersze wzorce łączenia blokującego wejścia/wyjścia z programem asyncio; te same wzorce stosują się bezpośrednio do gniazda UDP.

9.14.5. Limity czasu i anulowanie

Opakowanie wywołania sieciowego w asyncio.wait_for() nakłada na nie termin:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Korutyna, która trwa zbyt długo, może też zostać anulowana z zewnątrz za pomocą cancel(). Oba mechanizmy omówiono szczegółowo w rozdziale o koordynacji; stosują się one bez zmian do strumieni zwracanych przez asyncio.open_connection() i asyncio.start_server().

Pełny opis Stream (klasy stojącej za readerami i writerami oraz pomocników użytych mimochodem na tej stronie) znajdziesz w asyncio — asynchroniczny harmonogram operacji we/wy.