11.10. 充当中心设备

对话的另一方是中心设备(central)——它扫描正在广播的外围设备,挑选一个进行通信,打开连接,遍历远端 GATT 数据库,并读取或订阅其上的特征值。一台从可穿戴传感器收集读数、监听信标,或与配套微控制器通信的摄像头就是中心设备。

aioble 中的中心设备模式经历四个阶段:扫描、连接、发现、操作。

11.10.1. 扫描

aioble.scan() 返回一个异步上下文管理器,它同时充当遍历已发现设备的异步迭代器。典型用法是扫描直到感兴趣的设备出现,然后跳出迭代:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 限制扫描运行多长时间;duration_ms=0 则永久扫描(直到上下文管理器退出)。active=True 请求扫描响应,这会使每台设备的有效载荷大小翻倍,代价是双方各有一次小的额外传输。其余的 interval_us / window_us 关键字参数用于调节扫描器自身的无线电占空比,很少会从默认值改动。

每个 aioble.ScanResult 暴露设备地址、最近一次 RSSI、原始的广播和扫描响应字节,以及解析标准字段的辅助方法:

  • result.device ——一个可在其上调用 connect()aioble.Device

  • result.rssi ——以 dBm 表示的接收信号强度指示,对“挑选最近的设备”逻辑很有用。

  • result.name() ——本地名称字符串,如果未广播则为 None

  • result.services() ——一个生成器,为设备所广播的每个服务产出 bluetooth.UUID

  • result.manufacturer() ——一个生成器,为厂商专属字段产出 (company_id, data) 元组。

  • result.connectable ——最近一次广播是否为可连接的广播。

当同一设备有新的广播数据到来时,相同的 ScanResult 会被再次产出,因此一个只想无限期追踪设备的被动监听者可以让异步迭代器永远运行,并对每个事件进行分派处理。

11.10.2. 连接

一旦识别出目标设备,打开连接只需一次 await:

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect() 接受 timeout_ms(等待连接建立的时长;默认 10 秒),以及 min_conn_interval_us / max_conn_interval_us(来自 连接 的请求连接间隔范围)。

11.10.2.1. 无需扫描即可重连到已知对端

一旦与某个对端建立了绑定,地址就已经已知,再做一轮扫描并挑选就是浪费无线电时间。直接用保存的地址构造一个 aioble.Device,然后径直跳到 connect():

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

第一个参数是 aioble.ADDR_PUBLIC(控制器的出厂地址)或 aioble.ADDR_RANDOM(生成的静态或可解析私有地址)之一;第二个参数是一个六字节的 bytes 值,或者一个以冒号分隔的十六进制字符串。任何 Device(例如先前从某个 ScanResult 获得的设备)的 addr_typeaddr 属性都可以持久化保存,并在此处回填。

返回的 aioble.DeviceConnection 是中心设备其余工作所依托的对象。async with 确保在代码块退出时连接会被关闭——无论是成功退出、被取消,还是抛出任何异常(包括对端离开导致的 aioble.DeviceDisconnectedError)。

如果中心设备需要的特征值比默认的 23 字节 MTU 所允许的更大,这里就是协商它的地方:

await connection.exchange_mtu(512)

exchange_mtu() 返回实际协商出的 MTU,即请求值与对端所支持值之间的较小者。)

11.10.3. 发现

发现会遍历远端 GATT 数据库,按 UUID 查找服务和特征值。有两种方式:定向式(你知道 UUID,想要某个特定的东西)和穷举式(你想要全部)。

定向式——常见情形:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service()aioble.ClientService.characteristic() 各自接受一个 bluetooth.UUID 并返回匹配的对象(或 None)。两者都有一个每次发现的 timeout_ms 关键字参数,默认为 2 秒。

穷举式:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

这就是通用蓝牙浏览器应用所做的事——对开发有用,但对那些已经知道自己期望哪些 UUID 的生产代码则用处不大。

11.10.3.1. 查看某个特征值支持什么

发现会把对端为每个特征值所广播的 GATT 属性位掩码作为 properties 返回。这些位是 GATT 所定义的——读(0x02)、无响应写(0x04)、写(0x08)、通知(0x10)、指示(0x20)等等。在发起操作之前查看位掩码,可以让通用客户端适配那些它事先不了解其能力的特征值:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

已经知道对端 GATT 配置文件的生产代码通常不需要这个——这些 UUID 在一开始就已记录在案。通用/探索型客户端(遍历未知设备的设置页面、插件宿主)则依赖它。

11.10.4. 操作

一旦中心设备持有了一个 ClientCharacteristic,每个 GATT 操作都是一次协程调用:

  • 读取。 发起一次 GATT 读取并取回值:

    value = await char.read()
    print("value:", value)
    

    长读取(值大于 MTU)会被透明处理。

  • 写入。 向服务器发送一个新值:

    await char.write(b"\\x01")
    

    response=True 会等待写响应,如果服务器拒绝该写入则抛出 aioble.GattErrorresponse=False 是无响应写:即发即弃。response=None(默认值)会根据对端所广播的内容自动选择。

  • 订阅。 通过写入特征值的 CCCD 来启用通知或指示:

    await char.subscribe(notify=True)
    

    在这一步返回之后,中心设备就可以等待传入的推送了。

  • 接收通知/指示。 等待来自服务器的下一次推送:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None(默认值)会永久等待;传入一个以毫秒为单位的整数则会在一段时间后放弃。

把这四步组合起来,就得到了规范的“连接、订阅、流式接收”中心设备程序:

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

整个程序大约十几行,覆盖了从“没有运行任何蓝牙”到“实时数据流式传输”的流程。扫描迭代器对应广播者/观察者模式,connect 打开 GAP 连接,service / characteristic 遍历 GATT 树,subscribe 写入 CCCD,notified 等待推送。

11.10.5. 断开连接与重连

发生在无线电链路上的任何事情都会显现在当时正在其上等待的协程中。aioble.DeviceDisconnectedError 是对端离开或监督超时触发的信号;该异常会终止当时正在进行的任何 read()write()notified() 调用,并且任何 async with connection 代码块都会干净地退出。

一个应在丢失连接时重连的中心设备会把工作包裹在它自己的外层循环中:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. 用 timeout() 为一段序列设界

当连续几个 GATT 操作应当全部在同一个预算内完成时——而不是每个操作各自有自己的 timeout_ms——请用 aioble.DeviceConnection.timeout() 来包裹它们。返回的上下文管理器会在预算耗尽时(抛出 asyncio.TimeoutError)或在对端断开连接时(抛出 aioble.DeviceDisconnectedError)取消其主体。

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

这是比用 asyncio.wait_for() 单独包裹每次调用更清晰的替代方案,并且避免了那种每次调用都满足各自截止时间、但整个序列却超时的虚假成功。把 timeout_ms=None 传给 timeout() 会禁用截止时间,只保留断开连接的保护。