9.14. Sockets met asyncio

Een blokkerende recv()-aanroep bevriest het hele script totdat er een byte arriveert. Een blokkerende accept()-aanroep bedient maar één client tegelijk. Beide zijn precies het soort “wachten op I/O”-situatie waar asyncio voor bestaat. Het asyncio-hoofdstuk behandelt de event loop, coroutines en de synchronisatieprimitieven; deze pagina behandelt de netwerkspecifieke onderdelen.

De asyncio-module stelt netwerken beschikbaar via een klein aantal hulpfuncties die streams aannemen en teruggeven – objecten op hoog niveau die een socket omhullen en await-bare versies van lezen en schrijven bieden. De onderliggende socket is er nog steeds; de toepassing raakt hem alleen niet rechtstreeks aan.

9.14.1. Een client met asyncio

asyncio.open_connection() is de asyncio-tegenhanger van socket.socket.connect(). Het opent een TCP-verbinding en geeft twee streamobjecten terug: een reader en een writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Drie dingen om op te merken:

  • Het opzetten van de verbinding is één await in plaats van een blokkerende aanroep. Terwijl de handshake gaande is, is de event loop vrij om andere coroutines uit te voeren.

  • write() zet bytes in een uitgaande buffer; drain() is de await die de controle teruggeeft aan de loop totdat die bytes ook werkelijk over het netwerk zijn verzonden.

  • readline() leest bytes totdat er een newline arriveert. De streamklasse bevat ook read() (lees tot N bytes) en readexactly() (lees precies N bytes), die het probleem van TCP’s berichtgrenzen oplossen zonder dat je de framinglussen met de hand schrijft.

9.14.2. Een server met asyncio

asyncio.start_server() is de asyncio-tegenhanger van de bind/listen/accept-dans. Het neemt een callback aan die één keer per binnenkomende verbinding wordt uitgevoerd, met hetzelfde reader/writer-paar dat de clientzijde gebruikt:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Elke geaccepteerde verbinding wordt zijn eigen taak die handle uitvoert. De event loop wisselt er op natuurlijke wijze tussen – één trage client kan de andere niet blokkeren, want terwijl die op await reader.read(...) wacht, is de loop vrij om voortgang te boeken op elke andere verbinding. Tien gelijktijdige clients toevoegen vereist geen tien threads; dezelfde single-threaded event loop drijft ze allemaal aan.

Dit is de praktische reden waarom voor asyncio geschreven netwerktoepassingen voor de camera zoveel beter schalen dan de equivalente blokkerende code: het serverbeeld op TCP-sockets was één-client-tegelijk; dit is veel-clients-tegelijk zonder extra moeite.

9.14.3. Gelijktijdig werk naast netwerken

De grote opbrengst is het combineren van netwerken met de rest van het werk van de camera in dezelfde loop. De camera kan een frame vastleggen, beeldverwerking uitvoeren én een netwerkprotocol bedienen, allemaal door elkaar heen:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() voert de twee coroutines uit op dezelfde event loop. Terwijl de camera in sleep_ms() tussen frames slaapt, krijgt de server de kans om netwerkverkeer af te handelen. Terwijl de server op de volgende byte wacht, krijgt de camera de kans om vast te leggen. Beide boeken voortgang op één enkele MicroPython-thread.

9.14.4. UDP met asyncio

De asyncio-module biedt niet dezelfde streams op hoog niveau voor UDP – datagrammen passen niet in de lees/schrijf-vorm van een stream. De praktische aanpak op de camera is om het UDP-werk in een eigen coroutine te plaatsen, de socket in niet-blokkerende modus te zetten, en tussen leespogingen de controle terug te geven aan de event loop:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

De socket wordt niet-blokkerend gemaakt met s.setblocking(False), zodat recvfrom() onmiddellijk een OSError opwerpt wanneer er geen datagram wacht, in plaats van de hele event loop te blokkeren. De await asyncio.sleep_ms(10) in de lege tak geeft de controle terug aan de event loop tot de volgende poll.

Verzenden volgt dezelfde vorm: sendto() op een niet-blokkerende socket slaagt ofwel onmiddellijk of werpt een fout op. Er is geen sendallto – UDP-datagrammen zijn atomair, dus elke verzending is één heel datagram of helemaal niets. Als de verzendbuffer vol is, is de juiste keuze voor UDP meestal om het datagram te laten vallen en het volgende de volgende keer door de loop te laten gaan:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

De mislukkende tak is in de praktijk zeldzaam. UDP heeft geen flow control, dus sendto() slaagt vrijwel altijd bij de eerste poging; de except bestaat vooral zodat een korte netwerkhapering de coroutine niet laat crashen.

Het onderdeel Asyncio behandelt de bredere patronen voor het mengen van blokkerende I/O in een asyncio-programma; dezelfde patronen zijn rechtstreeks van toepassing op een UDP-socket.

9.14.5. Time-outs en annulering

Door een netwerkaanroep in asyncio.wait_for() te verpakken, stel je er een deadline op in:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Een coroutine die te lang duurt kan ook van elders worden gecancel()-d. Beide mechanismen worden gedetailleerd behandeld in het hoofdstuk coördinatie; ze zijn ongewijzigd van toepassing op streams die door asyncio.open_connection() en asyncio.start_server() worden teruggegeven.

Voor de volledige Stream-referentie (de klasse achter de readers en writers, plus de hulpfuncties die deze pagina terloops gebruikte), zie asyncio — asynchrone I/O-scheduler.