9.14. Сокети з asyncio

Блокуючий виклик recv() заморожує весь скрипт до надходження байта. Блокуючий виклик accept() обслуговує лише одного клієнта за раз. Обидва випадки – це саме ті ситуації «очікування на I/O», для яких існує asyncio. Глава asyncio охоплює цикл подій, корутини та примітиви синхронізації; ця сторінка охоплює специфічні для мережі частини.

Модуль asyncio надає мережеву функціональність через невелику кількість допоміжних функцій, які приймають і повертають потоки – високорівневі об’єкти, що огортають сокет і пропонують await-здатні версії читання та запису. Базовий сокет залишається; застосунок просто не звертається до нього безпосередньо.

9.14.1. Клієнт з asyncio

asyncio.open_connection() – аналог socket.socket.connect() у asyncio. Він відкриває TCP-з’єднання і повертає два об’єкти потоку: reader (зчитувач) і writer (записувач):

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Три речі, на які варто звернути увагу:

  • Налаштування з’єднання – це один await замість блокуючого виклику. Поки відбувається рукостискання, цикл подій може виконувати інші корутини.

  • write() розміщує байти у вихідному буфері; drain() – це await, який передає управління циклу, доки байти не будуть фактично надіслані через мережу.

  • readline() зчитує байти до появи символу нового рядка. Клас потоку також містить read() (зчитати до N байт) і readexactly() (зчитати рівно N байт), які вирішують проблему меж повідомлень TCP без написання циклів кадрування вручну.

9.14.2. Сервер з asyncio

asyncio.start_server() – аналог послідовності bind/listen/accept у asyncio. Він приймає зворотний виклик, який буде запускатися для кожного вхідного з’єднання, з тією самою парою reader/writer, що використовується на стороні клієнта:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Кожне прийняте з’єднання стає власним завданням, що виконує handle. Цикл подій природно перемикається між ними – повільний клієнт не може заблокувати інших, оскільки поки він очікує на await reader.read(...), цикл вільний просувати кожне інше з’єднання. Додавання десяти паралельних клієнтів не вимагає десяти потоків; той самий однопоточний цикл подій керує ними всіма.

Це практична причина, чому мережеві застосунки для камери, написані для asyncio, масштабуються значно краще, ніж еквівалентний блокуючий код: приклад сервера на сторінці TCP-сокети обслуговував одного клієнта за раз; цей – багатьох клієнтів одночасно без додаткових зусиль.

9.14.3. Паралельна робота поряд з мережею

Головна перевага – поєднання мережевої роботи з іншою роботою камери в одному циклі. Камера може захоплювати кадр, виконувати обробку зображення і обслуговувати мережевий протокол – все перемежовано:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() запускає дві корутини в одному циклі подій. Поки камера спить в sleep_ms() між кадрами, сервер може обробляти мережевий трафік. Поки сервер очікує наступного байта, камера може робити знімок. Обидва просуваються в одному потоці MicroPython.

9.14.4. UDP з asyncio

Модуль asyncio не пропонує такі ж високорівневі потоки для UDP – датаграми не вписуються в модель читання/запису потоку. Практичний підхід на камері полягає в тому, щоб помістити UDP-роботу у власну корутину, переключити сокет у неблокуючий режим і передавати управління циклу подій між спробами читання:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Сокет встановлюється неблокуючим за допомогою s.setblocking(False), тому recvfrom() негайно викидає OSError, коли датаграма не очікує, замість того щоб блокувати весь цикл подій. await asyncio.sleep_ms(10) в порожній гілці передає управління циклу подій до наступного опитування.

Надсилання відбувається аналогічно: sendto() на неблокуючому сокеті або відразу успішно завершується, або викидає виняток. Немає sendallto – UDP-датаграми атомарні, тому кожне надсилання – це або ціла датаграма, або нічого. Якщо буфер надсилання заповнений, правильним рішенням для UDP зазвичай є відкинути датаграму і надіслати наступну в наступному проході циклу:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Гілка з помилкою рідко спрацьовує на практиці. UDP не має контролю потоку, тому sendto() майже завжди успішно виконується з першої спроби; except здебільшого потрібен для того, щоб короткий мережевий збій не аварійно завершив корутину.

Розділ Asyncio охоплює ширші шаблони для змішування блокуючого I/O з asyncio-програмою; ті самі шаблони застосовуються безпосередньо до UDP-сокета.

9.14.5. Тайм-аути та скасування

Обгортання мережевого виклику в asyncio.wait_for() встановлює для нього дедлайн:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Корутину, що виконується надто довго, також можна скасувати (cancel()) ззовні. Обидва механізми детально описані в розділі координації; вони застосовуються без змін до потоків, повернутих asyncio.open_connection() і asyncio.start_server().

Повний довідник Stream (клас, що стоїть за зчитувачами й записувачами, а також допоміжні функції, використані на цій сторінці), дивіться в asyncio — асинхронний планувальник вводу/виводу.