11.10. การทำงานในฐานะ central

อีกด้านหนึ่งของการสื่อสารคือ central -- อุปกรณ์ที่สแกนหา peripheral ที่กำลังโฆษณา เลือกอุปกรณ์ที่ต้องการคุยด้วย เปิดการเชื่อมต่อ เดินผ่านฐานข้อมูล GATT ระยะไกล และอ่านหรือสมัครสมาชิกลักษณะเด่นบนนั้น กล้องที่รวบรวมค่าอ่านจาก wearable sensor ฟัง beacon หรือสื่อสารกับ companion microcontroller ถือเป็น central

รูปแบบ central ใน aioble ดำเนินไปสี่ขั้นตอน ได้แก่ สแกน เชื่อมต่อ ค้นพบ และดำเนินการ

11.10.1. การสแกน

aioble.scan() ส่งคืน async context manager ที่ทำหน้าที่เป็น async iterator สำหรับอุปกรณ์ที่ค้นพบด้วย การใช้งานทั่วไปคือสแกนจนกว่าจะพบอุปกรณ์ที่ต้องการ แล้วหยุดการวนซ้ำ:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 จำกัดระยะเวลาสแกน; duration_ms=0 สแกนตลอดไป (จนกว่า context manager จะออก) active=True ร้องขอ scan responses ซึ่งทำให้ขนาด payload ต่ออุปกรณ์เพิ่มขึ้นเป็นสองเท่าแลกกับการส่งข้อมูลเพิ่มเติมเล็กน้อยจากทั้งสองฝ่าย อาร์กิวเมนต์คีย์เวิร์ด interval_us / window_us ที่เหลือปรับแต่ง radio duty cycle ของ scanner เองและแทบไม่ค่อยมีการเปลี่ยนแปลงจากค่าเริ่มต้น

aioble.ScanResult แต่ละรายการเปิดเผยที่อยู่อุปกรณ์ RSSI ล่าสุด ไบต์โฆษณาและ scan response ดิบ และตัวช่วยที่แยกวิเคราะห์ฟิลด์มาตรฐาน:

  • result.device -- aioble.Device ที่พร้อมเรียก connect()

  • result.rssi -- ตัวบ่งชี้ความแรงของสัญญาณที่รับได้ในหน่วย dBm มีประโยชน์สำหรับตรรกะ "เลือกอุปกรณ์ที่ใกล้ที่สุด"

  • result.name() -- สตริงชื่อเฉพาะถิ่น หรือ None หากไม่ได้โฆษณา

  • result.services() -- generator ของ bluetooth.UUID สำหรับทุกบริการที่อุปกรณ์โฆษณา

  • result.manufacturer() -- generator ของ tuple (company_id, data) สำหรับฟิลด์เฉพาะผู้ผลิต

  • result.connectable -- ว่าโฆษณาล่าสุดเป็นแบบเชื่อมต่อได้หรือไม่

ScanResult เดิมจะถูกส่งคืนซ้ำเมื่อมีข้อมูลโฆษณาใหม่สำหรับอุปกรณ์เดียวกัน ดังนั้น listener แบบ passive ที่ต้องการติดตามอุปกรณ์อย่างไม่มีกำหนดสามารถรัน async iterator ตลอดไปและประมวลผลแต่ละ event ได้

11.10.2. การเชื่อมต่อ

เมื่อระบุอุปกรณ์เป้าหมายได้แล้ว การเปิดการเชื่อมต่อทำได้ด้วย await เดียว:

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect() รับ timeout_ms (ระยะเวลาที่รอการเชื่อมต่อ; ค่าเริ่มต้น 10 วินาที) และ min_conn_interval_us / max_conn_interval_us (ช่วง connection-interval ที่ร้องขอจาก การเชื่อมต่อ)

11.10.2.1. การเชื่อมต่อใหม่ไปยัง peer ที่รู้จักโดยไม่ต้องสแกน

