11.10. 센트럴로 동작하기¶
대화의 반대편은 센트럴(central) 입니다. 광고하는 페리페럴을 스캔하고, 통신할 하나를 선택하고, 연결을 열고, 원격 GATT 데이터베이스를 탐색하며, 그 위의 특성을 읽거나 구독하는 장치입니다. 웨어러블 센서에서 측정값을 수집하거나, 비콘을 청취하거나, 동반 마이크로컨트롤러와 통신하는 카메라가 센트럴입니다.
aioble 의 센트럴 패턴은 네 단계로 진행됩니다: 스캔, 연결, 탐색, 운용.
11.10.1. 스캔¶
aioble.scan() 은 비동기 컨텍스트 관리자를 반환하며, 이것은 발견된 장치들에 대한 비동기 이터레이터 역할도 겸합니다. 전형적인 사용법은 관심 있는 장치가 나타날 때까지 스캔한 뒤 이터레이션에서 빠져나오는 것입니다:
import aioble
import asyncio
import bluetooth
HR_SERVICE = bluetooth.UUID(0x180D)
async def find_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
return result.device
return None
duration_ms=5000 은 스캔이 실행되는 시간의 상한을 정합니다. duration_ms=0 은 (컨텍스트 관리자가 종료될 때까지) 영원히 스캔합니다. active=True 는 스캔 응답을 요청하는데, 이는 양쪽 모두에서의 약간의 추가 전송을 대가로 장치당 페이로드 크기를 두 배로 만듭니다. 나머지 interval_us / window_us 키워드 인자는 스캐너 자체 라디오 듀티 사이클을 조정하며, 기본값에서 변경하는 경우는 드뭅니다.
각 aioble.ScanResult 는 장치 주소, 마지막 RSSI, 원시 광고 및 스캔 응답 바이트, 그리고 표준 필드를 파싱하는 헬퍼를 노출합니다:
result.device–connect()를 호출할 준비가 된aioble.Device입니다.result.rssi– dBm 단위의 수신 신호 강도 표시기로, “가장 가까운 장치 선택” 로직에 유용합니다.result.name()– 로컬 이름 문자열이거나, 광고되지 않았으면None입니다.result.services()– 장치가 광고하는 모든 서비스에 대한bluetooth.UUID의 제너레이터입니다.result.manufacturer()– 제조사별 필드에 대한(company_id, data)튜플의 제너레이터입니다.result.connectable– 가장 최근 광고가 연결 가능한 것이었는지 여부입니다.
동일한 장치에 대해 새로운 광고 데이터가 도착하면 같은 ScanResult 가 다시 yield되므로, 그저 장치를 무기한 추적하려는 수동적 리스너는 비동기 이터레이터를 영원히 실행하며 각 이벤트에 대해 디스패치할 수 있습니다.
11.10.2. 연결¶
대상 장치가 식별되면, 연결을 여는 것은 한 번의 await 입니다:
async def talk_to(device):
connection = await device.connect() # 10 s timeout
async with connection:
# ... do GATT work ...
pass
aioble.Device.connect() 는 timeout_ms (연결이 수립되기를 기다리는 시간, 기본값 10초)와 min_conn_interval_us / max_conn_interval_us (연결 에서 다룬 요청 연결 간격 범위)를 받습니다.
11.10.2.1. 스캔 없이 알려진 피어에 재연결하기¶
피어와 본딩이 이미 존재한다면 주소는 이미 알려져 있으므로 또 한 번의 스캔-선택 과정은 라디오 시간 낭비입니다. 저장된 주소로 aioble.Device 를 직접 생성하고 곧바로 connect() 로 건너뛰세요:
import aioble
KITCHEN_CAM = aioble.Device(aioble.ADDR_PUBLIC,
"aa:bb:cc:dd:ee:ff")
async def talk_to_kitchen():
async with await KITCHEN_CAM.connect() as connection:
# ... GATT work ...
pass
첫 번째 인자는 aioble.ADDR_PUBLIC (컨트롤러의 공장 주소) 또는 aioble.ADDR_RANDOM (생성된 정적 또는 해석 가능한 비공개 주소) 중 하나입니다. 두 번째는 6바이트 bytes 값이거나 콜론으로 구분된 16진수 문자열입니다. 임의의 Device (예: 앞서 ScanResult 에서 얻은 것)의 addr_type 및 addr 속성을 영속화하여 여기에 다시 넣을 수 있습니다.
반환된 aioble.DeviceConnection 은 센트럴의 나머지 작업이 매달리는 대상입니다. async with 는 블록이 종료될 때(성공 시, 취소 시, 또는 피어가 사라져 발생하는 aioble.DeviceDisconnectedError 를 포함한 모든 예외 시) 연결이 닫히도록 보장합니다.
센트럴이 기본 23바이트 MTU가 허용하는 것보다 더 큰 특성 값을 필요로 한다면, 여기가 그것을 협상할 곳입니다:
await connection.exchange_mtu(512)
(exchange_mtu() 는 실제로 협상된 MTU를 반환하며, 이는 요청 값과 피어가 지원하는 값 중 최솟값입니다.)
11.10.3. 탐색¶
탐색은 원격 GATT 데이터베이스를 탐색하여 UUID로 서비스와 특성을 찾습니다. 두 가지 방식이 있습니다: 표적형(UUID를 알고 있고 특정한 하나를 원하는 경우)과 전수형(모든 것을 원하는 경우)입니다.
표적형 – 일반적인 경우:
service = await connection.service(HR_SERVICE)
if service is None:
return # no such service
char = await service.characteristic(HR_MEASUREMENT)
if char is None:
return # no such characteristic
aioble.DeviceConnection.service() 와 aioble.ClientService.characteristic() 는 각각 bluetooth.UUID 를 받아 일치하는 객체(또는 None)를 반환합니다. 둘 다 탐색마다 적용되는 timeout_ms 키워드를 가지며 기본값은 2초입니다.
전수형:
async for service in connection.services():
print("service:", service.uuid)
async for char in service.characteristics():
print(" characteristic:", char.uuid, "properties:", hex(char.properties))
이것은 범용 Bluetooth 탐색기 앱이 하는 일입니다. 개발에는 유용하지만, 어떤 UUID를 기대하는지 아는 프로덕션 코드에는 덜 유용합니다.
11.10.3.1. 특성이 무엇을 지원하는지 검사하기¶
탐색은 피어가 각 특성에 대해 광고한 GATT 속성 비트마스크를 properties 로 반환합니다. 비트는 GATT가 정의한 것들로, 읽기(0x02), 응답 없는 쓰기(0x04), 쓰기(0x08), 알림(0x10), 표시(0x20) 등입니다. 연산을 실행하기 전에 비트마스크를 검사하면, 미리 능력을 알지 못하는 특성에 대해 범용 클라이언트가 적응할 수 있습니다:
_PROP_READ = const(0x02)
_PROP_NOTIFY = const(0x10)
char = await service.characteristic(STATUS_UUID)
if char.properties & _PROP_NOTIFY:
await char.subscribe(notify=True)
value = await char.notified()
elif char.properties & _PROP_READ:
value = await char.read()
else:
value = None # nothing the client can do
피어의 GATT 프로파일을 이미 아는 프로덕션 코드는 보통 이것이 필요하지 않습니다. UUID가 미리 문서화되어 있기 때문입니다. 범용/탐색용 클라이언트(알 수 없는 장치를 탐색하는 설정 페이지, 플러그인 호스트)가 이것에 의존합니다.
11.10.4. 운용¶
센트럴이 ClientCharacteristic 를 보유하게 되면, 각 GATT 연산은 한 번의 코루틴 호출입니다:
읽기. GATT 읽기를 실행하고 값을 돌려받습니다:
value = await char.read() print("value:", value)
긴 읽기(MTU보다 큰 값)는 투명하게 처리됩니다.
쓰기. 서버에 새 값을 보냅니다:
await char.write(b"\\x01")response=True는 쓰기 응답을 기다리고, 서버가 쓰기를 거부하면aioble.GattError를 일으킵니다.response=False는 응답 없는 쓰기로, 보내고 잊는 방식입니다.response=None(기본값)은 피어가 광고한 내용에 따라 자동으로 선택합니다.구독. 특성의 CCCD에 써서 알림 또는 표시를 활성화합니다:
await char.subscribe(notify=True)이것이 반환된 후, 센트럴은 들어오는 푸시를 기다릴 수 있습니다.
알림 / 표시 수신. 서버로부터의 다음 푸시를 기다립니다:
while True: data = await char.notified() print("push:", data)
timeout_ms=None(기본값)은 영원히 기다립니다. 일정 시간 후 포기하려면 밀리초 단위의 정수를 전달하세요.
이 네 가지를 함께 묶으면 정형적인 “연결, 구독, 스트리밍” 센트럴 프로그램이 됩니다:
async def stream_heart_rate():
async with aioble.scan(duration_ms=5000, active=True) as scanner:
async for result in scanner:
if HR_SERVICE in result.services():
device = result.device
break
else:
return
async with await device.connect() as connection:
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
while connection.is_connected():
data = await char.notified()
print("hr push:", data)
asyncio.run(stream_heart_rate())
전체가 약 열두 줄 정도이며, “Bluetooth가 실행되지 않은 상태”에서 “실시간 데이터 스트리밍”까지의 흐름을 다룹니다. 스캔 이터레이터는 브로드캐스터/옵저버 패턴에 대응하고, connect 는 GAP 연결을 열고, service / characteristic 는 GATT 트리를 탐색하고, subscribe 는 CCCD를 쓰며, notified 는 푸시를 기다립니다.
11.10.5. 연결 끊김과 재연결¶
라디오 링크에 일어나는 일은 모두 그것을 기다리던 코루틴에 나타납니다. aioble.DeviceDisconnectedError 는 피어가 사라졌거나 감독 타임아웃이 발생했다는 신호입니다. 이 예외는 진행 중이던 read(), write(), 또는 notified() 호출을 종료시키며, 모든 async with connection 블록이 깔끔하게 종료됩니다.
손실 시 재연결해야 하는 센트럴은 작업을 자체 외부 루프로 감쌉니다:
async def keep_streaming():
while True:
try:
await stream_heart_rate()
except aioble.DeviceDisconnectedError:
print("disconnected, retrying...")
await asyncio.sleep(2)
11.10.5.1. timeout()으로 시퀀스 묶기¶
여러 GATT 연산이 연이어 모두 하나의 예산 안에서 완료되어야 할 때(각각이 개별 timeout_ms 로 따로가 아니라), aioble.DeviceConnection.timeout() 을 사용해 그것들을 감싸세요. 반환된 컨텍스트 관리자는 예산이 경과하면(asyncio.TimeoutError 를 일으키며) 또는 피어가 연결을 끊으면(aioble.DeviceDisconnectedError 를 일으키며) 본문을 취소합니다:
async with await device.connect() as connection:
try:
with connection.timeout(2000): # 2 s for the whole block
service = await connection.service(HR_SERVICE)
char = await service.characteristic(HR_MEASUREMENT)
await char.subscribe(notify=True)
except asyncio.TimeoutError:
print("discovery + subscribe took too long")
이것은 각 호출을 개별적으로 asyncio.wait_for() 로 감싸는 것보다 깔끔한 대안이며, 각 호출은 자신의 마감을 지켰지만 시퀀스 전체로는 초과 실행되는 가짜 성공을 방지합니다. timeout() 에 timeout_ms=None 을 전달하면 마감이 비활성화되고 연결 끊김 가드만 활성 상태로 남습니다.