9.14. Sockets עם asyncio

קריאת recv() חוסמת מקפיאה את כל הסקריפט עד שמגיע בית. קריאת accept() חוסמת משרתת רק לקוח אחד בכל פעם. שתיהן הן בדיוק סוג מצב ה“המתנה על קלט/פלט“ ש-asyncio קיים כדי לטפל בו. פרק asyncio מכסה את לולאת האירועים, הקורוטינות ופרימיטיבי הסנכרון; דף זה מכסה את החלקים הספציפיים לרשת.

מודול asyncio חושף עבודת רשת באמצעות מספר קטן של פונקציות עזר המקבלות ומחזירות streams – אובייקטים ברמה גבוהה העוטפים socket ומציעים גרסאות הניתנות ל-await של קריאה וכתיבה. ה-socket הבסיסי עדיין שם; היישום פשוט אינו נוגע בו ישירות.

9.14.1. לקוח עם asyncio

asyncio.open_connection() הוא המקבילה של asyncio ל-socket.socket.connect(). היא פותחת חיבור TCP ומחזירה שני אובייקטי stream: reader ו-writer

import asyncio

async def client():
    reader, writer = await asyncio.open_connection("192.168.1.20", 9000)

    writer.write(b"hello\n")
    await writer.drain()                   # wait until bytes have been sent

    reply = await reader.readline()
    print("reply:", reply)

    writer.close()
    await writer.wait_closed()

asyncio.run(client())

שלושה דברים שכדאי לשים לב אליהם:

  • הקמת החיבור היא await יחיד במקום קריאה חוסמת. בעוד לחיצת היד בתהליך, לולאת האירועים חופשית להריץ קורוטינות אחרות.

  • write() מכניס בתים לחוצץ (buffer) יוצא; drain() הוא ה-await שמוותר ללולאה עד שאותם בתים נשלחו בפועל ברשת.

  • readline() קוראת בתים עד שמגיע תו שורה חדשה. מחלקת ה-stream כוללת גם את read() (קריאה של עד N בתים) ו-readexactly() (קריאה של בדיוק N בתים), הפותרות את בעיית גבולות ההודעה של TCP מבלי לכתוב את לולאות המסגור ביד.

9.14.2. שרת עם asyncio

asyncio.start_server() הוא המקבילה של asyncio לריקוד bind/listen/accept. הוא מקבל פונקציית callback שתורץ פעם אחת עבור כל חיבור נכנס, עם אותו זוג reader/writer שצד הלקוח משתמש בו:

import asyncio

async def handle(reader, writer):
    addr = writer.get_extra_info("peername")
    print("connection from", addr)

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)                 # echo back
        await writer.drain()

    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    print("listening on", server.sockets[0].getsockname())
    async with server:
        await server.serve_forever()

asyncio.run(main())

כל חיבור מתקבל הופך למשימה משלו המריצה את handle. לולאת האירועים מתזמנת ביניהם באופן טבעי – לקוח איטי אחד אינו יכול לחסום את האחרים, כיוון שבעוד הוא ממתין על await reader.read(...) הלולאה חופשית להתקדם על כל חיבור אחר. הוספת עשרה לקוחות מקבילים אינה דורשת עשרה threads; אותה לולאת אירועים חד-threadית מניעה את כולם.

זוהי הסיבה המעשית לכך שיישומי עבודת רשת של מצלמה הכתובים עבור asyncio מתרחבים הרבה יותר טוב מהקוד החוסם המקביל: תמונת השרת ב-שקעי TCP הייתה לקוח-אחד-בכל-פעם; זו היא ריבוי-לקוחות-בו-זמנית ללא מאמץ נוסף.

9.14.3. עבודה מקבילה לצד עבודת רשת

התמורה הגדולה היא שילוב עבודת רשת עם שאר עבודת המצלמה באותה לולאה. המצלמה יכולה ללכוד פריים, להריץ עיבוד תמונה, וגם לשרת פרוטוקול רשת, הכל בשזירה:

import asyncio

async def capture_loop():
    while True:
        img = await camera.snapshot()
        # process img ...
        await asyncio.sleep_ms(100)

async def handle(reader, writer):
    ...

async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 9000)
    await asyncio.gather(
        server.serve_forever(),
        capture_loop(),
    )

asyncio.run(main())

asyncio.gather() מריץ את שתי הקורוטינות על אותה לולאת אירועים. בעוד המצלמה ישנה ב-sleep_ms() בין פריימים, השרת זוכה לתזמן תעבורת רשת. בעוד השרת ממתין לבית הבא, המצלמה זוכה ללכוד. שניהם מתקדמים על thread יחיד של MicroPython.

9.14.4. UDP עם asyncio

מודול asyncio אינו מציע את אותם streams ברמה גבוהה עבור UDP – datagrams אינם מתאימים לצורת הקריאה/כתיבה של stream. הגישה המעשית על המצלמה היא להכניס את עבודת ה-UDP לקורוטינה משלה, להעביר את ה-socket למצב לא-חוסם, ולוותר ללולאת האירועים בין ניסיונות קריאה:

import asyncio
import socket

async def udp_listener(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)
    s.bind(("0.0.0.0", port))

    while True:
        try:
            data, src = s.recvfrom(1024)
        except OSError:
            await asyncio.sleep_ms(10)
            continue
        print("got", data, "from", src)

ה-socket מוגדר כלא-חוסם באמצעות s.setblocking(False), כך ש-recvfrom() מעלה OSError מיד כאשר אין datagram ממתין במקום לחסום את כל לולאת האירועים. ה-await asyncio.sleep_ms(10) בענף הריק מחזיר שליטה ללולאת האירועים עד הסקירה הבאה.

שליחה עוקבת אחר אותה צורה: sendto() על socket לא-חוסם או מצליחה מיד או מעלה שגיאה. אין sendallto – datagrams של UDP הם אטומיים, כך שכל שליחה היא datagram שלם אחד או אף אחד. אם חוצץ השליחה מלא, המהלך הנכון עבור UDP הוא בדרך כלל להשליך את ה-datagram ולתת לבא בתור לצאת בסבב הבא של הלולאה:

async def udp_telemetry(target_addr, period_ms):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setblocking(False)

    while True:
        payload = collect_telemetry()
        try:
            s.sendto(payload, target_addr)
        except OSError:
            pass                # buffer full -- skip this one

        await asyncio.sleep_ms(period_ms)

הענף הכושל נדיר בפועל. ל-UDP אין בקרת זרימה, כך ש-sendto() כמעט תמיד מצליחה בניסיון הראשון; ה-except קיים בעיקר כדי שתקלת רשת קצרה לא תקרוס את הקורוטינה.

המקטע Asyncio מכסה את הדפוסים הרחבים יותר לשילוב קלט/פלט חוסם בתוך תוכנית asyncio; אותם דפוסים חלים ישירות על socket של UDP.

9.14.5. פסקי זמן וביטול

עטיפת קריאת רשת ב-asyncio.wait_for() מציבה לה מועד אחרון:

try:
    reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
    print("server is slow")

קורוטינה שלוקחת לה יותר מדי זמן יכולה גם להיות מבוטלת באמצעות cancel() ממקום אחר. שני המנגנונים מכוסים בפירוט ב-פרק התיאום; הם חלים ללא שינוי על streams המוחזרים על ידי asyncio.open_connection() ו-asyncio.start_server().

עבור התיעוד המלא של Stream (המחלקה שמאחורי ה-readers וה-writers, בנוסף לפונקציות העזר שדף זה השתמש בהן בדרך אגב), ראה asyncio — מתזמן קלט/פלט אסינכרוני.