9.14. Sockets mit asyncio

Ein blockierender recv()-Aufruf friert das gesamte Skript ein, bis ein Byte eintrifft. Ein blockierender accept()-Aufruf bedient immer nur einen Client gleichzeitig. Beide sind genau die Art von „Warten auf I/O“-Situation, für deren Bewältigung asyncio existiert. Das asyncio-Kapitel behandelt die Ereignisschleife, Coroutinen und die Synchronisierungsprimitive; diese Seite behandelt die netzwerkspezifischen Teile.

Das asyncio-Modul stellt Netzwerkfunktionalität über eine kleine Anzahl von Hilfsfunktionen bereit, die Streams entgegennehmen und zurückgeben – hochstufige Objekte, die einen Socket umschließen und await-bare Versionen von Lesen und Schreiben bieten. Der zugrunde liegende Socket ist weiterhin vorhanden; die Anwendung greift nur nicht direkt auf ihn zu.

9.14.1. Ein Client mit asyncio

asyncio.open_connection() ist das asyncio-Gegenstück zu socket.socket.connect(). Es öffnet eine TCP-Verbindung und gibt zwei Stream-Objekte zurück: einen Reader und einen Writer:

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Drei Dinge sind zu beachten:

  • Der Verbindungsaufbau ist ein einziges await statt eines blockierenden Aufrufs. Während der Handshake unterwegs ist, kann die Ereignisschleife andere Coroutinen ausführen.

  • write() legt Bytes in einen ausgehenden Puffer; drain() ist das await, das an die Schleife zurückgibt, bis diese Bytes tatsächlich über das Netzwerk gesendet wurden.

  • readline() liest Bytes, bis ein Zeilenumbruch eintrifft. Die Stream-Klasse enthält außerdem read() (liest bis zu N Bytes) und readexactly() (liest genau N Bytes), die TCPs Problem der Nachrichtengrenzen lösen, ohne die Framing-Schleifen von Hand zu schreiben.

9.14.2. Ein Server mit asyncio

asyncio.start_server() ist das asyncio-Gegenstück zum bind/listen/accept-Tanz. Es nimmt ein Callback entgegen, das einmal pro eingehender Verbindung ausgeführt wird, mit demselben Reader/Writer-Paar, das die Client-Seite verwendet:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Jede angenommene Verbindung wird zu einer eigenen Task, die handle ausführt. Die Ereignisschleife verteilt zwischen ihnen ganz natürlich – ein langsamer Client kann die anderen nicht blockieren, denn während er auf await reader.read(...) wartet, kann die Schleife bei jeder anderen Verbindung Fortschritte machen. Zehn gleichzeitige Clients hinzuzufügen erfordert nicht zehn Threads; dieselbe single-threaded Ereignisschleife treibt sie alle an.

Das ist der praktische Grund, warum für asyncio geschriebene Kamera-Netzwerkanwendungen so viel besser skalieren als der entsprechende blockierende Code: Das Server-Bild auf TCP-Sockets war ein-Client-nach-dem-anderen; dieses hier ist viele-Clients-auf-einmal ohne zusätzlichen Aufwand.

9.14.3. Gleichzeitige Arbeit neben dem Netzwerk

Der große Gewinn ist die Verbindung von Netzwerk mit dem Rest der Kamera-Arbeit in derselben Schleife. Die Kamera kann ein Einzelbild aufnehmen, Bildverarbeitung ausführen und ein Netzwerkprotokoll bedienen, alles verschachtelt:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() führt die beiden Coroutinen auf derselben Ereignisschleife aus. Während die Kamera in sleep_ms() zwischen Einzelbildern schläft, darf der Server Netzwerkverkehr verteilen. Während der Server auf das nächste Byte wartet, darf die Kamera aufnehmen. Beide machen Fortschritte auf einem einzigen MicroPython-Thread.

9.14.4. UDP mit asyncio

Das asyncio-Modul bietet für UDP nicht dieselben hochstufigen Streams – Datagramme passen nicht in die Lese/Schreib-Form eines Streams. Der praktische Ansatz auf der Kamera besteht darin, die UDP-Arbeit in eine eigene Coroutine zu legen, den Socket in den nicht-blockierenden Modus zu schalten und zwischen Leseversuchen an die Ereignisschleife zurückzugeben:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Der Socket wird mit s.setblocking(False) auf nicht-blockierend gesetzt, sodass recvfrom() sofort OSError auslöst, wenn kein Datagramm wartet, anstatt die gesamte Ereignisschleife zu blockieren. Das await asyncio.sleep_ms(10) im leeren Zweig gibt die Kontrolle bis zur nächsten Abfrage an die Ereignisschleife zurück.

Das Senden folgt derselben Form: sendto() auf einem nicht-blockierenden Socket gelingt entweder sofort oder löst aus. Es gibt kein sendallto – UDP-Datagramme sind atomar, sodass jeder Sendevorgang ein ganzes Datagramm oder keines ist. Wenn der Sendepuffer voll ist, besteht der richtige Schritt bei UDP meist darin, das Datagramm zu verwerfen und das nächste beim nächsten Schleifendurchlauf hinauszulassen:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Der fehlschlagende Zweig ist in der Praxis selten. UDP hat keine Flusssteuerung, sodass sendto() fast immer beim ersten Versuch gelingt; das except existiert hauptsächlich, damit ein kurzer Netzwerkaussetzer die Coroutine nicht zum Absturz bringt.

Der Abschnitt Asyncio behandelt die breiteren Muster zum Einbinden von blockierendem I/O in ein asyncio-Programm; dieselben Muster gelten direkt für einen UDP-Socket.

9.14.5. Timeouts und Abbruch

Das Einwickeln eines Netzwerkaufrufs in asyncio.wait_for() setzt ihm eine Frist:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Eine Coroutine, die zu lange dauert, kann auch von anderer Stelle mit cancel() abgebrochen werden. Beide Mechanismen werden im Koordinations-Kapitel ausführlich behandelt; sie gelten unverändert für Streams, die von asyncio.open_connection() und asyncio.start_server() zurückgegeben werden.

Für die vollständige Stream-Referenz (die Klasse hinter den Readern und Writern sowie die auf dieser Seite beiläufig verwendeten Hilfsfunktionen) siehe asyncio — asynchroner I/O-Scheduler.