9.14. Sockets กับ asyncio

การเรียก recv() แบบบล็อกจะหยุดสคริปต์ทั้งหมดจนกว่าไบต์จะมาถึง การเรียก accept() แบบบล็อกรองรับไคลเอนต์ได้ทีละคนเท่านั้น ทั้งสองกรณีนี้คือสถานการณ์แบบ "รอ I/O" ที่ asyncio ถูกสร้างขึ้นมาเพื่อจัดการ บท asyncio ครอบคลุมเรื่อง event loop, coroutine และ synchronisation primitive ส่วนหน้านี้ครอบคลุมส่วนที่เกี่ยวข้องกับเครือข่ายโดยเฉพาะ

โมดูล asyncio เปิดเผยฟังก์ชันเครือข่ายผ่านตัวช่วยจำนวนน้อยที่รับและคืนค่า stream -- อ็อบเจกต์ระดับสูงที่ห่อ socket และมีเวอร์ชันที่ await-ได้สำหรับการอ่านและเขียน โดย socket พื้นฐานยังคงมีอยู่ แต่แอปพลิเคชันเพียงแค่ไม่ต้องสัมผัสมันโดยตรง

9.14.1. ไคลเอนต์ด้วย asyncio

asyncio.open_connection() คือคู่เทียบของ asyncio สำหรับ socket.socket.connect() โดยเปิดการเชื่อมต่อ TCP และคืนค่าอ็อบเจกต์ stream สองตัว: reader และ writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

สามสิ่งที่ควรสังเกต:

  • การตั้งค่าการเชื่อมต่อใช้ await เพียงครั้งเดียวแทนการเรียกแบบบล็อก ขณะที่การ handshake กำลังดำเนินอยู่ event loop ยังคงสามารถรัน coroutine อื่น ๆ ได้

  • write() ใส่ไบต์เข้าในบัฟเฟอร์ขาออก และ drain() คือ await ที่ yield กลับสู่ loop จนกว่าไบต์เหล่านั้นจะถูกส่งออกไปทางเครือข่ายจริง ๆ

  • readline() อ่านไบต์จนกว่าจะพบ newline class stream ยังรวมถึง read() (อ่านได้ถึง N ไบต์) และ readexactly() (อ่านให้ครบ N ไบต์พอดี) ซึ่งช่วยแก้ปัญหาขอบเขตข้อความของ TCP โดยไม่ต้องเขียน framing loop ด้วยมือ

9.14.2. เซิร์ฟเวอร์ด้วย asyncio

asyncio.start_server() คือคู่เทียบของ asyncio สำหรับขั้นตอน bind/listen/accept โดยรับ callback ที่จะถูกเรียกหนึ่งครั้งต่อการเชื่อมต่อที่เข้ามา ด้วยคู่ reader/writer เดียวกันกับที่ฝั่งไคลเอนต์ใช้:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

ทุกการเชื่อมต่อที่ยอมรับจะกลายเป็น task ของตัวเองที่รัน handle event loop จะสลับการทำงานระหว่างกันตามธรรมชาติ -- ไคลเอนต์ที่ช้าไม่สามารถบล็อกไคลเอนต์อื่นได้ เพราะขณะที่มันรออยู่ที่ await reader.read(...) loop ก็สามารถทำงานต่อกับการเชื่อมต่ออื่น ๆ ทุกรายการได้ การเพิ่มสิบไคลเอนต์พร้อมกันไม่ต้องการสิบ thread เพราะ event loop แบบ single-thread เดียวกันขับเคลื่อนทั้งหมด

นี่คือเหตุผลที่ใช้งานได้จริงว่าทำไมแอปพลิเคชันเครือข่ายกล้องที่เขียนด้วย asyncio จึงขยายขนาดได้ดีกว่าโค้ดแบบบล็อกที่เทียบเท่ามาก: ภาพเซิร์ฟเวอร์ใน TCP sockets รองรับไคลเอนต์ได้ทีละคน ส่วนแบบนี้รองรับหลายไคลเอนต์พร้อมกันโดยไม่ต้องพยายามเพิ่มเติม

