11.10. 作為中央裝置(central)¶
對話的另一端是 中央裝置(central) ——掃描正在廣播的周邊裝置、挑選其中一個進行通訊、開啟連線、巡走遠端的 GATT 資料庫,並讀取或訂閱其上特徵值的裝置。一台從穿戴式感測器收集讀數、聆聽信標,或與夥伴微控制器通訊的相機,就是一個中央裝置。
aioble 中的中央模式經歷四個階段:掃描、連線、探索、操作。
11.10.1. 掃描¶
aioble.scan() 回傳一個非同步情境管理器,它同時也充當對已發現裝置進行迭代的非同步迭代器。典型用法是持續掃描直到出現感興趣的裝置,然後跳出迭代::
import aioble
import asyncio
import bluetooth
HR_SERVICE = bluetooth.UUID(0x180D)
async def find_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
return result.device
return None
duration_ms=5000 限制掃描執行的時間長度;duration_ms=0 則永久掃描(直到情境管理器離開)。active=True 會請求掃描回應,這會使每個裝置的酬載大小加倍,代價是雙方各多一次少量的傳輸。其餘的 interval_us / window_us 關鍵字參數用於調整掃描器自身無線電的工作週期,很少需要從預設值更動。
每個 aioble.ScanResult 都揭露裝置位址、最新的 RSSI、原始的廣播與掃描回應位元組,以及解析標準欄位的輔助函式:
result.device——一個可在其上呼叫connect()的aioble.Device。result.rssi——以 dBm 為單位的接收訊號強度指標,適用於「挑選最近者」的邏輯。result.name()——本地名稱字串,若未廣播則為None。result.services()——對裝置所廣播的每個服務產生bluetooth.UUID的產生器。result.manufacturer()——對製造商特定欄位產生(company_id, data)元組的產生器。result.connectable——最近一次廣播是否為可連線的廣播。
當同一裝置有新的廣播資料抵達時,同一個 ScanResult 會被重新產生,因此一個只想無限期追蹤裝置的被動聆聽者,可以永久執行該非同步迭代器並對每個事件進行分派。
11.10.2. 連線¶
一旦辨識出目標裝置,開啟連線只需一次 await::
async def talk_to(device):
connection = await device.connect() # 10 s timeout
async with connection:
# ... do GATT work ...
pass
aioble.Device.connect() 接受 timeout_ms(等待連線建立的時間長度;預設為 10 秒),以及 min_conn_interval_us / max_conn_interval_us(取自 連線 的所請求連線間隔範圍)。
11.10.2.1. 在不掃描的情況下重新連線至已知對端¶
一旦與某個對端建立了配對綁定,其位址便已知,再進行一輪掃描並挑選只是浪費無線電時間。可直接以儲存的位址建構一個 aioble.Device,並直接跳到 connect()::
import aioble
KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
"aa:bb:cc:dd:ee:ff")
async def talk_to_kitchen():
async with await KITCHEN_CAM.connect() as connection:
# ... GATT work ...
pass
第一個引數為 aioble.ADDR_PUBLIC(控制器的出廠位址)或 aioble.ADDR_RANDOM(產生的靜態或可解析私密位址)之一;第二個引數則為六位元組的 bytes 值或以冒號分隔的十六進位字串。任何 Device(例如稍早從 ScanResult 取得的那個)的 addr_type 與 addr 屬性都可被持久化並在此處回饋使用。
回傳的 aioble.DeviceConnection 正是中央裝置其餘工作所依附的對象。async with 確保區塊離開時連線會被關閉——無論是成功、被取消,或任何例外,包括對端離線所造成的 aioble.DeviceDisconnectedError。
如果中央裝置需要比預設 23 位元組 MTU 所允許更大的特徵值,這裡就是協商它的地方::
await connection.exchange_mtu(512)
(exchange_mtu() 回傳實際協商出的 MTU,即所請求值與對端所支援值兩者的最小值。)
11.10.3. 探索¶
探索會巡走遠端的 GATT 資料庫,依 UUID 找出服務與特徵值。有兩種方式:針對性的(你知道 UUID 並想要某個特定項目)與窮盡式的(你想要全部)。
針對性——常見情況::
service = await connection.service(HR_SERVICE)
if service is None:
return # no such service
char = await service.characteristic(HR_MEASUREMENT)
if char is None:
return # no such characteristic
aioble.DeviceConnection.service() 與 aioble.ClientService.characteristic() 各接受一個 bluetooth.UUID 並回傳相符的物件(或 None)。兩者都有一個每次探索的 timeout_ms 關鍵字,預設為 2 秒。
窮盡式::
async for service in connection.services():
print("service:", service.uuid)
async for char in service.characteristics():
print(" characteristic:", char.uuid, "properties:", hex(char.properties))
這正是通用的藍牙探索 app 所做的事——對開發很有用,但對於已知預期 UUID 的正式環境程式碼則用處較小。
11.10.3.1. 檢視某特徵值支援哪些功能¶
探索會以 properties 的形式,回傳對端為每個特徵值所廣播的 GATT 屬性位元遮罩。這些位元為 GATT 定義的那些——讀取(0x02)、無回應寫入(0x04)、寫入(0x08)、通知(0x10)、指示(0x20)等等。在發出操作之前檢視這個位元遮罩,可讓通用用戶端適應其能力事先未知的特徵值::
_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)
char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
await char.subscribe(notify=True)
value = await char.notified()
elif char.properties & _PROP_READ:
value = await char.read()
else:
value = None # nothing the client can do
已知對端 GATT 設定檔的正式環境程式碼通常不需要這個——這些 UUID 在一開始就已記載。通用/探索性用戶端(巡走未知裝置的設定頁面、外掛主機)則仰賴它。
11.10.4. 操作¶
一旦中央裝置持有一個 ClientCharacteristic,每個 GATT 操作都是一次協程呼叫:
讀取。 發出一次 GATT 讀取並取回該值::
value = await char.read() print("value:", value)
長讀取(值大於 MTU)會被透明地處理。
寫入。 將新值傳送至伺服器::
await char.write(b"\\x01")response=True會等待寫入回應,並在伺服器拒絕寫入時拋出aioble.GattError。response=False為無回應寫入:發出後即不理會。response=None(預設)會根據對端所廣播的內容自動挑選。訂閱。 透過寫入特徵值的 CCCD 來啟用通知或指示::
await char.subscribe(notify=True)在此返回之後,中央裝置便可等待傳入的推送。
收到通知/指示。 等待來自伺服器的下一次推送::
while True: data = await char.notified() print("push:", data)
timeout_ms=None(預設)會永久等待;傳入一個以毫秒為單位的整數,便會在一段時間後放棄。
將這四者組合起來,便構成典型的「連線、訂閱、串流」中央裝置程式::
async def stream_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
device = result.device
break
else:
return
async with await device.connect() as connection:
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
while connection.is_connected():
data = await char.notified()
print("hr push:", data)
asyncio.run(stream_heart_rate())
整段程式大約十來行,涵蓋了從「沒有任何藍牙在執行」到「即時資料串流」的整個流程。掃描迭代器對應廣播者/觀察者模式,connect 開啟 GAP 連線,service / characteristic 巡走 GATT 樹,subscribe 寫入 CCCD,而 notified 等待推送。
11.10.5. 斷線與重新連線¶
任何發生在無線電鏈路上的事,都會浮現在當時正在其上等待的協程中。aioble.DeviceDisconnectedError 是對端離線或監督逾時觸發的訊號;該例外會終止當時進行中的任何 read()、write() 或 notified() 呼叫,而任何 async with connection 區塊都會乾淨地離開。
一個應在失去連線時重新連線的中央裝置,會把工作包在它自己的外層迴圈中::
async def keep_streaming():
while True:
try:
await stream_heart_rate()
except aioble.DeviceDisconnectedError:
print("disconnected, retrying...")
await asyncio.sleep(2)
11.10.5.1. 用 timeout() 包覆一段操作序列¶
當連續數個 GATT 操作都應在同一份預算內完成——而非各自依其自身的 timeout_ms——時,可用 aioble.DeviceConnection.timeout() 來包覆它們。回傳的情境管理器會在預算耗盡時(拋出 asyncio.TimeoutError)或對端斷線時(拋出 aioble.DeviceDisconnectedError)取消其主體::
async with await device.connect() as connection:
try:
with connection.timeout(2000): # 2 s for the whole block
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
except asyncio.TimeoutError:
print("discovery + subscribe took too long")
比起以 asyncio.wait_for() 個別包覆每次呼叫,這是更乾淨的替代方案,並可避免那種每次呼叫各自符合其期限、但整段序列卻超時的虛假成功。對 timeout() 傳入 timeout_ms=None 會停用期限,僅保留斷線防護處於作用中。