10.3. Streaming langsung -- satu penonton

Browser dapat merender aliran Motion JPEG (MJPEG) multi-bagian langsung di dalam tag <img>. Berikan browser satu respons HTTP yang tidak pernah selesai, tulis JPEG yang dipisahkan oleh batas multipart, dan browser akan menampilkan setiap bingkai saat tiba.

The browser sends GET /stream.jpg; the cam responds with Content-Type multipart/x-mixed-replace and writes one JPEG-bodied part per frame until the browser disconnects.

Koneksinya sederhana: satu header respons, Content-Type: multipart/x-mixed-replace; boundary=frame, kemudian baris --frame, Content-Type: image/jpeg, baris kosong, byte JPEG, \r\n, dan ulangi. Browser menutup koneksi saat <img> dihapus atau tab ditutup.

10.3.1. Mengambil gambar tanpa memblokir

csi0.snapshot() yang memblokir yang digunakan sejauh ini menghentikan seluruh event loop hingga sensor mengirimkan bingkai. Itu tidak masalah saat satu permintaan memicu satu snapshot dan tidak ada yang berjalan. Setelah aliran terbuka, server harus terus menangani permintaan lain saat bingkai berikutnya sedang diambil -- panggilan pengambilan gambar perlu menyerahkan ke event loop saat menunggu sensor.

Polanya adalah wrapper tipis AsyncCSI yang melakukan polling csi.CSI.snapshot() dalam mode non-blokir dan menidurkan coroutine di antara polling. Bab asyncio telah membahas pola ini di AsyncCSI; tambahkan langsung ke skrip untuk saat ini:

import asyncio

class AsyncCSI:
    def __init__(self, *args, **kwargs):
        self._csi = csi.CSI(*args, **kwargs)

    def __getattr__(self, name):
        return getattr(self._csi, name)

    async def snapshot(self):
        while True:
            img = self._csi.snapshot(blocking=False)
            if img is not None:
                return img
            await asyncio.sleep_ms(0)

Semua metode CSI lainnya (reset(), pixformat(), framesize(), gain_db(), ...) diteruskan melalui __getattr__; hanya snapshot() yang digantikan dengan versi yang dapat ditunggu sehingga event loop dapat menjadwalkan coroutine lain di antara polling.

Ganti csi.CSI() biasa dari rute snapshot dengan AsyncCSI():

csi0 = AsyncCSI()
csi0.reset()
csi0.pixformat(csi.RGB565)
csi0.framesize(csi.QVGA)

10.3.2. Isi respons streaming adalah iterator berbasis kelas

Isi respons streaming hanyalah objek yang diiterasi microdot dengan async for, mengirimkan setiap chunk yang dihasilkan ke soket. Di CPython ini biasanya adalah fungsi generator async -- async def dengan yield. MicroPython tidak mendukung itu:

Catatan

MicroPython's asyncio does not support async-generator functions (async def name(): ... yield ...). Streaming response bodies must be class-based async iterators with __aiter__ returning self and __anext__ defined as async def.

Untuk aliran MJPEG ini berarti kelas yang __anext__-nya menunggu satu bingkai dan mengembalikannya dalam wrapper multipart:

BOUNDARY = b'frame'

class FrameStream:
    def __aiter__(self):
        return self

    async def __anext__(self):
        img = await csi0.snapshot()
        jpeg = bytes(img.compress(quality=85).bytearray())
        return (b'--' + BOUNDARY + b'\r\n'
                b'Content-Type: image/jpeg\r\n\r\n'
                + jpeg + b'\r\n')

@app.get('/stream.jpg')
async def stream(request):
    return Response(
        body=FrameStream(),
        headers={
            'Content-Type':
                b'multipart/x-mixed-replace; boundary=' + BOUNDARY,
        },
    )

Instans dibuat baru per permintaan, sehingga setiap klien yang terhubung mendapatkan iterator-nya sendiri. Saat browser memutus koneksi, microdot berhenti menunggu __anext__ dan iterator dibebaskan oleh garbage collector.

Catatan

Pembungkus bytes(...) di sekitar JPEG bersifat defensif. bytearray() mengembalikan tampilan ke buffer citra kamera, dan panggilan snapshot() berikutnya menulis ulang buffer tersebut di tempat. Pembungkusan dalam bytes menyalin JPEG keluar agar chunk yang sedang ditulis microdot tetap stabil meskipun flush penulis belum selesai pada saat __anext__ berjalan kembali.

10.3.3. Menjalankan server di dalam asyncio

Panggilan app.run(host=..., port=...) sebelumnya bersifat memblokir. Handler MJPEG perlu berbagi loop dengan polling snapshot AsyncCSI, jadi ganti app.run dengan start_server() di dalam asyncio.run():

async def main():
    await app.start_server(host='0.0.0.0', port=80)

asyncio.run(main())

Wrapper asyncio.run() memungkinkan server menjadi salah satu task di antara beberapa -- coroutine main kemudian menjadi tempat alami untuk menjalankan pengambilan gambar, deteksi gerakan, dan hal lain yang harus berbagi loop dengan server HTTP.

10.3.4. Satu penonton dalam satu waktu

Setiap klien yang terhubung menjalankan iterator FrameStream-nya sendiri, yang berarti setiap klien memicu panggilan csi0.snapshot()-nya sendiri. Dua browser berarti dua pembacaan sensor per interval bingkai, tiga berarti tiga, dan seterusnya. Sensor sebenarnya tidak dapat mengirimkan bingkai lebih cepat dari laju bingkainya sendiri, sehingga permintaan mengantri satu sama lain dan aliran semua orang melambat.

Perbaikannya adalah satu loop pengambilan gambar bersama yang mempublikasikan satu bingkai ke banyak pembaca.