9.14.3. การทำงานพร้อมกันควบคู่กับการทำงานเครือข่าย

ผลตอบแทนที่ยิ่งใหญ่คือการผสมการทำงานเครือข่ายกับงานอื่น ๆ ของกล้องใน loop เดียวกัน กล้องสามารถจับภาพเฟรม ประมวลผลภาพ และ ให้บริการโปรโตคอลเครือข่ายได้พร้อมกัน สลับกันทำงาน:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() รัน coroutine สองตัวบน event loop เดียวกัน ขณะที่กล้องกำลังนอนหลับใน sleep_ms() ระหว่างเฟรม เซิร์ฟเวอร์ก็ได้รับโอกาสจัดการ network traffic ขณะที่เซิร์ฟเวอร์กำลังรอไบต์ถัดไป กล้องก็ได้รับโอกาสจับภาพ ทั้งคู่ทำงานไปพร้อมกันบน MicroPython thread เดียว

9.14.4. UDP กับ asyncio

โมดูล asyncio ไม่ได้ มี stream ระดับสูงสำหรับ UDP -- datagram ไม่เหมาะกับรูปแบบการอ่าน/เขียนของ stream วิธีที่ใช้ได้จริงบนกล้องคือวาง UDP ไว้ใน coroutine ของตัวเอง สลับ socket เป็นโหมด non-blocking และ yield ให้ event loop ระหว่างการพยายามอ่าน:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

socket ถูกตั้งค่าเป็น non-blocking ด้วย s.setblocking(False) ดังนั้น recvfrom() จะ raise OSError ทันทีเมื่อไม่มี datagram รออยู่แทนที่จะบล็อก event loop ทั้งหมด await asyncio.sleep_ms(10) ในส่วนที่ว่างเปล่าจะส่งมอบการควบคุมกลับไปยัง event loop จนกว่าจะถึงรอบ poll ครั้งถัดไป

การส่งใช้รูปแบบเดียวกัน: sendto() บน socket แบบ non-blocking จะสำเร็จทันทีหรือ raise exception ไม่มี sendallto -- UDP datagram เป็น atomic ดังนั้นแต่ละการส่งคือ datagram ทั้งอันหรือไม่มีเลย ถ้า send buffer เต็ม ทางที่ถูกต้องสำหรับ UDP มักจะทิ้ง datagram และส่ง datagram ถัดไปออกไปในรอบถัดไปของ loop:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

ส่วนที่ล้มเหลวนั้นเกิดขึ้นได้ยากในทางปฏิบัติ UDP ไม่มี flow control ดังนั้น sendto() มักจะสำเร็จในครั้งแรกเสมอ except มีไว้เพื่อให้ network hiccup สั้น ๆ ไม่ทำให้ coroutine crash

ส่วน Asyncio ครอบคลุมรูปแบบที่กว้างขึ้นสำหรับการผสม I/O แบบบล็อกเข้ากับโปรแกรม asyncio รูปแบบเดียวกันนี้ใช้ได้โดยตรงกับ UDP socket

9.14.5. Timeout และการยกเลิก

การห่อการเรียกเครือข่ายด้วย asyncio.wait_for() จะกำหนด deadline ให้กับมัน:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

coroutine ที่ใช้เวลานานเกินไปยังสามารถถูก cancel()-ยกเลิกจากที่อื่นได้ ทั้งสองกลไกนี้ถูกอธิบายอย่างละเอียดในบท coordination และใช้ได้โดยไม่เปลี่ยนแปลงกับ stream ที่คืนมาจาก asyncio.open_connection() และ asyncio.start_server()

สำหรับข้อมูลอ้างอิงเต็มรูปแบบของ Stream (class เบื้องหลัง reader และ writer รวมถึงตัวช่วยที่หน้านี้ใช้ไปตลอด) ดูได้ที่ asyncio --- ตัวกำหนดเวลา I/O แบบอะซิงโครนัส