11.10. 센트럴로 동작하기

대화의 반대편은 센트럴(central) 입니다. 광고하는 페리페럴을 스캔하고, 통신할 하나를 선택하고, 연결을 열고, 원격 GATT 데이터베이스를 탐색하며, 그 위의 특성을 읽거나 구독하는 장치입니다. 웨어러블 센서에서 측정값을 수집하거나, 비콘을 청취하거나, 동반 마이크로컨트롤러와 통신하는 카메라가 센트럴입니다.

aioble 의 센트럴 패턴은 네 단계로 진행됩니다: 스캔, 연결, 탐색, 운용.

11.10.1. 스캔

aioble.scan() 은 비동기 컨텍스트 관리자를 반환하며, 이것은 발견된 장치들에 대한 비동기 이터레이터 역할도 겸합니다. 전형적인 사용법은 관심 있는 장치가 나타날 때까지 스캔한 뒤 이터레이션에서 빠져나오는 것입니다:

import aioble
import asyncio
import bluetooth

HR_SERVICE = bluetooth.UUID(0x180D)

async def find_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                return result.device
    return None

duration_ms=5000 은 스캔이 실행되는 시간의 상한을 정합니다. duration_ms=0 은 (컨텍스트 관리자가 종료될 때까지) 영원히 스캔합니다. active=True 는 스캔 응답을 요청하는데, 이는 양쪽 모두에서의 약간의 추가 전송을 대가로 장치당 페이로드 크기를 두 배로 만듭니다. 나머지 interval_us / window_us 키워드 인자는 스캐너 자체 라디오 듀티 사이클을 조정하며, 기본값에서 변경하는 경우는 드뭅니다.

aioble.ScanResult 는 장치 주소, 마지막 RSSI, 원시 광고 및 스캔 응답 바이트, 그리고 표준 필드를 파싱하는 헬퍼를 노출합니다:

  • result.deviceconnect() 를 호출할 준비가 된 aioble.Device 입니다.

  • result.rssi – dBm 단위의 수신 신호 강도 표시기로, “가장 가까운 장치 선택” 로직에 유용합니다.

  • result.name() – 로컬 이름 문자열이거나, 광고되지 않았으면 None 입니다.

  • result.services() – 장치가 광고하는 모든 서비스에 대한 bluetooth.UUID 의 제너레이터입니다.

  • result.manufacturer() – 제조사별 필드에 대한 (company_id, data) 튜플의 제너레이터입니다.

  • result.connectable – 가장 최근 광고가 연결 가능한 것이었는지 여부입니다.

동일한 장치에 대해 새로운 광고 데이터가 도착하면 같은 ScanResult 가 다시 yield되므로, 그저 장치를 무기한 추적하려는 수동적 리스너는 비동기 이터레이터를 영원히 실행하며 각 이벤트에 대해 디스패치할 수 있습니다.

11.10.2. 연결

대상 장치가 식별되면, 연결을 여는 것은 한 번의 await 입니다:

async def talk_to(device):
    connection = await device.connect()           # 10 s timeout
    async with connection:
        # ... do GATT work ...
        pass

aioble.Device.connect()timeout_ms (연결이 수립되기를 기다리는 시간, 기본값 10초)와 min_conn_interval_us / max_conn_interval_us (연결 에서 다룬 요청 연결 간격 범위)를 받습니다.

11.10.2.1. 스캔 없이 알려진 피어에 재연결하기

피어와 본딩이 이미 존재한다면 주소는 이미 알려져 있으므로 또 한 번의 스캔-선택 과정은 라디오 시간 낭비입니다. 저장된 주소로 aioble.Device 를 직접 생성하고 곧바로 connect() 로 건너뛰세요:

import aioble

KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
                            "aa:bb:cc:dd:ee:ff")

async def talk_to_kitchen():
    async with await KITCHEN_CAM.connect() as connection:
        # ... GATT work ...
        pass

