9.14. asyncio ile soketler

Engelleyen bir recv() çağrısı, bir bayt gelene kadar tüm betiği dondurur. Engelleyen bir accept() çağrısı aynı anda yalnızca bir istemciye hizmet eder. Bunların ikisi de tam olarak asyncio modülünün ele almak için var olduğu türden “G/Ç’yi bekle” durumlarıdır. asyncio bölümü olay döngüsünü, eşyordamları ve senkronizasyon ilkellerini kapsar; bu sayfa ağa özgü parçaları kapsar.

asyncio modülü, ağ iletişimini akışlar alan ve döndüren az sayıda yardımcı aracılığıyla sunar – bunlar bir soketi saran ve okuma ile yazmanın await edilebilir sürümlerini sunan üst düzey nesnelerdir. Alttaki soket hâlâ oradadır; uygulama yalnızca ona doğrudan dokunmaz.

9.14.1. asyncio ile bir istemci

asyncio.open_connection(), socket.socket.connect() çağrısının asyncio karşılığıdır. Bir TCP bağlantısı açar ve iki akış nesnesi döndürür: bir okuyucu ve bir yazıcı

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Dikkat edilecek üç şey:

  • Bağlantı kurulumu, engelleyen bir çağrı yerine tek bir await işlemidir. El sıkışma devam ederken, olay döngüsü başka eşyordamları çalıştırmakta serbesttir.

  • write(), baytları bir giden arabelleğe koyar; drain(), bu baytlar ağ üzerinden gerçekten gönderilene kadar döngüye denetimi bırakan await işlemidir.

  • readline(), bir satır sonu gelene kadar bayt okur. Akış sınıfı ayrıca read() (en fazla N bayt oku) ve readexactly() (tam olarak N bayt oku) yöntemlerini de içerir; bunlar çerçeveleme döngülerini elle yazmadan TCP’nin mesaj sınırı sorununu çözer.

9.14.2. asyncio ile bir sunucu

asyncio.start_server(), bind/listen/accept dansının asyncio karşılığıdır. Her gelen bağlantı için bir kez çalıştırılacak, istemci tarafının kullandığı aynı okuyucu/yazıcı çiftiyle çalışan bir geri çağırma alır:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Kabul edilen her bağlantı, handle işlevini çalıştıran kendi görevi olur. Olay döngüsü, bunlar arasında doğal olarak geçiş yapar – yavaş bir istemci diğerlerini engelleyemez, çünkü o await reader.read(...) ile beklerken döngü diğer her bağlantıda ilerleme kaydetmekte serbesttir. On eşzamanlı istemci eklemek on iş parçacığı gerektirmez; aynı tek iş parçacıklı olay döngüsü hepsini yürütür.

asyncio için yazılmış kamera ağ uygulamalarının eşdeğer engelleyen koddan çok daha iyi ölçeklenmesinin pratik nedeni budur: TCP soketleri üzerindeki sunucu görüntüsü aynı anda tek istemciyken; bu, fazladan hiçbir çaba olmadan aynı anda birçok istemcidir.

9.14.3. Ağ iletişimiyle birlikte eşzamanlı çalışma

En büyük kazanç, ağ iletişimini kameranın geri kalan işiyle aynı döngüde harmanlamaktır. Kamera bir çerçeve yakalayabilir, görüntü işleme çalıştırabilir ve bir ağ protokolüne hizmet edebilir; tümü iç içe geçmiş olarak:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather(), iki eşyordamı aynı olay döngüsünde çalıştırır. Kamera çerçeveler arasında sleep_ms() ile uyurken, sunucu ağ trafiğini yönlendirme fırsatı bulur. Sunucu bir sonraki baytı beklerken, kamera yakalama fırsatı bulur. Her ikisi de tek bir MicroPython iş parçacığında ilerleme kaydeder.

9.14.4. asyncio ile UDP

asyncio modülü, UDP için aynı üst düzey akışları sunmaz – datagramlar bir akışın okuma/yazma biçimine uymaz. Kamerada pratik yaklaşım, UDP işini kendi eşyordamına koymak, soketi engellemeyen moda geçirmek ve okuma denemeleri arasında olay döngüsüne denetimi bırakmaktır:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Soket, s.setblocking(False) ile engellemeyen yapılır, böylece recvfrom(), bekleyen bir datagram olmadığında tüm olay döngüsünü engellemek yerine hemen OSError fırlatır. Boş daldaki await asyncio.sleep_ms(10), bir sonraki yoklamaya kadar denetimi olay döngüsüne geri verir.

Gönderme aynı biçimi izler: engellemeyen bir soket üzerindeki sendto() ya hemen başarılı olur ya da fırlatır. sendallto yoktur – UDP datagramları atomiktir, dolayısıyla her gönderme ya bir bütün datagramdır ya da hiçbiridir. Gönderme arabelleği doluysa, UDP için doğru hamle genellikle datagramı düşürmek ve bir sonrakini döngünün bir sonraki turunda göndermektir:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Başarısız olan dal pratikte nadirdir. UDP’nin akış kontrolü yoktur, dolayısıyla sendto() neredeyse her zaman ilk denemede başarılı olur; except, çoğunlukla kısa bir ağ kesintisinin eşyordamı çökertmemesi için vardır.

Asyncio bölümü, engelleyen G/Ç’yi bir asyncio programına harmanlamanın daha geniş kalıplarını kapsar; aynı kalıplar doğrudan bir UDP soketine de uygulanır.

9.14.5. Zaman aşımları ve iptal

Bir ağ çağrısını asyncio.wait_for() ile sarmak ona bir süre sınırı koyar:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Çok uzun süren bir eşyordam, başka bir yerden de cancel() ile iptal edilebilir. Her iki mekanizma da koordinasyon bölümünde ayrıntılı olarak ele alınır; bunlar asyncio.open_connection() ve asyncio.start_server() tarafından döndürülen akışlara değişmeden uygulanır.

Tam Stream referansı için (okuyucuların ve yazıcıların arkasındaki sınıf, ayrıca bu sayfanın gelişigüzel kullandığı yardımcılar), bkz. asyncio — asenkron G/Ç zamanlayıcısı.