11.10. セントラルとして動作する

会話のもう一方の側が セントラル です。これは、アドバタイズしているペリフェラルをスキャンし、通信相手を 1 つ選び、接続を開き、リモートの GATT データベースをたどり、その上のキャラクタリスティックを読み取ったりサブスクライブしたりするデバイスです。ウェアラブルセンサーから測定値を収集したり、ビーコンを受信したり、コンパニオンのマイクロコントローラと通信したりするカメラはセントラルです。

aioble におけるセントラルのパターンは、スキャン、接続、ディスカバリ、操作という 4 つの段階を経て進行します。

11.10.1. スキャン

aioble.scan() は、発見されたデバイスに対する非同期イテレータも兼ねる非同期コンテキストマネージャを返します。典型的な使い方は、目的のデバイスが現れるまでスキャンし、その後イテレーションから抜け出すというものです:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 はスキャンの実行時間の上限を定めます。duration_ms=0 は(コンテキストマネージャを抜けるまで)永久にスキャンします。active=True はスキャンレスポンスを要求し、双方からのわずかな追加送信と引き換えに、デバイスごとのペイロードサイズを倍にします。残りの interval_us / window_us キーワード引数は、スキャナ自身の無線デューティサイクルを調整するもので、デフォルトから変更されることはまれです。

aioble.ScanResult は、デバイスアドレス、直近の RSSI、生のアドバタイジングおよびスキャンレスポンスのバイト列、そして標準フィールドを解析するヘルパーを公開します:

  • result.device -- connect() を呼び出す準備のできた aioble.Device です。

  • result.rssi -- dBm 単位の受信信号強度インジケータで、「最も近いものを選ぶ」ロジックに役立ちます。

  • result.name() -- ローカル名の文字列、またはアドバタイズされていない場合は None です。

  • result.services() -- デバイスがアドバタイズするすべてのサービスについての bluetooth.UUID のジェネレータです。

  • result.manufacturer() -- 製造者固有フィールドの (company_id, data) タプルのジェネレータです。

  • result.connectable -- 直近のアドバタイズメントが接続可能なものだったかどうかです。

同じデバイスについて新しいアドバタイジングデータが届くと、同じ ScanResult が再び yield されます。そのため、デバイスを無期限に追跡したいだけのパッシブなリスナーは、非同期イテレータを永久に実行し、各イベントで処理を分配できます。

11.10.2. 接続

対象のデバイスが特定できたら、接続を開くのは 1 回の await です:

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect()timeout_ms(接続が確立するまで待つ時間。デフォルトは 10 秒)と、min_conn_interval_us / max_conn_interval_us接続 で説明した、要求する接続間隔の範囲)を取ります。

11.10.2.1. スキャンせずに既知のピアへ再接続する

ピアとの間にすでにボンドが存在する場合、アドレスはすでにわかっているため、スキャンして選ぶラウンドをもう一度行うのは無線時間の無駄です。保存したアドレスから aioble.Device を直接構築し、そのまま connect() へ進みます:

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

最初の引数は aioble.ADDR_PUBLIC(コントローラの工場出荷アドレス)または aioble.ADDR_RANDOM(生成された静的アドレスまたは解決可能なプライベートアドレス)のいずれかです。2 番目は、6 バイトの bytes 値か、コロン区切りの 16 進数文字列のいずれかです。任意の Device(たとえば以前に ScanResult から取得したもの)の addr_type 属性と addr 属性を永続化し、ここに渡し戻すことができます。

返される aioble.DeviceConnection は、セントラルの残りの作業がぶら下がる対象です。async with は、ブロックを抜けるとき(成功時、キャンセル時、あるいはピアが消えたことによる aioble.DeviceDisconnectedError を含む任意の例外時)に接続が確実に閉じられるようにします。

セントラルがデフォルトの 23 バイトの MTU が許す以上に大きなキャラクタリスティック値を必要とする場合、ここがそれを交渉する場所です:

await connection.exchange_mtu(512)

exchange_mtu() は実際に交渉された MTU を返します。これは要求した値とピアがサポートする値の最小値です。)

11.10.3. ディスカバリ

