8.4. Gather¶
上一章說明了 asyncio.create_task() 如何排程一個協程而不等待它。與之搭配的操作 -- 並行執行多個可等待物件並等待它們全部完成 -- 就是 asyncio.gather()。
8.4.1. 基本形式¶
import asyncio
async def fetch(name, delay_ms):
await asyncio.sleep_ms(delay_ms)
return name
async def main():
results = await asyncio.gather(
fetch("a", 100),
fetch("b", 200),
fetch("c", 50),
)
print(results)
asyncio.run(main())
輸出:
['a', 'b', 'c']
有兩點需要注意。第一,結果清單是依照傳給 gather 的引數順序排列,而非協程完成的順序 -- "c" 最先回傳,但仍是第三個項目。第二,這次呼叫總共花了 200 ms,而非 350 ms:這三個睡眠是並行執行的,而 gather() 會在最慢的那個完成後立即回傳。
這兩個事實都源自同一個原因。gather() 會包裝每個尚未成為任務的引數(範例中的協程),把它們排程到迴圈上,暫停呼叫端協程直到它們全部完成,然後依原始順序回傳它們的結果。
8.4.2. 何時該使用它¶
任何時候只要應用程式有 N 個可等待操作,並且想要在繼續之前取得它們全部的結果時。典型範例:
並行發出多個網路請求並等待所有回應。
在處理合併結果之前,並行讀取多個感測器。
在應用程式某個階段結束時,匯整多個短期的輔助任務。
對於應用程式啟動後讓其在程式生命週期內持續執行的長期背景任務,它並不是合適的工具 -- 那些仍然屬於 create_task() 的工作。gather() 是用於扇出/扇入(fan-out / fan-in)模式:分拆工作、並行執行、再重新匯合。
8.4.3. 群組中的例外¶
如果任一被匯集的可等待物件引發例外,預設行為是將該例外從 gather 呼叫中重新引發出來。尚未完成的同層級項目會在背景中被取消。
這通常正是應用程式想要的 -- N 個並行工作中有一個失敗了,因此整個合併操作就算失敗了,所以停止把時間花在其餘的工作上。有時應用程式想要相反的效果:讓每個可等待物件各自獨立完成(或失敗),之後再檢查結果。要達到此目的可傳入 return_exceptions=True:
results = await asyncio.gather(
fetch_or_fail("ok"),
fetch_or_fail("bad"),
return_exceptions=True,
)
# results == ["ok-value", OSError(...)]
回傳清單中的每個項目,現在不是正常的回傳值,就是對應可等待物件所引發的例外。呼叫端可透過 isinstance(r, Exception) 來區分兩者。
8.4.4. 取消¶
取消 gather 本身 -- 透過取消任何正在等待它的任務 -- 會取消其內部仍在執行的每個可等待物件。逾時與取消 頁面詳細說明了取消如何沿著呼叫鏈傳播。