9.14. المقابس مع asyncio¶
إن استدعاء recv() الحاجب يجمّد البرنامج النصي بأكمله حتى تصل بايت. واستدعاء accept() الحاجب لا يخدم سوى عميل واحد في كل مرة. وكلاهما تمامًا من نوع حالات "الانتظار على دخل/خرج" التي وُجد asyncio للتعامل معها. ويغطي فصل asyncio حلقة الأحداث والإجراءات المتزامنة (coroutines) وبدائيات المزامنة؛ وتغطي هذه الصفحة الأجزاء الخاصة بالشبكة.
تتيح وحدة asyncio التعامل مع الشبكات عبر عدد قليل من الدوال المساعدة التي تأخذ وتُرجع تدفقات (streams) -- وهي كائنات عالية المستوى تغلّف مقبسًا وتقدّم نسخًا قابلة لـ await من القراءة والكتابة. ويظل المقبس الأساسي موجودًا؛ غير أن التطبيق ببساطة لا يلمسه مباشرة.
9.14.1. عميل مع asyncio¶
asyncio.open_connection() هي نظيرة socket.socket.connect() في asyncio. فهي تفتح اتصال TCP وتُرجع كائني تدفق: قارئ (reader) وكاتب (writer)
import asyncio
async def client():
reader, writer = await asyncio.open_connection("192.168.1.20", 9000)
writer.write(b"hello\n")
await writer.drain() # wait until bytes have been sent
reply = await reader.readline()
print("reply:", reply)
writer.close()
await writer.wait_closed()
asyncio.run(client())
ثلاثة أمور جديرة بالملاحظة:
إعداد الاتصال هو
awaitواحد بدلًا من استدعاء حاجب. وأثناء جريان المصافحة، تكون حلقة الأحداث حرّة لتشغيل إجراءات متزامنة أخرى.write()تضع البايتات في مخزن مؤقت صادر؛ وdrain()هيawaitالتي تتنازل عن التنفيذ للحلقة حتى تكون تلك البايتات قد أُرسلت فعلًا عبر الشبكة.readline()تقرأ البايتات حتى وصول سطر جديد. ويتضمن صنف التدفق أيضًاread()(تقرأ حتى N بايت) وreadexactly()(تقرأ N بايت بالضبط)، وكلاهما يحلّ مشكلة حدود الرسائل في TCP دون كتابة حلقات التأطير يدويًا.
9.14.2. خادم مع asyncio¶
asyncio.start_server() هي نظيرة سلسلة bind/listen/accept في asyncio. فهي تأخذ دالة رد نداء تُشغَّل مرة واحدة لكل اتصال وارد، مع زوج القارئ/الكاتب نفسه الذي تستخدمه جهة العميل:
import asyncio
async def handle(reader, writer):
addr = writer.get_extra_info("peername")
print("connection from", addr)
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data) # echo back
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
print("listening on", server.sockets[0].getsockname())
async with server:
await server.serve_forever()
asyncio.run(main())
كل اتصال مقبول يصبح مهمته الخاصة التي تشغّل handle. وتوزّع حلقة الأحداث التنفيذ بينها بشكل طبيعي -- فلا يستطيع عميل بطيء واحد أن يحجب الآخرين، لأنه أثناء انتظاره على await reader.read(...) تكون الحلقة حرّة لإحراز تقدم على كل اتصال آخر. وإضافة عشرة عملاء متزامنين لا تتطلب عشرة خيوط؛ فحلقة الأحداث ذات الخيط الواحد نفسها تقودهم جميعًا.
هذا هو السبب العملي في أن تطبيقات شبكات الكاميرا المكتوبة لـ asyncio تتوسّع أفضل بكثير من الشيفرة الحاجبة المكافئة: فصورة الخادم في مقابس TCP كانت عميلًا واحدًا في كل مرة؛ وهذه الصورة عملاء كثيرون في آن واحد دون أي جهد إضافي.
9.14.3. العمل المتزامن جنبًا إلى جنب مع الشبكات¶
المكسب الأكبر هو مزج الشبكات مع بقية عمل الكاميرا في الحلقة نفسها. فتستطيع الكاميرا التقاط إطار، وتشغيل معالجة الصور، وخدمة بروتوكول شبكي، كل ذلك متداخلًا:
import asyncio
async def capture_loop():
while True:
img = await camera.snapshot()
# process img ...
await asyncio.sleep_ms(100)
async def handle(reader, writer):
...
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
await asyncio.gather(
server.serve_forever(),
capture_loop(),
)
asyncio.run(main())
asyncio.gather() تشغّل الإجراءين المتزامنين على حلقة الأحداث نفسها. فبينما تكون الكاميرا نائمة في sleep_ms() بين الإطارات، يحصل الخادم على فرصة لتوزيع حركة المرور الشبكية. وبينما ينتظر الخادم البايت التالية، تحصل الكاميرا على فرصة للالتقاط. وكلاهما يحرز تقدمًا على خيط MicroPython واحد.
9.14.4. UDP مع asyncio¶
إن وحدة asyncio لا تقدّم التدفقات عالية المستوى نفسها لـ UDP -- فمخططات البيانات (datagrams) لا تتلاءم مع شكل القراءة/الكتابة الخاص بالتدفق. والنهج العملي على الكاميرا هو وضع عمل UDP في إجراء متزامن خاص به، وتحويل المقبس إلى وضع غير حاجب، والتنازل عن التنفيذ لحلقة الأحداث بين محاولات القراءة:
import asyncio
import socket
async def udp_listener(port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(("0.0.0.0", port))
while True:
try:
data, src = s.recvfrom(1024)
except OSError:
await asyncio.sleep_ms(10)
continue
print("got", data, "from", src)
يُضبط المقبس على وضع غير حاجب بـ s.setblocking(False)، بحيث تثير recvfrom() الاستثناء OSError فورًا عند عدم وجود مخطط بيانات في الانتظار بدلًا من حجب حلقة الأحداث بأكملها. ويعيد await asyncio.sleep_ms(10) في الفرع الفارغ التحكم إلى حلقة الأحداث حتى الاستطلاع التالي.
يتّبع الإرسال الشكل نفسه: فإما أن تنجح sendto() على مقبس غير حاجب فورًا أو تثير استثناءً. ولا يوجد sendallto -- فمخططات بيانات UDP ذرّية، فكل إرسال هو مخطط بيانات كامل واحد أو لا شيء. وإذا كان مخزن الإرسال ممتلئًا، فإن التصرف الصحيح لـ UDP يكون عادة إسقاط مخطط البيانات وترك التالي يخرج في المرة القادمة عبر الحلقة:
async def udp_telemetry(target_addr, period_ms):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
while True:
payload = collect_telemetry()
try:
s.sendto(payload, target_addr)
except OSError:
pass # buffer full -- skip this one
await asyncio.sleep_ms(period_ms)
الفرع الفاشل نادر في الممارسة العملية. فليس لـ UDP تحكم في التدفق، لذا تنجح sendto() دائمًا تقريبًا من المحاولة الأولى؛ وexcept موجود في الغالب كي لا تؤدي عثرة شبكية وجيزة إلى تعطّل الإجراء المتزامن.
يغطي قسم Asyncio الأنماط الأوسع لمزج الدخل/الخرج الحاجب داخل برنامج asyncio؛ وتنطبق الأنماط نفسها مباشرة على مقبس UDP.
9.14.5. المهل الزمنية والإلغاء¶
يضع تغليف استدعاء شبكي بـ asyncio.wait_for() موعدًا نهائيًا عليه:
try:
reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
print("server is slow")
كما يمكن أيضًا cancel()-ليّ إجراء متزامن يستغرق وقتًا أطول من اللازم من مكان آخر. وكلتا الآليتين مغطّاة بالتفصيل في فصل التنسيق؛ وتنطبقان دون تغيير على التدفقات التي تُرجعها asyncio.open_connection() وasyncio.start_server().
للاطلاع على المرجع الكامل لـ Stream (الصنف الكامن خلف القرّاء والكتّاب، إضافة إلى الدوال المساعدة التي استخدمتها هذه الصفحة عرضًا)، انظر asyncio --- مجدول الإدخال/الإخراج غير المتزامن.