9.14. Сокети з asyncio¶
Блокуючий виклик recv() заморожує весь скрипт до надходження байта. Блокуючий виклик accept() обслуговує лише одного клієнта за раз. Обидва випадки – це саме ті ситуації «очікування на I/O», для яких існує asyncio. Глава asyncio охоплює цикл подій, корутини та примітиви синхронізації; ця сторінка охоплює специфічні для мережі частини.
Модуль asyncio надає мережеву функціональність через невелику кількість допоміжних функцій, які приймають і повертають потоки – високорівневі об’єкти, що огортають сокет і пропонують await-здатні версії читання та запису. Базовий сокет залишається; застосунок просто не звертається до нього безпосередньо.
9.14.1. Клієнт з asyncio¶
asyncio.open_connection() – аналог socket.socket.connect() у asyncio. Він відкриває TCP-з’єднання і повертає два об’єкти потоку: reader (зчитувач) і writer (записувач):
import asyncio
async def client():
reader, writer = await asyncio.open_connection("192.168.1.20", 9000)
writer.write(b"hello\n")
await writer.drain() # wait until bytes have been sent
reply = await reader.readline()
print("reply:", reply)
writer.close()
await writer.wait_closed()
asyncio.run(client())
Три речі, на які варто звернути увагу:
Налаштування з’єднання – це один
awaitзамість блокуючого виклику. Поки відбувається рукостискання, цикл подій може виконувати інші корутини.write()розміщує байти у вихідному буфері;drain()– цеawait, який передає управління циклу, доки байти не будуть фактично надіслані через мережу.readline()зчитує байти до появи символу нового рядка. Клас потоку також міститьread()(зчитати до N байт) іreadexactly()(зчитати рівно N байт), які вирішують проблему меж повідомлень TCP без написання циклів кадрування вручну.
9.14.2. Сервер з asyncio¶
asyncio.start_server() – аналог послідовності bind/listen/accept у asyncio. Він приймає зворотний виклик, який буде запускатися для кожного вхідного з’єднання, з тією самою парою reader/writer, що використовується на стороні клієнта:
import asyncio
async def handle(reader, writer):
addr = writer.get_extra_info("peername")
print("connection from", addr)
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data) # echo back
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
print("listening on", server.sockets[0].getsockname())
async with server:
await server.serve_forever()
asyncio.run(main())
Кожне прийняте з’єднання стає власним завданням, що виконує handle. Цикл подій природно перемикається між ними – повільний клієнт не може заблокувати інших, оскільки поки він очікує на await reader.read(...), цикл вільний просувати кожне інше з’єднання. Додавання десяти паралельних клієнтів не вимагає десяти потоків; той самий однопоточний цикл подій керує ними всіма.
Це практична причина, чому мережеві застосунки для камери, написані для asyncio, масштабуються значно краще, ніж еквівалентний блокуючий код: приклад сервера на сторінці TCP-сокети обслуговував одного клієнта за раз; цей – багатьох клієнтів одночасно без додаткових зусиль.
9.14.3. Паралельна робота поряд з мережею¶
Головна перевага – поєднання мережевої роботи з іншою роботою камери в одному циклі. Камера може захоплювати кадр, виконувати обробку зображення і обслуговувати мережевий протокол – все перемежовано:
import asyncio
async def capture_loop():
while True:
img = await camera.snapshot()
# process img ...
await asyncio.sleep_ms(100)
async def handle(reader, writer):
...
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
await asyncio.gather(
server.serve_forever(),
capture_loop(),
)
asyncio.run(main())
asyncio.gather() запускає дві корутини в одному циклі подій. Поки камера спить в sleep_ms() між кадрами, сервер може обробляти мережевий трафік. Поки сервер очікує наступного байта, камера може робити знімок. Обидва просуваються в одному потоці MicroPython.
9.14.4. UDP з asyncio¶
Модуль asyncio не пропонує такі ж високорівневі потоки для UDP – датаграми не вписуються в модель читання/запису потоку. Практичний підхід на камері полягає в тому, щоб помістити UDP-роботу у власну корутину, переключити сокет у неблокуючий режим і передавати управління циклу подій між спробами читання:
import asyncio
import socket
async def udp_listener(port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(("0.0.0.0", port))
while True:
try:
data, src = s.recvfrom(1024)
except OSError:
await asyncio.sleep_ms(10)
continue
print("got", data, "from", src)
Сокет встановлюється неблокуючим за допомогою s.setblocking(False), тому recvfrom() негайно викидає OSError, коли датаграма не очікує, замість того щоб блокувати весь цикл подій. await asyncio.sleep_ms(10) в порожній гілці передає управління циклу подій до наступного опитування.
Надсилання відбувається аналогічно: sendto() на неблокуючому сокеті або відразу успішно завершується, або викидає виняток. Немає sendallto – UDP-датаграми атомарні, тому кожне надсилання – це або ціла датаграма, або нічого. Якщо буфер надсилання заповнений, правильним рішенням для UDP зазвичай є відкинути датаграму і надіслати наступну в наступному проході циклу:
async def udp_telemetry(target_addr, period_ms):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
while True:
payload = collect_telemetry()
try:
s.sendto(payload, target_addr)
except OSError:
pass # buffer full -- skip this one
await asyncio.sleep_ms(period_ms)
Гілка з помилкою рідко спрацьовує на практиці. UDP не має контролю потоку, тому sendto() майже завжди успішно виконується з першої спроби; except здебільшого потрібен для того, щоб короткий мережевий збій не аварійно завершив корутину.
Розділ Asyncio охоплює ширші шаблони для змішування блокуючого I/O з asyncio-програмою; ті самі шаблони застосовуються безпосередньо до UDP-сокета.
9.14.5. Тайм-аути та скасування¶
Обгортання мережевого виклику в asyncio.wait_for() встановлює для нього дедлайн:
try:
reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
print("server is slow")
Корутину, що виконується надто довго, також можна скасувати (cancel()) ззовні. Обидва механізми детально описані в розділі координації; вони застосовуються без змін до потоків, повернутих asyncio.open_connection() і asyncio.start_server().
Повний довідник Stream (клас, що стоїть за зчитувачами й записувачами, а також допоміжні функції, використані на цій сторінці), дивіться в asyncio — асинхронний планувальник вводу/виводу.