10.9. Autentificare pentru clienții programatici¶
Token-urile de tip bearer reprezintă modul în care telefoanele, microcontrolerele însoțitoare și lucrătorii din cloud se autentifică la un server – clientul plasează un token în antetul Authorization la fiecare cerere, iar serverul îl verifică. Aplicația telefonului însoțitor trimite un POST cu un „ack” atunci când proprietarul apasă „văzut” pe o notificare de mișcare; acele cereri au nevoie de un token.
10.9.1. Secretul de semnare¶
Secretul este cheia privată a camerei pentru semnarea și verificarea token-urilor. Generează 32 de octeți aleatori din RNG-ul hardware la prima pornire a camerei, persistă-i în sistemul de fișiere și reutilizează aceiași octeți la fiecare pornire ulterioară:
# auth/tokens.py
import os
try:
with open('secret.bin', 'rb') as f:
SECRET = f.read()
except OSError:
SECRET = os.urandom(32)
with open('secret.bin', 'wb') as f:
f.write(SECRET)
os.urandom() este sursa de numere aleatoare adecvată criptografic pe fiecare port de cameră – pe majoritatea porturilor citește direct generatorul hardware de numere aleatoare al cipului. Fișierul rezidă în directorul de lucru al camerei (/sdcard dacă este montat un card SD, /flash în caz contrar) și nu părăsește niciodată camera. Ștergerea secret.bin și repornirea rotește secretul și invalidează fiecare token emis sub cel vechi.
Notă
machine.unique_id() nu este un substitut. Pe unele porturi de cameră este folosit și pentru a deriva adresa MAC de rețea, ceea ce înseamnă că valoarea sa călătorește cu fiecare pachet pe care camera îl trimite – nu proprietatea de care are nevoie un secret de semnare.
10.9.2. Emiterea token-urilor¶
Telefonul are nevoie de o modalitate de a obține un token în primul rând. Un JSON Web Token (JWT) cu durată scurtă de viață – un payload JSON codificat base64 cu o semnătură atașată – este suficient pentru o primă variantă: proprietarul schimbă o parolă pentru un token, apoi folosește token-ul până când expiră:
import jwt
import time
@app.post('/api/login')
async def api_login(request):
creds = request.json
if creds.get('user') != 'owner' or creds.get('pass') != load_password():
abort(401)
token = jwt.encode({
'sub': 'owner',
'exp': int(time.time()) + 3600,
}, SECRET)
return {'token': token}
Telefonul trimite un POST cu {user, pass} o singură dată, stochează token-ul returnat în stocarea locală și îl include ca Authorization: Bearer <token> la fiecare cerere ulterioară.
Declarația exp este un timestamp Unix. Verificatorul din secțiunea următoare se bazează pe această declarație pentru a respinge automat token-urile expirate – ceea ce înseamnă că ceasul de perete al camerei trebuie setat, deoarece altfel fiecare token va părea fie mult în viitor, fie deja expirat. Vezi Timpul și NTP pentru rețeta de sincronizare NTP.
10.9.3. Verificarea token-urilor cu TokenAuth¶
microdot.auth.TokenAuth este fabrica de decoratori pentru rutele care necesită un antet Authorization: Bearer <token>. Funcția de retroapelare (callback) de verificare primește șirul token-ului și returnează orice obiect de identitate care ar trebui atașat cererii – sau None pentru a respinge:
from microdot.auth import TokenAuth
tokens = TokenAuth()
@tokens.authenticate
async def check_token(request, token):
try:
claims = jwt.decode(token, SECRET)
except jwt.exceptions.PyJWTError:
return None
return claims['sub']
@app.post('/api/ack')
@tokens
async def ack(request):
body = request.json
if not body or 'count' not in body:
abort(400, 'missing count')
return {'ok': True, 'user': request.g.current_user}
jwt.decode() ridică jwt.exceptions.PyJWTError (sau o subclasă) când semnătura este greșită, token-ul este malformat sau declarația exp a trecut. Verificatorul le înghite pe toate acestea și returnează None, astfel încât microdot răspunde cu 401 fără ca handler-ul să vadă ceva.
Când verificatorul returnează o valoare diferită de None, microdot o stochează în request.g.current_user astfel încât handler-ul să o poată citi – în acest caz este declarația sub (subiect) a JWT-ului.
10.9.4. BasicAuth pentru rețele închise¶
Pentru un endpoint de administrare care este accesat doar dintr-o rețea de încredere, microdot.auth.BasicAuth este mai simplu – browserul afișează un dialog nativ de nume de utilizator/parolă și le trimite în antetul Authorization: Basic ... la fiecare cerere:
from microdot.auth import BasicAuth
basic = BasicAuth(realm='Backyard cam admin')
@basic.authenticate
async def check_basic(request, username, password):
if username == 'owner' and password == load_password():
return 'owner'
return None
@app.get('/admin')
@basic
async def admin(request):
return 'hi ' + request.g.current_user
Credențialele călătoresc în clar dacă conexiunea nu este HTTPS, așa că această abordare are sens doar într-o rețea de încredere sau cu HTTPS implementat.
/api/ack și /api/login înțeleg acum token-uri; /admin necesită autentificare Basic.