첫 번째 인자는 aioble.ADDR_PUBLIC (컨트롤러의 공장 주소) 또는 aioble.ADDR_RANDOM (생성된 정적 또는 해석 가능한 비공개 주소) 중 하나입니다. 두 번째는 6바이트 bytes 값이거나 콜론으로 구분된 16진수 문자열입니다. 임의의 Device (예: 앞서 ScanResult 에서 얻은 것)의 addr_typeaddr 속성을 영속화하여 여기에 다시 넣을 수 있습니다.

반환된 aioble.DeviceConnection 은 센트럴의 나머지 작업이 매달리는 대상입니다. async with 는 블록이 종료될 때(성공 시, 취소 시, 또는 피어가 사라져 발생하는 aioble.DeviceDisconnectedError 를 포함한 모든 예외 시) 연결이 닫히도록 보장합니다.

센트럴이 기본 23바이트 MTU가 허용하는 것보다 더 큰 특성 값을 필요로 한다면, 여기가 그것을 협상할 곳입니다:

await connection.exchange_mtu(512)

(exchange_mtu() 는 실제로 협상된 MTU를 반환하며, 이는 요청 값과 피어가 지원하는 값 중 최솟값입니다.)

11.10.3. 탐색

탐색은 원격 GATT 데이터베이스를 탐색하여 UUID로 서비스와 특성을 찾습니다. 두 가지 방식이 있습니다: 표적형(UUID를 알고 있고 특정한 하나를 원하는 경우)과 전수형(모든 것을 원하는 경우)입니다.

표적형 – 일반적인 경우:

service = await connection.service(HR_SERVICE)
if service is None:
    return                                        # no such service

char = await service.characteristic(HR_MEASUREMENT)
if char is None:
    return                                        # no such characteristic

aioble.DeviceConnection.service()aioble.ClientService.characteristic() 는 각각 bluetooth.UUID 를 받아 일치하는 객체(또는 None)를 반환합니다. 둘 다 탐색마다 적용되는 timeout_ms 키워드를 가지며 기본값은 2초입니다.

전수형:

async for service in connection.services():
    print("service:", service.uuid)
    async for char in service.characteristics():
        print("  characteristic:", char.uuid, "properties:", hex(char.properties))

이것은 범용 Bluetooth 탐색기 앱이 하는 일입니다. 개발에는 유용하지만, 어떤 UUID를 기대하는지 아는 프로덕션 코드에는 덜 유용합니다.

11.10.3.1. 특성이 무엇을 지원하는지 검사하기

탐색은 피어가 각 특성에 대해 광고한 GATT 속성 비트마스크를 properties 로 반환합니다. 비트는 GATT가 정의한 것들로, 읽기(0x02), 응답 없는 쓰기(0x04), 쓰기(0x08), 알림(0x10), 표시(0x20) 등입니다. 연산을 실행하기 전에 비트마스크를 검사하면, 미리 능력을 알지 못하는 특성에 대해 범용 클라이언트가 적응할 수 있습니다:

_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)

char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
    await char.subscribe(notify=True)
    value = await char.notified()
elif char.properties & _PROP_READ:
    value = await char.read()
else:
    value = None                                  # nothing the client can do

피어의 GATT 프로파일을 이미 아는 프로덕션 코드는 보통 이것이 필요하지 않습니다. UUID가 미리 문서화되어 있기 때문입니다. 범용/탐색용 클라이언트(알 수 없는 장치를 탐색하는 설정 페이지, 플러그인 호스트)가 이것에 의존합니다.

11.10.4. 운용

