9.14. Sockets với asyncio

Lệnh gọi recv() chặn sẽ đóng băng toàn bộ tập lệnh cho đến khi một byte đến. Lệnh gọi accept() chặn chỉ phục vụ một client tại một thời điểm. Cả hai đều chính xác là loại tình huống "chờ I/O" mà asyncio tồn tại để xử lý. Chương asyncio bao gồm vòng lặp sự kiện, coroutine và các nguyên thủy đồng bộ hóa; trang này bao gồm các phần dành riêng cho mạng.

Module asyncio cung cấp khả năng mạng thông qua một số lượng nhỏ các hàm trợ giúp nhận và trả về stream -- các đối tượng cấp cao bọc một socket và cung cấp các phiên bản có thể await của đọc và ghi. Socket bên dưới vẫn ở đó; ứng dụng chỉ không trực tiếp chạm vào nó.

9.14.1. Client với asyncio

asyncio.open_connection() là đối tác asyncio của socket.socket.connect(). Nó mở một kết nối TCP và trả về hai đối tượng stream: một reader và một writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

Ba điều cần lưu ý:

  • Thiết lập kết nối là một await thay vì một lệnh gọi chặn. Trong khi quá trình bắt tay đang diễn ra, vòng lặp sự kiện có thể tự do chạy các coroutine khác.

  • write() đặt byte vào bộ đệm đi ra; drain()await mà nhường cho vòng lặp cho đến khi các byte đó thực sự được gửi qua mạng.

  • readline() đọc byte cho đến khi xuất hiện ký tự xuống dòng. Lớp stream bao gồm read() (đọc tối đa N byte) và readexactly() (đọc chính xác N byte) cũng như vậy, giải quyết vấn đề ranh giới thông điệp của TCP mà không cần tự viết các vòng lặp đóng khung.

9.14.2. Server với asyncio

asyncio.start_server() là đối tác asyncio của bộ ba bind/listen/accept. Nó nhận một hàm gọi lại sẽ được chạy mỗi lần có kết nối đến, với cùng cặp reader/writer mà phía client sử dụng:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

Mỗi kết nối được chấp nhận trở thành một task riêng chạy handle. Vòng lặp sự kiện điều phối giữa chúng một cách tự nhiên -- một client chậm không thể chặn các client khác, vì trong khi nó đang chờ await reader.read(...) vòng lặp có thể tự do tiến hành trên mọi kết nối khác. Thêm mười client đồng thời không yêu cầu mười luồng; cùng một vòng lặp sự kiện đơn luồng điều khiển tất cả chúng.

Đây là lý do thực tế khiến các ứng dụng mạng camera được viết cho asyncio mở rộng tốt hơn nhiều so với mã chặn tương đương: hình ảnh server trên TCP sockets là một-client-mỗi-lần; hình ảnh này là nhiều-client-cùng-lúc mà không cần thêm công sức.

9.14.3. Công việc đồng thời cùng với mạng

Lợi ích lớn là kết hợp mạng với phần còn lại của công việc của camera trong cùng một vòng lặp. Camera có thể chụp một khung hình, chạy xử lý ảnh, phục vụ một giao thức mạng, tất cả được xen kẽ:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() chạy hai coroutine trên cùng một vòng lặp sự kiện. Trong khi camera đang ngủ trong sleep_ms() giữa các khung hình, server có cơ hội điều phối lưu lượng mạng. Trong khi server đang chờ byte tiếp theo, camera có cơ hội chụp. Cả hai đều tiến triển trên một luồng MicroPython duy nhất.

9.14.4. UDP với asyncio

Module asyncio không cung cấp các stream cấp cao tương tự cho UDP -- datagram không phù hợp với hình dạng đọc/ghi của một stream. Cách tiếp cận thực tế trên camera là đặt công việc UDP trong coroutine riêng của nó, chuyển socket sang chế độ không chặn và nhường cho vòng lặp sự kiện giữa các lần thử đọc:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

Socket được đặt không chặn với s.setblocking(False), vì vậy recvfrom() sẽ ngay lập tức raise OSError khi không có datagram nào đang chờ thay vì chặn toàn bộ vòng lặp sự kiện. await asyncio.sleep_ms(10) trong nhánh trống chuyển quyền kiểm soát trở lại vòng lặp sự kiện đến lần thăm dò tiếp theo.

Việc gửi tuân theo cùng hình dạng: sendto() trên socket không chặn hoặc thành công ngay lập tức hoặc raise. Không có sendallto -- UDP datagram là nguyên tử, vì vậy mỗi lần gửi là một datagram hoàn chỉnh hoặc không có gì. Nếu bộ đệm gửi đầy, động thái đúng đắn cho UDP thường là bỏ datagram và để cái tiếp theo gửi vào lần tiếp theo trong vòng lặp:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

Nhánh lỗi hiếm khi xảy ra trong thực tế. UDP không có kiểm soát luồng, vì vậy sendto() hầu như luôn thành công trong lần thử đầu tiên; except chủ yếu tồn tại để một sự cố mạng ngắn không làm crash coroutine.

Phần Asyncio bao gồm các mô hình rộng hơn để trộn I/O chặn vào chương trình asyncio; các mô hình tương tự áp dụng trực tiếp cho socket UDP.

9.14.5. Timeout và hủy bỏ

Bọc một lệnh gọi mạng trong asyncio.wait_for() đặt thời hạn cho nó:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

Một coroutine mất quá nhiều thời gian cũng có thể bị cancel() từ nơi khác. Cả hai cơ chế được đề cập chi tiết trong chương phối hợp; chúng áp dụng không thay đổi cho các stream được trả về bởi asyncio.open_connection()asyncio.start_server().

Để biết tài liệu tham khảo đầy đủ về Stream (lớp đằng sau các reader và writer, cộng với các hàm trợ giúp mà trang này đã sử dụng khi lướt qua), xem asyncio --- bộ lập lịch I/O bất đồng bộ.