10.4. 여러 시청자가 하나의 캡처 루프 공유하기

연결된 각 클라이언트가 독립적으로 csi0.snapshot() 을 호출하는 것은 낭비이며, 두 개의 스트림이 동시에 열리면 더 나빠집니다. 센서는 자신의 속도로 프레임을 전달하는데, 중복된 모든 캡처가 모두를 느리게 만듭니다. 올바른 접근법은 “가장 최신 프레임”을 공유 슬롯에 게시하는 하나의 캡처 코루틴과, 그 슬롯에서 읽어 가는 클라이언트별 반복자입니다.

하나의 캡처 태스크가 JPEG 바이트를 단일 latest_jpeg 슬롯에 쓰고, 세 개의 스트림 클라이언트 반복자가 그 슬롯에서 읽으며 각각 공유 new_frame 이벤트를 기다립니다.

10.4.1. 캡처 태스크

백그라운드 코루틴이 센서가 전달하는 만큼 빠르게 프레임을 가져와, 각각을 공유 bytes 로 JPEG 압축하고, 대기 중인 클라이언트가 깨어나도록 이벤트를 펄스로 발생시킵니다:

latest_jpeg = None
new_frame = asyncio.Event()

async def capture_loop():
    global latest_jpeg
    while True:
        img = await csi0.snapshot()
        latest_jpeg = bytes(img.compress(quality=85).bytearray())
        new_frame.set()
        new_frame.clear()

set() / clear() 쌍이 펄스 패턴입니다. set() 은 현재 이벤트를 기다리는 모든 코루틴을 한 번에 풀어줍니다. clear() 는 즉시 이벤트를 재설정하여 다음 wait() 가 다시 블록되도록 합니다. 여러 소비자(시청자, 또 다른 시청자, 새 프레임에 반응해야 하는 그 밖의 코루틴)가 있을 때, 어떤 단일 소비자도 이벤트를 재설정할 책임을 지지 않으며, 아무도 다른 누군가의 깨어남을 가로채지 않습니다.

참고

여기서 JPEG를 둘러싼 bytes(...) 래핑은 핵심적입니다. bytearray() 는 카메라의 이미지 버퍼에 대한 뷰를 반환하며, 바로 다음 snapshot() 호출이 그 버퍼를 다음 프레임으로 제자리에서 다시 씁니다. latest_jpeg 는 지역 변수 img 보다 오래 살아남으므로, 복사가 없으면 모든 리더가 캡처할 때마다 슬롯이 발밑에서 바뀌는 것을 보게 됩니다.

10.4.2. 클라이언트별 반복자가 슬롯에서 읽기

MJPEG 스트림 핸들러는 더 이상 직접 csi0.snapshot() 을 호출하지 않습니다. 대신 각 FrameStream 인스턴스가 공유 이벤트를 기다리고 공유 bytes에서 읽습니다:

class FrameStream:
    # One instance per connected client. Each one independently
    # waits on the shared new_frame pulse; the capture loop is
    # responsible for resetting the event between frames.

    def __aiter__(self):
        return self

    async def __anext__(self):
        await new_frame.wait()
        if latest_jpeg is None:
            return b''
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + latest_jpeg + b'\r\n')

스냅샷 라우트도 바뀝니다. 더 이상 캡처를 발생시키지 않고, latest_jpeg 가 현재 담고 있는 것을 반환합니다:

@app.get('/snapshot.jpg')
async def snapshot(request):
    if latest_jpeg is None:
        return 'no frame yet', 503
    return Response(
        body=latest_jpeg,
        headers={'Content-Type': 'image/jpeg'},
    )

(body, status) 튜플은 microdot.Response 를 구성하지 않고 HTTP 상태 코드를 설정하는 microdot의 축약 표기입니다. 503은 나는 여기 있지만 아직 준비되지 않았다 는 뜻으로, “잠시 후 다시 요청하라”에 알맞은 코드입니다.

10.4.3. 서버와 함께 캡처 실행하기

이제 main 에는 두 개의 최상위 코루틴이 있습니다. 캡처 루프와 HTTP 서버입니다. asyncio.gather() 가 둘 다 실행하며, 둘 중 하나가 충돌하면 다른 하나가 취소됩니다:

async def main():
    await asyncio.gather(
        capture_loop(),
        app.start_server(host='0.0.0.0', port=80),
    )

asyncio.run(main())

이제 센서는 연결된 시청자가 몇 명이든 상관없이 한 사이클에 한 프레임만 읽습니다. /stream.jpg 에 접속한 첫 번째 브라우저는 프레임을 봅니다. 두 번째, 세 번째, 열 번째도 마찬가지입니다. 이들은 모두 같은 캡처를 공유하며, 카메라는 다른 라우트에서도 그대로 응답성을 유지합니다.