10.3. Truyền trực tiếp -- một người xem

Trình duyệt có thể hiển thị luồng Motion JPEG (MJPEG) nhiều phần trực tiếp bên trong thẻ <img>. Gửi cho trình duyệt một response HTTP không bao giờ kết thúc, ghi các JPEG được phân tách bởi một multipart boundary, và trình duyệt hiển thị từng khung hình khi nó đến.

The browser sends GET /stream.jpg; the cam responds with Content-Type multipart/x-mixed-replace and writes one JPEG-bodied part per frame until the browser disconnects.

Giao thức rất đơn giản: một response header, Content-Type: multipart/x-mixed-replace; boundary=frame, sau đó một dòng --frame, Content-Type: image/jpeg, một dòng trắng, các byte JPEG, \r\n, rồi lặp lại. Trình duyệt đóng kết nối khi thẻ <img> bị xóa hoặc tab bị đóng.

10.3.1. Chụp ảnh mà không bị blocking

Lệnh csi0.snapshot() blocking đã dùng từ trước sẽ làm tắc nghẽn toàn bộ event loop cho đến khi cảm biến cung cấp một khung hình. Điều đó ổn khi một yêu cầu kích hoạt một ảnh chụp và không có gì khác đang chạy. Khi đã mở một luồng, server phải tiếp tục xử lý các yêu cầu khác trong khi khung hình tiếp theo đang được chụp -- lệnh gọi chụp cần nhường quyền cho event loop trong khi chờ cảm biến.

Pattern là một wrapper AsyncCSI mỏng polling csi.CSI.snapshot() ở chế độ non-blocking và cho coroutine ngủ giữa các lần poll. Chương asyncio đã trình bày pattern này trong AsyncCSI; nhúng trực tiếp vào tập lệnh cho bây giờ:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Mọi method CSI khác (reset(), pixformat(), framesize(), gain_db(), ...) được chuyển tiếp qua __getattr__; chỉ snapshot() được thay thế bằng phiên bản awaitable cho phép event loop lên lịch các coroutine khác giữa các lần poll.

Thay csi.CSI() thuần từ route snapshot bằng AsyncCSI():

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. Body của response phát trực tiếp là iterator dựa trên class

Body của response phát trực tiếp chỉ là một object mà microdot lặp qua với async for, gửi từng chunk được yield xuống socket. Trên CPython, đây thường là async generator function -- async def với yield. MicroPython không hỗ trợ điều đó:

Ghi chú

asyncio của MicroPython không hỗ trợ async-generator function (async def name(): ... yield ...). Body của response phát trực tiếp phải là async iterator dựa trên class với __aiter__ trả về self__anext__ được định nghĩa là async def.

Đối với luồng MJPEG, điều đó có nghĩa là một class mà __anext__ của nó await một khung hình và trả về nó được đóng gói trong wrapper multipart:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

Instance được tạo mới cho mỗi yêu cầu, vì vậy mỗi client kết nối có iterator riêng của mình. Khi trình duyệt ngắt kết nối, microdot ngừng await __anext__ và iterator được thu gom rác.

Ghi chú

Việc bọc bytes(...) quanh JPEG là để phòng ngừa. bytearray() trả về một view vào bộ đệm ảnh của camera, và lần gọi snapshot() tiếp theo sẽ ghi đè bộ đệm đó tại chỗ. Bọc trong bytes sao chép JPEG ra ngoài để chunk mà microdot đang ghi vẫn ổn định kể cả khi quá trình flush của writer chưa hoàn thành trước khi __anext__ chạy lại.

10.3.3. Chạy server bên trong asyncio

Lệnh gọi app.run(host=..., port=...) trước đây là blocking. Handler MJPEG cần chia sẻ vòng lặp với các lần poll snapshot của AsyncCSI, vì vậy hãy thay app.run bằng start_server() bên trong một asyncio.run():

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

Wrapper asyncio.run() cho phép server là một task trong số nhiều task -- coroutine main khi đó là nơi tự nhiên để khởi tạo capture, phát hiện chuyển động, và bất cứ thứ gì khác cần chia sẻ vòng lặp với HTTP server.

10.3.4. Một người xem tại một thời điểm

Mỗi client kết nối chạy iterator FrameStream riêng của mình, nghĩa là mỗi client kích hoạt lệnh gọi csi0.snapshot() riêng. Hai trình duyệt tức là hai lần đọc cảm biến mỗi khoảng khung hình, ba trình duyệt tức là ba lần, và cứ thế. Cảm biến thực sự không thể cung cấp khung hình nhanh hơn tốc độ của chính nó, vì vậy các yêu cầu xếp hàng sau nhau và luồng của mọi người đều chậm lại.

Giải pháp là một vòng lặp capture dùng chung duy nhất xuất bản một khung hình cho nhiều người đọc.