เมื่อมี bond กับ peer แล้ว ที่อยู่เป็นที่รู้จักแล้วและรอบการสแกนและเลือกอีกครั้งเป็นการเสียเวลาส่งสัญญาณวิทยุ สร้าง aioble.Device โดยตรงด้วยที่อยู่ที่บันทึกไว้และข้ามไปยัง connect() ทันที:

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

อาร์กิวเมนต์แรกคือหนึ่งใน aioble.ADDR_PUBLIC (ที่อยู่จากโรงงานของ controller) หรือ aioble.ADDR_RANDOM (ที่อยู่ static หรือ resolvable private ที่สร้างขึ้น); อาร์กิวเมนต์ที่สองเป็นค่า bytes หกไบต์หรือสตริง hex คั่นด้วยเครื่องหมายโคลอน แอตทริบิวต์ addr_type และ addr ของ Device ใดๆ (เช่น ที่ได้มาก่อนหน้านี้จาก ScanResult) สามารถบันทึกและนำกลับมาใช้ที่นี่ได้

aioble.DeviceConnection ที่ส่งคืนคือสิ่งที่งานที่เหลือของ central ขึ้นอยู่กับ async with รับประกันว่าการเชื่อมต่อจะถูกปิดเมื่อ block ออก -- เมื่อสำเร็จ เมื่อถูกยกเลิก หรือเมื่อเกิด exception ใดๆ รวมถึง aioble.DeviceDisconnectedError จากการที่ peer หายไป

หาก central ต้องการค่า characteristic ที่ใหญ่กว่าที่ MTU เริ่มต้น 23 ไบต์อนุญาต ที่นี่คือจุดที่ควรเจรจา:

await connection.exchange_mtu(512)

(exchange_mtu() ส่งคืน MTU ที่เจรจาจริง ซึ่งเป็นค่าต่ำสุดของค่าที่ร้องขอและสิ่งที่ peer รองรับ)

11.10.3. การค้นพบ

การค้นพบเดินผ่านฐานข้อมูล GATT ระยะไกลเพื่อค้นหาบริการและ characteristic ตาม UUID มีสองแบบ ได้แก่ แบบมีเป้าหมาย (คุณรู้ UUID และต้องการสิ่งหนึ่งโดยเฉพาะ) และแบบครอบคลุม (คุณต้องการทุกอย่าง)

แบบมีเป้าหมาย -- กรณีทั่วไป:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service() และ aioble.ClientService.characteristic() แต่ละอันรับ bluetooth.UUID และส่งคืนออบเจกต์ที่ตรงกัน (หรือ None) ทั้งคู่มีคีย์เวิร์ด timeout_ms ต่อการค้นพบที่ค่าเริ่มต้นคือ 2 วินาที

แบบครอบคลุม:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

นี่คือสิ่งที่แอป Bluetooth-explorer ทั่วไปทำ -- มีประโยชน์สำหรับการพัฒนา แต่ไม่มากนักสำหรับโค้ดในการผลิตที่รู้ว่าคาดหวัง UUID ใด

11.10.3.1. การตรวจสอบสิ่งที่ characteristic รองรับ

การค้นพบส่งคืน GATT property bitmask ที่ peer โฆษณาสำหรับ characteristic แต่ละตัวเป็น properties บิตเหล่านี้คือบิตที่กำหนดโดย GATT -- อ่าน (0x02) เขียนโดยไม่ตอบสนอง (0x04) เขียน (0x08) แจ้งเตือน (0x10) บ่งชี้ (0x20) และอื่นๆ การตรวจสอบ bitmask ก่อนดำเนินการช่วยให้ client ทั่วไปปรับตัวกับ characteristic ที่ไม่รู้ความสามารถล่วงหน้า:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

โค้ดในการผลิตที่รู้ GATT profile ของ peer อยู่แล้วมักไม่จำเป็นต้องทำสิ่งนี้ -- UUID ถูกบันทึกไว้ล่วงหน้าแล้ว client แบบทั่วไป/สำรวจ (หน้าการตั้งค่าที่เดินผ่านอุปกรณ์ที่ไม่รู้จัก, plugin host) อาศัยสิ่งนี้

