8.4. Gather

上一章展示了 asyncio.create_task() 如何调度一个协程并等待它。与之配套的操作——并发运行多个可等待对象并等待它们全部完成——就是 asyncio.gather()

8.4.1. 基本结构

import asyncio

async def fetch(name, delay_ms):
    await asyncio.sleep_ms(delay_ms)
    return name

async def main():
    results = await asyncio.gather(
        fetch("a", 100),
        fetch("b", 200),
        fetch("c", 50),
    )
    print(results)

asyncio.run(main())

输出:

['a', 'b', 'c']

有两点需要注意。第一,结果列表是按传给 gather 的参数的顺序排列的,而不是按协程完成的顺序——"c" 最先返回,但它仍然是第三个条目。第二,该调用总共耗时 200 ms,而不是 350 ms:三个休眠是并发运行的,且 gather() 会在最慢的那个一完成时就返回。

这两个事实源自同一个原因。gather() 会包装每个尚不是任务的参数(示例中的那些协程),将它们调度到循环上,挂起调用方协程直到它们全部完成,然后按原始顺序返回它们的结果。

8.4.2. 何时使用它

凡是应用程序有 N 个可等待操作、并且想在继续之前获得它们全部的结果时。典型的例子有:

  • 并行发出多个网络请求并等待所有响应。

  • 在处理合并结果之前并发地从多个传感器读取数据。

  • 在应用程序某个阶段结束时汇合多个短期辅助任务。

对于应用程序启动后任其运行、贯穿程序整个生命周期的长期后台任务,它不是合适的工具——那些仍然是 create_task() 的活儿。gather() 适用于分发/汇聚(fan-out / fan-in)模式:拆分工作,并发完成,再重新汇合。

8.4.3. 组内的异常

如果所汇集的可等待对象中有任何一个抛出异常,默认行为是将该异常重新抛出,传出 gather 调用。尚未完成的同级对象会在后台被取消。

这通常正是应用程序想要的——N 个并行任务中有一个失败了,因此合并操作也就失败了,那么就不必再在其余的任务上花费时间。但有时应用程序想要相反的效果:让每个可等待对象各自独立地完成(或失败),事后再检查结果。为此可以传入 return_exceptions=True:

results = await asyncio.gather(
    fetch_or_fail("ok"),
    fetch_or_fail("bad"),
    return_exceptions=True,
)
# results == ["ok-value", OSError(...)]

现在返回列表中的每个条目要么是一个正常的返回值,要么是对应的可等待对象所抛出的异常。调用方通过检查 isinstance(r, Exception) 来区分它们。

8.4.4. 取消

取消 gather 本身——通过取消正在等待它的那个任务——会取消其内部仍在运行的每个可等待对象。超时与取消 页面详细介绍了取消如何沿调用链传播。