9.14. Сокеты с asyncio¶
Блокирующий вызов recv() замораживает весь скрипт, пока не придёт байт. Блокирующий вызов accept() обслуживает только одного клиента за раз. И то и другое — именно тот случай «ожидания на вводе-выводе», для обработки которого существует asyncio. Глава об asyncio охватывает цикл событий, корутины и примитивы синхронизации; эта страница охватывает специфичные для сети части.
Модуль asyncio предоставляет работу с сетью через небольшое число вспомогательных функций, которые принимают и возвращают потоки — высокоуровневые объекты, обёртывающие сокет и предлагающие await-совместимые версии чтения и записи. Базовый сокет по-прежнему присутствует; приложение просто не обращается к нему напрямую.
9.14.1. Клиент с asyncio¶
asyncio.open_connection() — это аналог socket.socket.connect() в asyncio. Он открывает TCP-соединение и возвращает два объекта-потока: reader и writer:
import asyncio
async def client():
reader, writer = await asyncio.open_connection("192.168.1.20", 9000)
writer.write(b"hello\n")
await writer.drain() # wait until bytes have been sent
reply = await reader.readline()
print("reply:", reply)
writer.close()
await writer.wait_closed()
asyncio.run(client())
Три момента, которые стоит отметить:
Установка соединения — это один
awaitвместо блокирующего вызова. Пока рукопожатие выполняется, цикл событий свободен для запуска других корутин.write()помещает байты в исходящий буфер;drain()— этоawait, который уступает управление циклу, пока эти байты действительно не будут отправлены по сети.readline()читает байты, пока не придёт перевод строки. Класс потока также включаетread()(прочитать до N байтов) иreadexactly()(прочитать ровно N байтов), которые решают проблему границ сообщений в TCP без написания циклов разбора кадров вручную.
9.14.2. Сервер с asyncio¶
asyncio.start_server() — это аналог последовательности bind/listen/accept в asyncio. Он принимает функцию обратного вызова, которая будет запускаться один раз для каждого входящего соединения, с той же парой reader/writer, что использует клиентская сторона:
import asyncio
async def handle(reader, writer):
addr = writer.get_extra_info("peername")
print("connection from", addr)
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data) # echo back
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
print("listening on", server.sockets[0].getsockname())
async with server:
await server.serve_forever()
asyncio.run(main())
Каждое принятое соединение становится собственной задачей, выполняющей handle. Цикл событий естественным образом распределяет управление между ними — один медленный клиент не может заблокировать остальных, потому что, пока он ждёт на await reader.read(...), цикл свободен для продвижения по всем остальным соединениям. Добавление десяти параллельных клиентов не требует десяти потоков; всеми ими управляет один и тот же однопоточный цикл событий.
Это практическая причина, по которой сетевые приложения для камеры, написанные для asyncio, масштабируются настолько лучше, чем эквивалентный блокирующий код: серверная картина на TCP-сокеты обслуживала одного клиента за раз; эта — обслуживает множество клиентов одновременно без дополнительных усилий.
9.14.3. Параллельная работа наряду с сетью¶
Большая выгода — в совмещении работы с сетью и остальной работы камеры в одном цикле. Камера может захватить кадр, выполнить обработку изображений и обслуживать сетевой протокол, всё чередуясь:
import asyncio
async def capture_loop():
while True:
img = await camera.snapshot()
# process img ...
await asyncio.sleep_ms(100)
async def handle(reader, writer):
...
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
await asyncio.gather(
server.serve_forever(),
capture_loop(),
)
asyncio.run(main())
asyncio.gather() запускает обе корутины в одном цикле событий. Пока камера спит в sleep_ms() между кадрами, сервер получает возможность распределять сетевой трафик. Пока сервер ждёт следующего байта, камера получает возможность захватить кадр. Оба продвигаются в одном потоке MicroPython.
9.14.4. UDP с asyncio¶
Модуль asyncio не предлагает таких же высокоуровневых потоков для UDP — датаграммы не вписываются в схему чтения/записи потока. Практичный подход на камере — поместить работу с UDP в отдельную корутину, переключить сокет в неблокирующий режим и уступать управление циклу событий между попытками чтения:
import asyncio
import socket
async def udp_listener(port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(("0.0.0.0", port))
while True:
try:
data, src = s.recvfrom(1024)
except OSError:
await asyncio.sleep_ms(10)
continue
print("got", data, "from", src)
Сокет переводится в неблокирующий режим с помощью s.setblocking(False), поэтому recvfrom() немедленно вызывает OSError, когда ни одна датаграмма не ждёт, вместо блокировки всего цикла событий. Вызов await asyncio.sleep_ms(10) в пустой ветке возвращает управление циклу событий до следующего опроса.
Отправка следует той же схеме: sendto() на неблокирующем сокете либо немедленно завершается успехом, либо вызывает исключение. Нет sendallto — UDP-датаграммы атомарны, поэтому каждая отправка — это либо одна целая датаграмма, либо ничего. Если буфер отправки заполнен, для UDP правильным решением обычно будет отбросить датаграмму и позволить следующей уйти на следующем проходе цикла:
async def udp_telemetry(target_addr, period_ms):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
while True:
payload = collect_telemetry()
try:
s.sendto(payload, target_addr)
except OSError:
pass # buffer full -- skip this one
await asyncio.sleep_ms(period_ms)
Ветка с ошибкой на практике встречается редко. У UDP нет управления потоком, поэтому sendto() почти всегда завершается успехом с первой попытки; except существует в основном для того, чтобы кратковременный сбой в сети не привёл к аварийному завершению корутины.
Раздел Asyncio охватывает более общие шаблоны для совмещения блокирующего ввода-вывода с программой на asyncio; те же шаблоны применяются напрямую к UDP-сокету.
9.14.5. Тайм-ауты и отмена¶
Оборачивание сетевого вызова в asyncio.wait_for() устанавливает на него крайний срок:
try:
reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
print("server is slow")
Слишком долго выполняющуюся корутину также можно отменить из другого места с помощью cancel(). Оба механизма подробно рассмотрены в главе о координации; они без изменений применяются к потокам, возвращаемым asyncio.open_connection() и asyncio.start_server().
Полное описание Stream (класса, лежащего в основе reader и writer, а также вспомогательных функций, которые эта страница использовала вскользь) см. в asyncio — планировщик асинхронного ввода-вывода.