10.9. プログラムによるクライアント向けの認証¶
ベアラートークンは、スマートフォン、コンパニオンマイコン、クラウドワーカーがサーバーに認証する方法です。クライアントはすべてのリクエストの Authorization ヘッダーにトークンを入れ、サーバーはそれをチェックします。コンパニオンのスマートフォンアプリは、オーナーが動体通知で「確認済み」をタップすると「ack」をPOSTします。それらのリクエストにはトークンが必要です。
10.9.1. 署名用シークレット¶
シークレットは、トークンの署名と検証に使うカメラの秘密鍵です。カメラの初回起動時にハードウェアRNGから32バイトのランダムなバイトを生成し、ファイルシステムに永続化して、以降の起動ごとに同じバイトを再利用します。
# auth/tokens.py
import os
try:
with open('secret.bin', 'rb') as f:
SECRET = f.read()
except OSError:
SECRET = os.urandom(32)
with open('secret.bin', 'wb') as f:
f.write(SECRET)
os.urandom() は各カメラポートで暗号学的に適切なランダムソースです。ほとんどのポートでは、チップのハードウェア乱数発生器を直接読み取ります。このファイルはカメラの作業ディレクトリ(SDカードがマウントされていれば /sdcard、そうでなければ /flash)に存在し、決してカメラの外に出ません。secret.bin を削除して再起動するとシークレットがローテーションされ、古いシークレットの下で発行されたすべてのトークンが無効になります。
注釈
machine.unique_id() は代用にはなりません。一部のカメラポートではネットワークMACアドレスの導出にも使われており、つまりその値はカメラが送信するすべてのパケットとともに伝わります。署名用シークレットに必要な性質ではありません。
10.9.2. トークンの発行¶
スマートフォンには、そもそもトークンを取得する手段が必要です。短命のJSON Web Token(JWT)、つまりbase64エンコードされたJSONペイロードに署名を付加したものが、最初の一歩には十分です。オーナーはパスワードをトークンと交換し、その後トークンが期限切れになるまでそれを使います。
import jwt
import time
@app.post('/api/login')
async def api_login(request):
creds = request.json
if creds.get('user') != 'owner' or creds.get('pass') != load_password():
abort(401)
token = jwt.encode({
'sub': 'owner',
'exp': int(time.time()) + 3600,
}, SECRET)
return {'token': token}
スマートフォンは一度 {user, pass} をPOSTし、返されたトークンをローカルストレージに保存し、以降のすべてのリクエストで Authorization: Bearer <token> として含めます。
exp クレームはUnixタイムスタンプです。次のセクションの検証器はこのクレームに依存して期限切れトークンを自動的に拒否します。つまりカメラの実時間クロックが設定されている必要があります。さもないとすべてのトークンが遠い未来か既に期限切れに見えてしまうからです。NTP同期のレシピについては 時刻とNTP を参照してください。
10.9.3. TokenAuth によるトークンの検証¶
microdot.auth.TokenAuth は、Authorization: Bearer <token> ヘッダーを必要とするルート用のデコレータファクトリです。検証コールバックはトークン文字列を受け取り、リクエストに付加すべき任意のアイデンティティオブジェクトを返します。拒否する場合は None を返します。
from microdot.auth import TokenAuth
tokens = TokenAuth()
@tokens.authenticate
async def check_token(request, token):
try:
claims = jwt.decode(token, SECRET)
except jwt.exceptions.PyJWTError:
return None
return claims['sub']
@app.post('/api/ack')
@tokens
async def ack(request):
body = request.json
if not body or 'count' not in body:
abort(400, 'missing count')
return {'ok': True, 'user': request.g.current_user}
jwt.decode() は、署名が誤っている場合、トークンが不正な形式の場合、または exp クレームが過ぎている場合に jwt.exceptions.PyJWTError(またはそのサブクラス)を送出します。検証器はそれらすべてを飲み込んで None を返すため、microdotはハンドラに何も見せることなく401を返します。
検証器が None 以外の値を返すと、microdotはそれを request.g.current_user に保存するため、ハンドラはそれを読み取れます。この場合はJWTの sub(subject)クレームです。
10.9.4. 閉じたネットワーク向けの BasicAuth¶
信頼できるネットワークからのみアクセスされる管理用エンドポイントには、microdot.auth.BasicAuth がよりシンプルです。ブラウザがネイティブのユーザー名/パスワードダイアログをポップアップし、すべてのリクエストの Authorization: Basic ... ヘッダーでそれらを送信します。
from microdot.auth import BasicAuth
basic = BasicAuth(realm='Backyard cam admin')
@basic.authenticate
async def check_basic(request, username, password):
if username == 'owner' and password == load_password():
return 'owner'
return None
@app.get('/admin')
@basic
async def admin(request):
return 'hi ' + request.g.current_user
接続がHTTPSでない限り認証情報は平文で伝わるため、このアプローチは信頼できるネットワーク上か、HTTPSが導入されている場合にのみ意味があります。
/api/ack と /api/login はトークンを理解するようになりました。/admin はBasic認証を必要とします。