9.14. Gniazda z asyncio¶
Blokujące wywołanie recv() zamraża cały skrypt do czasu nadejścia bajtu. Blokujące wywołanie accept() obsługuje tylko jednego klienta naraz. Oba te przypadki to dokładnie ten rodzaj sytuacji „czekania na wejście/wyjście”, do której obsługi istnieje asyncio. Rozdział o asyncio omawia pętlę zdarzeń, korutyny i prymitywy synchronizacji; ta strona opisuje elementy charakterystyczne dla sieci.
Moduł asyncio udostępnia obsługę sieci poprzez niewielką liczbę pomocników, które przyjmują i zwracają strumienie – obiekty wysokiego poziomu opakowujące gniazdo i oferujące wersje odczytu i zapisu, na które można wykonać await. Bazowe gniazdo nadal istnieje; aplikacja po prostu nie dotyka go bezpośrednio.
9.14.1. Klient z asyncio¶
asyncio.open_connection() to asyncio-wy odpowiednik socket.socket.connect(). Otwiera połączenie TCP i zwraca dwa obiekty strumienia: reader i writer
import asyncio
async def client():
reader, writer = await asyncio.open_connection("192.168.1.20", 9000)
writer.write(b"hello\n")
await writer.drain() # wait until bytes have been sent
reply = await reader.readline()
print("reply:", reply)
writer.close()
await writer.wait_closed()
asyncio.run(client())
Trzy rzeczy warte odnotowania:
Konfiguracja połączenia to jedno
awaitzamiast wywołania blokującego. Podczas gdy uzgadnianie jest w toku, pętla zdarzeń może swobodnie wykonywać inne korutyny.write()umieszcza bajty w buforze wychodzącym;drain()toawait, które oddaje sterowanie pętli, dopóki te bajty nie zostaną faktycznie wysłane przez sieć.readline()czyta bajty do momentu nadejścia znaku nowej linii. Klasa strumienia zawiera równieżread()(czyta do N bajtów) orazreadexactly()(czyta dokładnie N bajtów), które rozwiązują problem granic wiadomości w TCP bez ręcznego pisania pętli ramkowania.
9.14.2. Serwer z asyncio¶
asyncio.start_server() to asyncio-wy odpowiednik sekwencji bind/listen/accept. Przyjmuje wywołanie zwrotne, które zostanie uruchomione raz na każde przychodzące połączenie, z tą samą parą reader/writer, której używa strona klienta:
import asyncio
async def handle(reader, writer):
addr = writer.get_extra_info("peername")
print("connection from", addr)
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data) # echo back
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
print("listening on", server.sockets[0].getsockname())
async with server:
await server.serve_forever()
asyncio.run(main())
Każde zaakceptowane połączenie staje się własnym zadaniem uruchamiającym handle. Pętla zdarzeń naturalnie przełącza się między nimi – jeden powolny klient nie może zablokować pozostałych, ponieważ gdy czeka on na await reader.read(...), pętla może swobodnie posuwać naprzód każde inne połączenie. Dodanie dziesięciu jednoczesnych klientów nie wymaga dziesięciu wątków; ta sama jednowątkowa pętla zdarzeń obsługuje je wszystkie.
To praktyczny powód, dla którego aplikacje sieciowe dla kamery napisane z użyciem asyncio skalują się znacznie lepiej niż równoważny kod blokujący: obraz serwera na Gniazda TCP obsługiwał jednego klienta naraz; ten obsługuje wielu klientów jednocześnie bez dodatkowego wysiłku.
9.14.3. Praca równoległa obok obsługi sieci¶
Największą korzyścią jest łączenie obsługi sieci z resztą pracy kamery w tej samej pętli. Kamera może przechwycić ramkę, wykonać przetwarzanie obrazu oraz obsłużyć protokół sieciowy, wszystko z przeplotem:
import asyncio
async def capture_loop():
while True:
img = await camera.snapshot()
# process img ...
await asyncio.sleep_ms(100)
async def handle(reader, writer):
...
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 9000)
await asyncio.gather(
server.serve_forever(),
capture_loop(),
)
asyncio.run(main())
asyncio.gather() uruchamia obie korutyny w tej samej pętli zdarzeń. Podczas gdy kamera śpi w sleep_ms() między ramkami, serwer może rozdzielać ruch sieciowy. Podczas gdy serwer czeka na kolejny bajt, kamera może przechwytywać. Oba posuwają się naprzód na jednym wątku MicroPython.
9.14.4. UDP z asyncio¶
Moduł asyncio nie oferuje tych samych wysokopoziomowych strumieni dla UDP – datagramy nie pasują do modelu odczytu/zapisu strumienia. Praktycznym podejściem na kamerze jest umieszczenie pracy z UDP we własnej korutynie, przełączenie gniazda w tryb nieblokujący i oddawanie sterowania pętli zdarzeń między próbami odczytu:
import asyncio
import socket
async def udp_listener(port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(("0.0.0.0", port))
while True:
try:
data, src = s.recvfrom(1024)
except OSError:
await asyncio.sleep_ms(10)
continue
print("got", data, "from", src)
Gniazdo jest ustawione jako nieblokujące za pomocą s.setblocking(False), więc recvfrom() natychmiast zgłasza OSError, gdy nie czeka żaden datagram, zamiast blokować całą pętlę zdarzeń. await asyncio.sleep_ms(10) w pustej gałęzi oddaje sterowanie pętli zdarzeń aż do kolejnego odpytania.
Wysyłanie wygląda tak samo: sendto() na gnieździe nieblokującym albo od razu się powodzi, albo zgłasza wyjątek. Nie ma sendallto – datagramy UDP są atomowe, więc każde wysłanie to jeden cały datagram albo żaden. Jeśli bufor wysyłania jest pełny, właściwym posunięciem dla UDP jest zwykle porzucenie datagramu i wysłanie kolejnego przy następnym przejściu przez pętlę:
async def udp_telemetry(target_addr, period_ms):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)
while True:
payload = collect_telemetry()
try:
s.sendto(payload, target_addr)
except OSError:
pass # buffer full -- skip this one
await asyncio.sleep_ms(period_ms)
Gałąź niepowodzenia jest w praktyce rzadka. UDP nie ma kontroli przepływu, więc sendto() niemal zawsze powodzi się za pierwszym razem; klauzula except istnieje głównie po to, aby krótka usterka sieciowa nie spowodowała awarii korutyny.
Sekcja Asyncio omawia szersze wzorce łączenia blokującego wejścia/wyjścia z programem asyncio; te same wzorce stosują się bezpośrednio do gniazda UDP.
9.14.5. Limity czasu i anulowanie¶
Opakowanie wywołania sieciowego w asyncio.wait_for() nakłada na nie termin:
try:
reply = await asyncio.wait_for(reader.readline(), timeout=2.0)
except asyncio.TimeoutError:
print("server is slow")
Korutyna, która trwa zbyt długo, może też zostać anulowana z zewnątrz za pomocą cancel(). Oba mechanizmy omówiono szczegółowo w rozdziale o koordynacji; stosują się one bez zmian do strumieni zwracanych przez asyncio.open_connection() i asyncio.start_server().
Pełny opis Stream (klasy stojącej za readerami i writerami oraz pomocników użytych mimochodem na tej stronie) znajdziesz w asyncio — asynchroniczny harmonogram operacji we/wy.