9.14. Sockets กับ asyncio¶
การเรียก recv() แบบบล็อกจะหยุดสคริปต์ทั้งหมดจนกว่าไบต์จะมาถึง การเรียก accept() แบบบล็อกรองรับไคลเอนต์ได้ทีละคนเท่านั้น ทั้งสองกรณีนี้คือสถานการณ์แบบ "รอ I/O" ที่ asyncio ถูกสร้างขึ้นมาเพื่อจัดการ บท asyncio ครอบคลุมเรื่อง event loop, coroutine และ synchronisation primitive ส่วนหน้านี้ครอบคลุมส่วนที่เกี่ยวข้องกับเครือข่ายโดยเฉพาะ
โมดูล asyncio เปิดเผยฟังก์ชันเครือข่ายผ่านตัวช่วยจำนวนน้อยที่รับและคืนค่า stream -- อ็อบเจกต์ระดับสูงที่ห่อ socket และมีเวอร์ชันที่ await-ได้สำหรับการอ่านและเขียน โดย socket พื้นฐานยังคงมีอยู่ แต่แอปพลิเคชันเพียงแค่ไม่ต้องสัมผัสมันโดยตรง
9.14.1. ไคลเอนต์ด้วย asyncio¶
asyncio.open_connection() คือคู่เทียบของ asyncio สำหรับ socket.socket.connect() โดยเปิดการเชื่อมต่อ TCP และคืนค่าอ็อบเจกต์ stream สองตัว: reader และ writer
import asyncio
async def client():
reader, writer = await asyncio.open_connection("192.168.1.20", 9000)
writer.write(b"hello\n")
await writer.drain() # wait until bytes have been sent
reply = await reader.readline()
print("reply:", reply)
writer.close()
await writer.wait_closed()
asyncio.run(client())
สามสิ่งที่ควรสังเกต:
การตั้งค่าการเชื่อมต่อใช้
awaitเพียงครั้งเดียวแทนการเรียกแบบบล็อก ขณะที่การ handshake กำลังดำเนินอยู่ event loop ยังคงสามารถรัน coroutine อื่น ๆ ได้write()ใส่ไบต์เข้าในบัฟเฟอร์ขาออก และdrain()คือawaitที่ yield กลับสู่ loop จนกว่าไบต์เหล่านั้นจะถูกส่งออกไปทางเครือข่ายจริง ๆreadline()อ่านไบต์จนกว่าจะพบ newline class stream ยังรวมถึงread()(อ่านได้ถึง N ไบต์) และreadexactly()(อ่านให้ครบ N ไบต์พอดี) ซึ่งช่วยแก้ปัญหาขอบเขตข้อความของ TCP โดยไม่ต้องเขียน framing loop ด้วยมือ
9.14.2. เซิร์ฟเวอร์ด้วย asyncio¶
asyncio.start_server() คือคู่เทียบของ asyncio สำหรับขั้นตอน bind/listen/accept โดยรับ callback ที่จะถูกเรียกหนึ่งครั้งต่อการเชื่อมต่อที่เข้ามา ด้วยคู่ reader/writer เดียวกันกับที่ฝั่งไคลเอนต์ใช้:
import asyncio
async def handle(reader, writer):
addr = writer.get_extra_info("peername")
print("connection from", addr)
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data) # echo back
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
print("listening on", server.sockets[0].getsockname())
async with server:
await server.serve_forever()
asyncio.run(main())
ทุกการเชื่อมต่อที่ยอมรับจะกลายเป็น task ของตัวเองที่รัน handle event loop จะสลับการทำงานระหว่างกันตามธรรมชาติ -- ไคลเอนต์ที่ช้าไม่สามารถบล็อกไคลเอนต์อื่นได้ เพราะขณะที่มันรออยู่ที่ await reader.read(...) loop ก็สามารถทำงานต่อกับการเชื่อมต่ออื่น ๆ ทุกรายการได้ การเพิ่มสิบไคลเอนต์พร้อมกันไม่ต้องการสิบ thread เพราะ event loop แบบ single-thread เดียวกันขับเคลื่อนทั้งหมด
นี่คือเหตุผลที่ใช้งานได้จริงว่าทำไมแอปพลิเคชันเครือข่ายกล้องที่เขียนด้วย asyncio จึงขยายขนาดได้ดีกว่าโค้ดแบบบล็อกที่เทียบเท่ามาก: ภาพเซิร์ฟเวอร์ใน TCP sockets รองรับไคลเอนต์ได้ทีละคน ส่วนแบบนี้รองรับหลายไคลเอนต์พร้อมกันโดยไม่ต้องพยายามเพิ่มเติม
9.14.3. การทำงานพร้อมกันควบคู่กับการทำงานเครือข่าย¶
ผลตอบแทนที่ยิ่งใหญ่คือการผสมการทำงานเครือข่ายกับงานอื่น ๆ ของกล้องใน loop เดียวกัน กล้องสามารถจับภาพเฟรม ประมวลผลภาพ และ ให้บริการโปรโตคอลเครือข่ายได้พร้อมกัน สลับกันทำงาน:
import asyncio
async def capture_loop():
while True:
img = await camera.snapshot()
# process img ...
await asyncio.sleep_ms(100)
async def handle(reader, writer):
...
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
await asyncio.gather(
server.serve_forever(),
capture_loop(),
)
asyncio.run(main())
asyncio.gather() รัน coroutine สองตัวบน event loop เดียวกัน ขณะที่กล้องกำลังนอนหลับใน sleep_ms() ระหว่างเฟรม เซิร์ฟเวอร์ก็ได้รับโอกาสจัดการ network traffic ขณะที่เซิร์ฟเวอร์กำลังรอไบต์ถัดไป กล้องก็ได้รับโอกาสจับภาพ ทั้งคู่ทำงานไปพร้อมกันบน MicroPython thread เดียว
9.14.4. UDP กับ asyncio¶
โมดูล asyncio ไม่ได้ มี stream ระดับสูงสำหรับ UDP -- datagram ไม่เหมาะกับรูปแบบการอ่าน/เขียนของ stream วิธีที่ใช้ได้จริงบนกล้องคือวาง UDP ไว้ใน coroutine ของตัวเอง สลับ socket เป็นโหมด non-blocking และ yield ให้ event loop ระหว่างการพยายามอ่าน:
import asyncio
import socket
async def udp_listener(port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(("0.0.0.0", port))
while True:
try:
data, src = s.recvfrom(1024)
except OSError:
await asyncio.sleep_ms(10)
continue
print("got", data, "from", src)
socket ถูกตั้งค่าเป็น non-blocking ด้วย s.setblocking(False) ดังนั้น recvfrom() จะ raise OSError ทันทีเมื่อไม่มี datagram รออยู่แทนที่จะบล็อก event loop ทั้งหมด await asyncio.sleep_ms(10) ในส่วนที่ว่างเปล่าจะส่งมอบการควบคุมกลับไปยัง event loop จนกว่าจะถึงรอบ poll ครั้งถัดไป
การส่งใช้รูปแบบเดียวกัน: sendto() บน socket แบบ non-blocking จะสำเร็จทันทีหรือ raise exception ไม่มี sendallto -- UDP datagram เป็น atomic ดังนั้นแต่ละการส่งคือ datagram ทั้งอันหรือไม่มีเลย ถ้า send buffer เต็ม ทางที่ถูกต้องสำหรับ UDP มักจะทิ้ง datagram และส่ง datagram ถัดไปออกไปในรอบถัดไปของ loop:
async def udp_telemetry(target_addr, period_ms):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
while True:
payload = collect_telemetry()
try:
s.sendto(payload, target_addr)
except OSError:
pass # buffer full -- skip this one
await asyncio.sleep_ms(period_ms)
ส่วนที่ล้มเหลวนั้นเกิดขึ้นได้ยากในทางปฏิบัติ UDP ไม่มี flow control ดังนั้น sendto() มักจะสำเร็จในครั้งแรกเสมอ except มีไว้เพื่อให้ network hiccup สั้น ๆ ไม่ทำให้ coroutine crash
ส่วน Asyncio ครอบคลุมรูปแบบที่กว้างขึ้นสำหรับการผสม I/O แบบบล็อกเข้ากับโปรแกรม asyncio รูปแบบเดียวกันนี้ใช้ได้โดยตรงกับ UDP socket
9.14.5. Timeout และการยกเลิก¶
การห่อการเรียกเครือข่ายด้วย asyncio.wait_for() จะกำหนด deadline ให้กับมัน:
try:
reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
print("server is slow")
coroutine ที่ใช้เวลานานเกินไปยังสามารถถูก cancel()-ยกเลิกจากที่อื่นได้ ทั้งสองกลไกนี้ถูกอธิบายอย่างละเอียดในบท coordination และใช้ได้โดยไม่เปลี่ยนแปลงกับ stream ที่คืนมาจาก asyncio.open_connection() และ asyncio.start_server()
สำหรับข้อมูลอ้างอิงเต็มรูปแบบของ Stream (class เบื้องหลัง reader และ writer รวมถึงตัวช่วยที่หน้านี้ใช้ไปตลอด) ดูได้ที่ asyncio --- ตัวกำหนดเวลา I/O แบบอะซิงโครนัส