11.10.4. การดำเนินการ

เมื่อ central ถือ ClientCharacteristic แล้ว การดำเนินการ GATT แต่ละอย่างคือการเรียก coroutine เดียว:

  • อ่าน. ส่งคำขออ่าน GATT และรับค่ากลับมา:

    value = await char.read()
    print("value:", value)
    

    การอ่านขนาดยาว (ค่าที่ใหญ่กว่า MTU) จัดการอย่างโปร่งใส

  • เขียน. ส่งค่าใหม่ไปยัง server:

    await char.write(b"\\x01")
    

    response=True รอการตอบสนองการเขียนและยก aioble.GattError หาก server ปฏิเสธการเขียน response=False คือการเขียนโดยไม่ตอบสนอง: ส่งแล้วลืม response=None (ค่าเริ่มต้น) เลือกอัตโนมัติตามสิ่งที่ peer โฆษณา

  • สมัครสมาชิก. เปิดใช้งานการแจ้งเตือนหรือการบ่งชี้โดยเขียนไปยัง CCCD ของ characteristic:

    await char.subscribe(notify=True)
    

    หลังจากนี้ส่งคืน central สามารถรอการ push ขาเข้าได้

  • แจ้งเตือน/บ่งชี้. รอการ push ครั้งถัดไปจาก server:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None (ค่าเริ่มต้น) รอตลอดไป; ส่งจำนวนเต็มเป็นมิลลิวินาทีเพื่อหยุดรอหลังจากระยะเวลาหนึ่ง

การนำทั้งสี่มารวมกันได้โปรแกรม central แบบ "เชื่อมต่อ สมัครสมาชิก สตรีม" ที่เป็นบรรทัดฐาน:

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

ทั้งหมดนี้ใช้โค้ดประมาณหนึ่งโหลบรรทัดและครอบคลุมขั้นตอนจาก "ไม่มี Bluetooth ทำงาน" ถึง "สตรีมข้อมูลสด" scan iterator ตรงกับรูปแบบ broadcaster/observer, connect เปิดการเชื่อมต่อ GAP, service / characteristic เดินผ่านต้นไม้ GATT, subscribe เขียน CCCD และ notified รอการ push

11.10.5. การตัดการเชื่อมต่อและการเชื่อมต่อใหม่

ทุกสิ่งที่เกิดขึ้นกับลิงก์วิทยุจะปรากฏใน coroutine ที่กำลังรออยู่ aioble.DeviceDisconnectedError คือสัญญาณว่า peer หายไปหรือ supervision timeout ทำงาน; exception ยุติการเรียก read(), write() หรือ notified() ที่กำลังดำเนินการอยู่ และ block async with connection ใดๆ จะออกอย่างสะอาด

central ที่ควรเชื่อมต่อใหม่เมื่อสูญเสียการเชื่อมต่อจะห่อหุ้มงานในลูปด้านนอกของตัวเอง:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. การล้อมลำดับด้วย timeout()

เมื่อการดำเนินการ GATT หลายรายการติดต่อกันควรเสร็จสิ้นภายในงบประมาณเดียว -- ไม่ใช่แต่ละรายการบน timeout_ms ของตัวเอง -- ใช้ aioble.DeviceConnection.timeout() เพื่อห่อหุ้มพวกมัน context manager ที่ส่งคืนจะยกเลิก body ของมันหากงบประมาณหมดอายุ (ยก asyncio.TimeoutError) หรือหาก peer ตัดการเชื่อมต่อ (ยก aioble.DeviceDisconnectedError):

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

นี่คือทางเลือกที่สะอาดกว่าการห่อแต่ละการเรียกแยกกันใน asyncio.wait_for() และหลีกเลี่ยงความสำเร็จเทียมที่แต่ละการเรียกตรงตาม deadline ของตัวเองแต่ลำดับโดยรวมเกินเวลา การส่ง timeout_ms=None ไปยัง timeout() จะปิดใช้งาน deadline และเหลือเพียง disconnect guard ที่ยังคงทำงาน