11.10. Hoạt động như một central

Phía còn lại của cuộc trao đổi là central -- thiết bị quét tìm các ngoại vi đang quảng bá, chọn một thiết bị để kết nối, mở kết nối, duyệt cơ sở dữ liệu GATT từ xa, và đọc hoặc đăng ký theo dõi các đặc tính trên đó. Một camera thu thập dữ liệu từ cảm biến đeo được, lắng nghe beacon, hoặc giao tiếp với vi điều khiển phụ là một central.

Mô hình central trong aioble trải qua bốn giai đoạn: quét, kết nối, khám phá, vận hành.

11.10.1. Quét

aioble.scan() trả về một async context manager đồng thời hoạt động như một async iterator duyệt qua các thiết bị được phát hiện. Cách sử dụng thông thường là quét cho đến khi thiết bị cần tìm xuất hiện, rồi thoát khỏi vòng lặp:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 giới hạn thời gian quét; duration_ms=0 quét mãi mãi (cho đến khi context manager thoát). active=True yêu cầu scan response, nhân đôi kích thước payload mỗi thiết bị với chi phí thêm một lần truyền nhỏ từ cả hai phía. Các tham số từ khóa còn lại interval_us / window_us điều chỉnh chu kỳ hoạt động của radio máy quét và hiếm khi cần thay đổi so với mặc định.

Mỗi aioble.ScanResult cung cấp địa chỉ thiết bị, RSSI gần nhất, dữ liệu quảng bá và scan response thô, và các hàm hỗ trợ phân tích các trường chuẩn:

  • result.device -- một aioble.Device sẵn sàng để gọi connect().

  • result.rssi -- chỉ số cường độ tín hiệu nhận được tính bằng dBm, hữu ích cho logic "chọn thiết bị gần nhất".

  • result.name() -- chuỗi tên cục bộ, hoặc None nếu không được quảng bá.

  • result.services() -- một generator của bluetooth.UUID cho mỗi dịch vụ mà thiết bị quảng bá.

  • result.manufacturer() -- một generator của các tuple (company_id, data) cho các trường đặc thù của nhà sản xuất.

  • result.connectable -- liệu quảng bá gần nhất có phải là quảng bá có thể kết nối hay không.

Cùng một ScanResult được trả về lại khi dữ liệu quảng bá mới đến từ cùng thiết bị đó, vì vậy một listener thụ động chỉ muốn theo dõi thiết bị vô thời hạn có thể chạy async iterator mãi mãi và xử lý từng sự kiện.

11.10.2. Kết nối

Khi đã xác định được thiết bị đích, mở kết nối chỉ cần một lệnh await

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect() nhận timeout_ms (thời gian chờ kết nối thiết lập; mặc định 10 giây), và min_conn_interval_us / max_conn_interval_us (khoảng thời gian kết nối yêu cầu từ Kết nối).

11.10.2.1. Kết nối lại với một peer đã biết mà không cần quét

Khi đã có bond với một peer, địa chỉ đã được biết và việc quét-và-chọn lại là lãng phí thời gian radio. Tạo trực tiếp một aioble.Device với địa chỉ đã lưu và bỏ qua bước quét, đi thẳng đến connect()

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

Tham số đầu tiên là một trong aioble.ADDR_PUBLIC (địa chỉ gốc của bộ điều khiển) hoặc aioble.ADDR_RANDOM (địa chỉ tĩnh được tạo ra hoặc địa chỉ riêng tư có thể phân giải); tham số thứ hai là giá trị bytes sáu byte hoặc chuỗi hex phân cách bằng dấu hai chấm. Các thuộc tính addr_typeaddr của bất kỳ Device nào (ví dụ: lấy trước đó từ ScanResult) đều có thể được lưu lại và truyền vào đây.

aioble.DeviceConnection được trả về là nơi phần còn lại của công việc central dựa vào. async with đảm bảo kết nối được đóng khi khối thoát -- thành công, bị hủy, hoặc bất kỳ ngoại lệ nào kể cả aioble.DeviceDisconnectedError khi peer ngắt kết nối.

Nếu central cần giá trị đặc tính lớn hơn MTU mặc định 23 byte cho phép, đây là nơi để thương lượng:

await connection.exchange_mtu(512)

(exchange_mtu() trả về MTU thực sự được thương lượng, là giá trị tối thiểu giữa giá trị yêu cầu và những gì peer hỗ trợ.)

11.10.3. Khám phá