ディスカバリは、リモートの GATT データベースをたどって、サービスとキャラクタリスティックをそれらの UUID で見つけます。やり方は 2 種類あります。ターゲット型(UUID がわかっていて特定の 1 つが欲しい)と網羅型(すべてが欲しい)です。

ターゲット型 -- よくあるケースです:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service()aioble.ClientService.characteristic() はそれぞれ bluetooth.UUID を取り、一致するオブジェクト(または None)を返します。どちらも、ディスカバリごとの timeout_ms キーワードを持ち、デフォルトは 2 秒です。

網羅型:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

これは汎用の Bluetooth エクスプローラアプリが行うことであり、開発には便利ですが、どんな UUID を期待しているかが分かっている本番コードにはそれほど役立ちません。

11.10.3.1. キャラクタリスティックが何をサポートするかを調べる

ディスカバリは、各キャラクタリスティックについてピアがアドバタイズした GATT プロパティのビットマスクを properties として返します。ビットは GATT で定義されたもので、read(0x02)、write-without-response(0x04)、write(0x08)、notify(0x10)、indicate(0x20)などです。操作を発行する前にビットマスクを調べることで、汎用クライアントは、能力を事前に知らないキャラクタリスティックに適応できます:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

ピアの GATT プロファイルをすでに知っている本番コードは、通常これを必要としません。UUID は事前に文書化されているからです。汎用的・探索的なクライアント(未知のデバイスをたどる設定ページや、プラグインホストなど)がこれに依存します。

11.10.4. 操作する

セントラルが ClientCharacteristic を保持すると、各 GATT 操作は 1 回のコルーチン呼び出しになります:

  • Read。 GATT read を発行して値を取得します:

    value = await char.read()
    print("value:", value)
    

    ロングリード(MTU より大きな値)は透過的に処理されます。

  • Write。 新しい値をサーバーに送信します:

    await char.write(b"\\x01")
    

    response=True は write-response を待ち、サーバーが write を拒否すると aioble.GattError を送出します。response=False は write-without-response、すなわち送りっぱなしです。response=None(デフォルト)は、ピアがアドバタイズした内容に基づいて自動的に選択します。

  • Subscribe。 キャラクタリスティックの CCCD に書き込むことで、notify または indicate を有効にします:

    await char.subscribe(notify=True)
    

    これが返ったあと、セントラルは受信するプッシュを待つことができます。

  • Notified / indicated。 サーバーからの次のプッシュを待ちます:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None(デフォルト)は永久に待ちます。しばらくして諦めるには、ミリ秒単位の整数を渡します。

4 つを組み合わせると、典型的な「接続、サブスクライブ、ストリーム」のセントラルプログラムになります:

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

全体でおよそ 10 数行であり、「Bluetooth が動いていない」状態から「ライブデータをストリーミングしている」状態までの流れをカバーします。スキャンイテレータはブロードキャスター/オブザーバーのパターンに対応し、connect は GAP 接続を開き、service / characteristic は GATT ツリーをたどり、subscribe は CCCD に書き込み、notified はプッシュを待ちます。

11.10.5. 切断と再接続

無線リンクに起きたことはすべて、それを待っていたコルーチンに現れます。aioble.DeviceDisconnectedError は、ピアが消えたか、または監視タイムアウトが発火したことを示すシグナルです。この例外は、進行中だった read()write()notified() の呼び出しを終了させ、async with connection ブロックはすべてきれいに抜けます。

切断時に再接続すべきセントラルは、その作業を独自の外側のループでラップします:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. timeout() で一連の処理を囲む

連続する複数の GATT 操作が(それぞれ個別に自身の timeout_ms で完了するのではなく)1 つの予算の中ですべて完了すべき場合、aioble.DeviceConnection.timeout() を使ってそれらをラップします。返されるコンテキストマネージャは、予算が経過した場合(asyncio.TimeoutError を送出)、またはピアが切断した場合(aioble.DeviceDisconnectedError を送出)に、その本体をキャンセルします:

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

これは、各呼び出しを個別に asyncio.wait_for() でラップするよりもすっきりした代替手段であり、各呼び出しは自身の期限を満たしているのにシーケンス全体としては超過してしまう、という偽りの成功を避けられます。timeout()timeout_ms=None を渡すと、期限が無効になり、切断ガードだけが有効なまま残ります。