센트럴이 ClientCharacteristic 를 보유하게 되면, 각 GATT 연산은 한 번의 코루틴 호출입니다:

  • 읽기. GATT 읽기를 실행하고 값을 돌려받습니다:

    value = await char.read()
    print("value:", value)
    

    긴 읽기(MTU보다 큰 값)는 투명하게 처리됩니다.

  • 쓰기. 서버에 새 값을 보냅니다:

    await char.write(b"\\x01")
    

    response=True 는 쓰기 응답을 기다리고, 서버가 쓰기를 거부하면 aioble.GattError 를 일으킵니다. response=False 는 응답 없는 쓰기로, 보내고 잊는 방식입니다. response=None (기본값)은 피어가 광고한 내용에 따라 자동으로 선택합니다.

  • 구독. 특성의 CCCD에 써서 알림 또는 표시를 활성화합니다:

    await char.subscribe(notify=True)
    

    이것이 반환된 후, 센트럴은 들어오는 푸시를 기다릴 수 있습니다.

  • 알림 / 표시 수신. 서버로부터의 다음 푸시를 기다립니다:

    while True:
        data = await char.notified()
        print("push:", data)
    

    timeout_ms=None (기본값)은 영원히 기다립니다. 일정 시간 후 포기하려면 밀리초 단위의 정수를 전달하세요.

이 네 가지를 함께 묶으면 정형적인 “연결, 구독, 스트리밍” 센트럴 프로그램이 됩니다:

async def stream_heart_rate():
    async with aioble.scan(duration_ms=5000, active=True) as scanner:
        async for result in scanner:
            if HR_SERVICE in result.services():
                device = result.device
                break
        else:
            return

    async with await device.connect() as connection:
        service = await connection.service(HR_SERVICE)
        char = await service.characteristic(HR_MEASUREMENT)
        await char.subscribe(notify=True)
        while connection.is_connected():
            data = await char.notified()
            print("hr push:", data)

asyncio.run(stream_heart_rate())

전체가 약 열두 줄 정도이며, “Bluetooth가 실행되지 않은 상태”에서 “실시간 데이터 스트리밍”까지의 흐름을 다룹니다. 스캔 이터레이터는 브로드캐스터/옵저버 패턴에 대응하고, connect 는 GAP 연결을 열고, service / characteristic 는 GATT 트리를 탐색하고, subscribe 는 CCCD를 쓰며, notified 는 푸시를 기다립니다.

11.10.5. 연결 끊김과 재연결

라디오 링크에 일어나는 일은 모두 그것을 기다리던 코루틴에 나타납니다. aioble.DeviceDisconnectedError 는 피어가 사라졌거나 감독 타임아웃이 발생했다는 신호입니다. 이 예외는 진행 중이던 read(), write(), 또는 notified() 호출을 종료시키며, 모든 async with connection 블록이 깔끔하게 종료됩니다.

손실 시 재연결해야 하는 센트럴은 작업을 자체 외부 루프로 감쌉니다:

async def keep_streaming():
    while True:
        try:
            await stream_heart_rate()
        except aioble.DeviceDisconnectedError:
            print("disconnected, retrying...")
            await asyncio.sleep(2)

11.10.5.1. timeout()으로 시퀀스 묶기

여러 GATT 연산이 연이어 모두 하나의 예산 안에서 완료되어야 할 때(각각이 개별 timeout_ms 로 따로가 아니라), aioble.DeviceConnection.timeout() 을 사용해 그것들을 감싸세요. 반환된 컨텍스트 관리자는 예산이 경과하면(asyncio.TimeoutError 를 일으키며) 또는 피어가 연결을 끊으면(aioble.DeviceDisconnectedError 를 일으키며) 본문을 취소합니다:

async with await device.connect() as connection:
    try:
        with connection.timeout(2000):                    # 2 s for the whole block
            service = await connection.service(HR_SERVICE)
            char = await service.characteristic(HR_MEASUREMENT)
            await char.subscribe(notify=True)
    except asyncio.TimeoutError:
        print("discovery + subscribe took too long")

이것은 각 호출을 개별적으로 asyncio.wait_for() 로 감싸는 것보다 깔끔한 대안이며, 각 호출은 자신의 마감을 지켰지만 시퀀스 전체로는 초과 실행되는 가짜 성공을 방지합니다. timeout()timeout_ms=None 을 전달하면 마감이 비활성화되고 연결 끊김 가드만 활성 상태로 남습니다.