11.10. 作為中央裝置(central)

對話的另一端是 中央裝置(central) ——掃描正在廣播的周邊裝置、挑選其中一個進行通訊、開啟連線、巡走遠端的 GATT 資料庫,並讀取或訂閱其上特徵值的裝置。一台從穿戴式感測器收集讀數、聆聽信標,或與夥伴微控制器通訊的相機,就是一個中央裝置。

aioble 中的中央模式經歷四個階段:掃描、連線、探索、操作。

11.10.1. 掃描

aioble.scan() 回傳一個非同步情境管理器,它同時也充當對已發現裝置進行迭代的非同步迭代器。典型用法是持續掃描直到出現感興趣的裝置,然後跳出迭代::

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 限制掃描執行的時間長度;duration_ms=0 則永久掃描(直到情境管理器離開)。active=True 會請求掃描回應,這會使每個裝置的酬載大小加倍,代價是雙方各多一次少量的傳輸。其餘的 interval_us / window_us 關鍵字參數用於調整掃描器自身無線電的工作週期,很少需要從預設值更動。

每個 aioble.ScanResult 都揭露裝置位址、最新的 RSSI、原始的廣播與掃描回應位元組,以及解析標準欄位的輔助函式:

  • result.device ——一個可在其上呼叫 connect()aioble.Device

  • result.rssi ——以 dBm 為單位的接收訊號強度指標,適用於「挑選最近者」的邏輯。

  • result.name() ——本地名稱字串,若未廣播則為 None

  • result.services() ——對裝置所廣播的每個服務產生 bluetooth.UUID 的產生器。

  • result.manufacturer() ——對製造商特定欄位產生 (company_id, data) 元組的產生器。

  • result.connectable ——最近一次廣播是否為可連線的廣播。

當同一裝置有新的廣播資料抵達時,同一個 ScanResult 會被重新產生,因此一個只想無限期追蹤裝置的被動聆聽者,可以永久執行該非同步迭代器並對每個事件進行分派。

11.10.2. 連線

一旦辨識出目標裝置,開啟連線只需一次 await::

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect() 接受 timeout_ms(等待連線建立的時間長度;預設為 10 秒),以及 min_conn_interval_us / max_conn_interval_us(取自 連線 的所請求連線間隔範圍)。

11.10.2.1. 在不掃描的情況下重新連線至已知對端

一旦與某個對端建立了配對綁定,其位址便已知,再進行一輪掃描並挑選只是浪費無線電時間。可直接以儲存的位址建構一個 aioble.Device,並直接跳到 connect()::

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

第一個引數為 aioble.ADDR_PUBLIC(控制器的出廠位址)或 aioble.ADDR_RANDOM(產生的靜態或可解析私密位址)之一;第二個引數則為六位元組的 bytes 值或以冒號分隔的十六進位字串。任何 Device(例如稍早從 ScanResult 取得的那個)的 addr_typeaddr 屬性都可被持久化並在此處回饋使用。

回傳的 aioble.DeviceConnection 正是中央裝置其餘工作所依附的對象。async with 確保區塊離開時連線會被關閉——無論是成功、被取消,或任何例外,包括對端離線所造成的 aioble.DeviceDisconnectedError

如果中央裝置需要比預設 23 位元組 MTU 所允許更大的特徵值,這裡就是協商它的地方::

await connection.exchange_mtu(512)

exchange_mtu() 回傳實際協商出的 MTU,即所請求值與對端所支援值兩者的最小值。)

11.10.3. 探索

探索會巡走遠端的 GATT 資料庫,依 UUID 找出服務與特徵值。有兩種方式:針對性的(你知道 UUID 並想要某個特定項目)與窮盡式的(你想要全部)。

針對性——常見情況::

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service()aioble.ClientService.characteristic() 各接受一個 bluetooth.UUID 並回傳相符的物件(或 None)。兩者都有一個每次探索的 timeout_ms 關鍵字,預設為 2 秒。

窮盡式::

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

這正是通用的藍牙探索 app 所做的事——對開發很有用,但對於已知預期 UUID 的正式環境程式碼則用處較小。

11.10.3.1. 檢視某特徵值支援哪些功能

探索會以 properties 的形式,回傳對端為每個特徵值所廣播的 GATT 屬性位元遮罩。這些位元為 GATT 定義的那些——讀取(0x02)、無回應寫入(0x04)、寫入(0x08)、通知(0x10)、指示(0x20)等等。在發出操作之前檢視這個位元遮罩,可讓通用用戶端適應其能力事先未知的特徵值::

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

已知對端 GATT 設定檔的正式環境程式碼通常不需要這個——這些 UUID 在一開始就已記載。通用/探索性用戶端(巡走未知裝置的設定頁面、外掛主機)則仰賴它。

11.10.4. 操作

一旦中央裝置持有一個 ClientCharacteristic,每個 GATT 操作都是一次協程呼叫:

  • 讀取。 發出一次 GATT 讀取並取回該值::

    value = await char.read()
    print("value:", value)
    

    長讀取(值大於 MTU)會被透明地處理。

  • 寫入。 將新值傳送至伺服器::

    await char.write(b"\\x01")
    

    response=True 會等待寫入回應,並在伺服器拒絕寫入時拋出 aioble.GattErrorresponse=False 為無回應寫入:發出後即不理會。response=None(預設)會根據對端所廣播的內容自動挑選。

  • 訂閱。 透過寫入特徵值的 CCCD 來啟用通知或指示::

    await char.subscribe(notify=True)
    

    在此返回之後,中央裝置便可等待傳入的推送。

  • 收到通知/指示。 等待來自伺服器的下一次推送::

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None(預設)會永久等待;傳入一個以毫秒為單位的整數,便會在一段時間後放棄。

將這四者組合起來,便構成典型的「連線、訂閱、串流」中央裝置程式::

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

整段程式大約十來行,涵蓋了從「沒有任何藍牙在執行」到「即時資料串流」的整個流程。掃描迭代器對應廣播者/觀察者模式,connect 開啟 GAP 連線,service / characteristic 巡走 GATT 樹,subscribe 寫入 CCCD,而 notified 等待推送。

11.10.5. 斷線與重新連線

任何發生在無線電鏈路上的事,都會浮現在當時正在其上等待的協程中。aioble.DeviceDisconnectedError 是對端離線或監督逾時觸發的訊號;該例外會終止當時進行中的任何 read()write()notified() 呼叫,而任何 async with connection 區塊都會乾淨地離開。

一個應在失去連線時重新連線的中央裝置,會把工作包在它自己的外層迴圈中::

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. 用 timeout() 包覆一段操作序列

當連續數個 GATT 操作都應在同一份預算內完成——而非各自依其自身的 timeout_ms——時,可用 aioble.DeviceConnection.timeout() 來包覆它們。回傳的情境管理器會在預算耗盡時(拋出 asyncio.TimeoutError)或對端斷線時(拋出 aioble.DeviceDisconnectedError)取消其主體::

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

比起以 asyncio.wait_for() 個別包覆每次呼叫,這是更乾淨的替代方案,並可避免那種每次呼叫各自符合其期限、但整段序列卻超時的虛假成功。對 timeout() 傳入 timeout_ms=None 會停用期限,僅保留斷線防護處於作用中。