8.4. Gather

В предыдущей главе было показано, как asyncio.create_task() планирует сопрограмму и не ждёт её завершения. Сопутствующая операция – запустить несколько awaitable-объектов параллельно и дождаться их всех – это asyncio.gather().

8.4.1. Базовая схема

import asyncio

async def fetch(name, delay_ms):
    await asyncio.sleep_ms(delay_ms)
    return name

async def main():
    results = await asyncio.gather(
        fetch("a", 100),
        fetch("b", 200),
        fetch("c", 50),
    )
    print(results)

asyncio.run(main())

Вывод:

['a', 'b', 'c']

Стоит отметить две вещи. Во-первых, список результатов соответствует порядку аргументов gather, а не порядку, в котором сопрограммы завершились – "c" вернулся первым, но всё равно является третьей записью. Во-вторых, вызов занял в общей сложности 200 ms, а не 350 ms: три задержки выполнялись параллельно, и gather() возвращается, как только завершается самая медленная.

Оба факта вытекают из одного и того же источника. gather() оборачивает каждый аргумент, который ещё не является задачей (сопрограммы в примере), планирует их в цикле, приостанавливает вызывающую сопрограмму до тех пор, пока все они не завершатся, затем возвращает их результаты в исходном порядке.

8.4.2. Когда к этому прибегать

Везде, где у приложения есть N awaitable-операций и нужны результаты всех из них перед продолжением. Типичные примеры:

  • Отправка нескольких сетевых запросов параллельно и ожидание всех ответов.

  • Чтение с нескольких датчиков параллельно перед обработкой объединённого результата.

  • Объединение нескольких короткоживущих вспомогательных задач в конце фазы приложения.

Это не подходящий инструмент для долгоживущих фоновых задач, которые приложение запускает и оставляет работать на всё время жизни программы – это всё ещё работа для create_task(). gather() предназначен для шаблона fan-out / fan-in: разделить работу, выполнить её параллельно, снова объединить.

8.4.3. Исключения в группе

Если какой-либо из собранных awaitable-объектов возбуждает исключение, поведением по умолчанию является повторное возбуждение исключения наружу из вызова gather. Соседние объекты, которые ещё не завершились, отменяются в фоновом режиме.

Обычно это то, чего хочет приложение – один из N параллельных заданий завершился неудачей, поэтому объединённая операция провалилась, поэтому нужно прекратить тратить время на остальные. Иногда приложению нужно обратное: позволить каждому awaitable-объекту завершиться (или провалиться) независимо, а результаты изучить потом. Для этого передайте return_exceptions=True:

results = await asyncio.gather(
    fetch_or_fail("ok"),
    fetch_or_fail("bad"),
    return_exceptions=True,
)
# results == ["ok-value", OSError(...)]

Каждая запись в возвращаемом списке теперь является либо обычным возвращаемым значением, либо исключением, которое возбудил соответствующий awaitable-объект. Вызывающая сторона проверяет isinstance(r, Exception), чтобы их различить.

8.4.4. Отмена

Отмена самого gather – путём отмены той задачи, которая его ожидала – отменяет каждый awaitable-объект, ещё выполняющийся внутри него. Страница тайм-ауты и отмена подробно описывает, как отмена распространяется по цепочке вызовов.