Khám phá duyệt cơ sở dữ liệu GATT từ xa để tìm các dịch vụ và đặc tính theo UUID của chúng. Có hai dạng: có mục tiêu (bạn biết UUID và muốn một thứ cụ thể) và toàn diện (bạn muốn tất cả mọi thứ).

Có mục tiêu -- trường hợp thông thường:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service()aioble.ClientService.characteristic() đều nhận một bluetooth.UUID và trả về đối tượng khớp (hoặc None). Cả hai đều có từ khóa timeout_ms mỗi lần khám phá, mặc định là 2 giây.

Toàn diện:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

Đây là điều mà các ứng dụng khám phá Bluetooth thông thường làm -- hữu ích khi phát triển, ít hữu ích hơn cho mã sản xuất đã biết UUID cần dùng.

11.10.3.1. Kiểm tra những gì một đặc tính hỗ trợ

Khám phá trả về bitmask thuộc tính GATT mà peer quảng bá cho mỗi đặc tính dưới dạng properties. Các bit là các bit được định nghĩa bởi GATT -- đọc (0x02), write-without-response (0x04), write (0x08), notify (0x10), indicate (0x20), và các bit khác. Kiểm tra bitmask trước khi thực hiện thao tác cho phép một client thông thường thích nghi với các đặc tính mà nó chưa biết trước khả năng:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

Mã sản xuất đã biết profile GATT của peer thường không cần điều này -- các UUID đã được ghi chép từ trước. Các client thông thường/khám phá (một trang cài đặt duyệt thiết bị không rõ, một plugin host) dựa vào nó.

11.10.4. Vận hành

Khi central đã có ClientCharacteristic, mỗi thao tác GATT là một lệnh gọi coroutine:

  • Đọc. Thực hiện GATT read và nhận giá trị về:

    value = await char.read()
    print("value:", value)
    

    Long read (giá trị lớn hơn MTU) được xử lý một cách minh bạch.

  • Ghi. Gửi giá trị mới đến server:

    await char.write(b"\\x01")
    

    response=True chờ write-response và phát sinh aioble.GattError nếu server từ chối lần ghi. response=False là write-without-response: bắn-và-quên. response=None (mặc định) tự chọn dựa trên những gì peer quảng bá.

  • Đăng ký. Kích hoạt notification hoặc indication bằng cách ghi vào CCCD của đặc tính:

    await char.subscribe(notify=True)
    

    Sau khi trả về, central có thể chờ các push đến.

  • Notified / indicated. Chờ push tiếp theo từ server:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None (mặc định) chờ mãi mãi; truyền một số nguyên tính bằng mili giây để từ bỏ sau một khoảng thời gian.

Kết hợp cả bốn lại tạo ra chương trình central chuẩn "kết nối, đăng ký, stream":

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

Toàn bộ chỉ khoảng một chục dòng và bao trùm luồng từ "chưa chạy Bluetooth" đến "đang stream dữ liệu trực tiếp". Iterator quét khớp với mẫu broadcaster/observer, connect mở kết nối GAP, service / characteristic duyệt cây GATT, subscribe ghi vào CCCD, và notified chờ push.

11.10.5. Ngắt kết nối và kết nối lại

Bất cứ điều gì xảy ra với liên kết radio sẽ hiển thị trong coroutine đang chờ nó. aioble.DeviceDisconnectedError là tín hiệu cho biết peer đã mất kết nối hoặc supervision timeout đã kích hoạt; ngoại lệ này chấm dứt bất kỳ lệnh gọi read(), write(), hoặc notified() nào đang thực hiện, và bất kỳ khối async with connection nào cũng thoát sạch.

Một central cần kết nối lại khi mất kết nối sẽ bọc công việc trong vòng lặp ngoài của riêng nó:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. Bao bọc một chuỗi bằng timeout()

Khi một số thao tác GATT liên tiếp phải hoàn thành trong một ngân sách thời gian -- không phải từng cái riêng lẻ với timeout_ms của riêng nó -- hãy dùng aioble.DeviceConnection.timeout() để bọc chúng. Context manager được trả về sẽ hủy phần thân nếu ngân sách hết (phát sinh asyncio.TimeoutError) hoặc nếu peer ngắt kết nối (phát sinh aioble.DeviceDisconnectedError):

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

Đây là phương án gọn hơn so với việc bọc từng lệnh gọi trong asyncio.wait_for() và tránh các thành công giả tạo khi mỗi lệnh gọi đáp ứng deadline riêng nhưng toàn bộ chuỗi lại vượt quá thời gian. Truyền timeout_ms=None cho timeout() vô hiệu hóa deadline và chỉ giữ lại guard ngắt kết nối.