3.20. シリアルプロトコル、フレーミング、CRC

コードで扱う UART では2つの端点間でバイトをやり取りしました。しかしそれだけでは、信頼性の高いリンクを構築するには不十分です。実際のデバイスがワイヤの反対側にある瞬間に、3つの問題が浮上します。

  • メッセージはどこで始まり、どこで終わるのか? バイトは組み込みの区切り文字を持たないストリームとして到着します。受信側が最初のバイトを取りこぼすと(送信側より後に電源が入った、ライン上に一瞬の電気的グリッチが発生したなど)、それ以降のすべてのバイトは、受信側が新しい再同期点を見つけるまで1バイトずれたままになります。

  • 各メッセージの長さはどれくらいか? 32バイトのセンサー読み取り値と4バイトのステータス応答は、バイトレベルでは見分けがつきません。受信側は、何バイトが現在のメッセージに属しているかを知る手段を必要とします。

  • バイトは無傷で到着したか? ノイズが個々のビットを反転させることがあります。チェックがなければ、受信側は破損したデータをそのまま使ってしまいます。

この3つすべてに対する標準的な答えは、データをパケットフレームで包むことです。先頭に既知のバイト列、長さフィールド、ペイロード本体、そして末尾にチェックサムを配置します。

3.20.1. パケットフレーミング

典型的なフレーミング形式は次のとおりです。

順番に並んだ6つのフィールド。0xAA 0x55 とラベル付けされた 2バイトの HEADER、このパケットがどのコマンドを 運ぶかを選択する1バイトの CMD フィールド、ペイロード サイズを示す2バイトの LEN フィールド、HEADER と CMD と LEN をカバーする1バイトの HCRC フィールド、CMD によって 形式が決まる LEN バイトの可変長 PAYLOAD、そして ペイロードをカバーする4バイトの DCRC フィールド。

ヘッダーとデータで別々のCRCを持つフレーム化パケット。ヘッダー(マジックバイト)、コマンド、長さ、ヘッダーCRC、ペイロード、データCRC。

各フィールドはそれぞれ1つの役割を果たします。

  • ヘッダー(マジックバイト)。 受信側が入力ストリーム内で探す、固定された珍しいバイト列です。多くの場合 0xAA 0x55 のような2バイトです。受信側はこの列を見つけると、新しいパケットが始まることを認識し、その前にあったゴミデータをすべて破棄できます。

  • コマンド。 パケットが であるかを示す1バイトです。コマンド値が異なれば、使用するペイロード形式も異なります。あるコマンドは2バイトのペイロードを伴う「サーボ角度を設定」を意味し、別のコマンドはペイロードなしの「センサーを読み取る」、また別のコマンドは文字列を伴う「メッセージをログに記録」を意味するかもしれません。受信側はコマンドバイトに基づいて振り分け、パケットの残りをどう解釈するかを判断します。

  • 長さ。 ペイロードのサイズをバイト単位で示す2バイトです(ここではリトルエンディアン)。これにより約64 KiBまでのペイロードが可能になります。受信側は、ヘッダーCRCが検証された後、ちょうどこのバイト数を読み取ります。

  • ヘッダーCRC。 HEADER、CMD、LEN の各フィールドに対する1バイトのチェックサムです。受信側はペイロードを読み取る前にこれをチェックするため、破損した LEN はわずか数バイトで検出されます(なぜこれが重要なのかは、後述のCRCセクションを参照してください)。

  • ペイロード。 コマンド固有のアプリケーションデータで、ちょうど LEN バイトの長さです。形式はコマンドバイトによって決まります。struct でパックされた固定幅フィールドのレコード、文字列、生のメモリなど、そのコマンドについて両者が合意したものなら何でもかまいません。

  • データCRC。 ペイロードバイトに対する4バイトのCRCです。受信側は読み取ったばかりのバイトから再計算し、一致しなければパケットを破棄します。

3.20.2. CRC

最も単純な「チェックサム」は、全バイトの合計を256または65536で割った余りです。ほとんどの単一ビット反転は検出できますが、多ビットエラーの多くを見逃し、バイトの並び順を無視します。

巡回冗長検査(CRC)は、その標準的なアップグレードです。入力を1つの長い2進数として扱い、それを固定の 多項式 で(特別な方法で)割ります。その割り算の余りがCRCです。多項式が異なれば検出できるエラーの種類も異なります。一般的な8ビット、16ビット、32ビットの多項式は、それぞれ自身のビット幅より短いすべてのバーストエラーを検出し、さらに長いバーストの大部分も検出します。

3.20.2.1. なぜ2つのCRCなのか

上のパケット図には 2つ の別々のCRCが含まれています。1つはヘッダー(HEADER、CMD、LEN)に対するもので、もう1つはペイロードに対するものです。これは堅牢なフレーミングが実際に必要とするものです。なぜなら、末尾の単一CRCは、LEN フィールド自体が転送中に破損したときに次のように破綻するからです。

  • 受信側は破損した LEN に基づいて動作し、そのバイト数をワイヤから読み取ります。送信側が意図したよりもはるかに多いかもしれません。

  • 末尾のCRCだけが最終的に何かおかしいことを受信側に伝えますが、それはこれらすべてのバイトを消費し終えた後のことです。

  • パーサーが誤った数のバイトを待ち続けて立ち往生している間、破損したパケットの後ろに到着する本物のパケットがペイロードとして飲み込まれてしまい、受信側は1つではなく複数のパケットを失います。

CRCを分割することでこれが解決します。

  • ヘッダーCRC は HEADER、CMD、LEN をカバーします。受信側はペイロードを読み取る前にこれをチェックするため、破損した LEN は数バイトで検出され、パーサーは即座に再同期し、不良なパケットを1つだけ捨てて済みます。

  • データCRC はペイロードをカバーします。ヘッダーCRCが通過すれば、受信側は LEN を信頼できると判断し、ちょうどそのバイト数のペイロードを読み取り、データCRCと照合して検証します。

一般的なサイズ設定(そしてこのページで使用しているもの)は、ヘッダーCRCに1バイト(5バイトのヘッダーにはCRC-8で十分)、データCRCに4バイト(CRC-32は数キロバイトのペイロードを、衝突率がほぼゼロでカバーします)です。

3.20.2.2. ヘルパー

MicroPython は4バイトのCRC用に binascii.crc32() を直接提供しています。1バイトのヘッダーCRCには、Maxim の1-wireデバイスが使用する多項式(反射形式で 0x8C)を用いた小さなヘルパーが、インラインで書けるほど短く済みます。

def crc8(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
    return crc & 0xFF

完全なエンコーダーは、2つのCRCを1つの関数にまとめます。

import binascii
import struct

def encode_packet(cmd: int, payload: bytes) -> bytes:
    header = b"\xAA\x55" + bytes([cmd]) + struct.pack("<H", len(payload))
    hcrc   = crc8(header)
    dcrc   = binascii.crc32(payload)
    return header + bytes([hcrc]) + payload + struct.pack("<I", dcrc)

逆関数は、完全なパケットからコマンドとペイロードを復元するか、いずれかのCRCチェックが失敗した場合は None を返します。

def decode_packet(packet: bytes):
    # Layout: HEADER(2) + CMD(1) + LEN(2) + HCRC(1) + PAYLOAD(LEN) + DCRC(4)
    if len(packet) < 10 or packet[0:2] != b"\xAA\x55":
        return None

    header = packet[0:5]
    if crc8(header) != packet[5]:
        return None                       # header CRC mismatch

    cmd    = packet[2]
    length = struct.unpack("<H", packet[3:5])[0]
    if len(packet) != 6 + length + 4:
        return None                       # truncated or oversized

    payload       = packet[6:6 + length]
    received_dcrc = struct.unpack("<I", packet[6 + length:])[0]
    if binascii.crc32(payload) != received_dcrc:
        return None                       # data CRC mismatch

    return cmd, bytes(payload)

実際には、受信側はパケット全体を一度に渡されるわけではありません。バイトはUART経由で1つずつ到着し、パケットの途中で一時停止する送信側(あるいはバイトを失うノイズの多いライン)に対しては、適切なサイズのバッファに単純に read() するわけにはいきません。次のセクションでは、同じデコードロジックをステートマシンとして1バイトずつ実行します。

3.20.3. ステートマシン型の受信側

受信側は、ある固定の N に対して単純に uart.read(N) を呼ぶことはできません。次のパケットが何バイトになるか分からず、ライン上のあらゆるゴミがアライメントを狂わせるからです。解決策は、バイトを1つずつ消費し、パケット内のどの位置にいるかに基づいて反応する小さなステートマシンです。メインループは any() をポーリングして何バイトがバッファされているかを確認し、それらを1回の read() 呼び出しで吸い出し、各バイトをステートマシンに送り込みます。

import time
import binascii
import struct
from machine import UART

HEADER = b"\xAA\x55"
HEADER_LEN = len(HEADER)

# States, in the order the receiver walks through them per packet.
HUNT_FOR_HEADER = 0
READ_COMMAND    = 1
READ_LENGTH     = 2
READ_HEADER_CRC = 3
READ_PAYLOAD    = 4
READ_DATA_CRC   = 5

uart = UART(3, baudrate=115200)

# Receiver state plus partial-field buffers.
state        = HUNT_FOR_HEADER
matched      = 0              # bytes of HEADER matched so far
cmd          = 0              # CMD captured in READ_COMMAND
length_bytes = bytearray()    # raw LEN bytes (kept for the header CRC)
length       = 0              # unpacked LEN
payload      = bytearray()    # payload bytes accumulated in READ_PAYLOAD
crc_bytes    = bytearray()    # DCRC bytes accumulated in READ_DATA_CRC

def handle_packet(cmd: int, payload: bytes) -> None:
    print("cmd", cmd, "payload", payload)

while True:
    # Drain whatever bytes have arrived since the last poll. Idle
    # briefly when the line is quiet so the loop is not a busy spin.
    n = uart.any()
    if not n:
        time.sleep_ms(1)
        continue

    for b in uart.read(n):
        if state == HUNT_FOR_HEADER:
            # Walk the magic bytes. On a mismatch, back off by one
            # so a stray HEADER[0] in the noise still counts as a
            # possible start.
            if b == HEADER[matched]:
                matched += 1
                if matched == HEADER_LEN:
                    state = READ_COMMAND
                    matched = 0
            else:
                matched = 1 if b == HEADER[0] else 0

        elif state == READ_COMMAND:
            cmd = b
            length_bytes = bytearray()
            state = READ_LENGTH

        elif state == READ_LENGTH:
            # LEN is two bytes little-endian.
            length_bytes.append(b)
            if len(length_bytes) == 2:
                length = struct.unpack("<H", length_bytes)[0]
                state = READ_HEADER_CRC

        elif state == READ_HEADER_CRC:
            # Verify the CRC over HEADER + CMD + LEN before
            # committing to read LEN payload bytes. A mismatch
            # aborts here, after just five header bytes -- the
            # next valid header re-syncs quickly.
            expected = crc8(HEADER + bytes([cmd]) + length_bytes)
            if b == expected:
                payload = bytearray()
                state = READ_PAYLOAD
            else:
                state = HUNT_FOR_HEADER

        elif state == READ_PAYLOAD:
            payload.append(b)
            if len(payload) == length:
                crc_bytes = bytearray()
                state = READ_DATA_CRC

        elif state == READ_DATA_CRC:
            # Verify the CRC over the payload and either deliver
            # the packet or drop it. Either way, go back to
            # looking for the next header.
            crc_bytes.append(b)
            if len(crc_bytes) == 4:
                expected = binascii.crc32(payload)
                received = struct.unpack("<I", crc_bytes)[0]
                if expected == received:
                    handle_packet(cmd, bytes(payload))
                state = HUNT_FOR_HEADER

各バイトはステートマシンを1ステップ進めるか、完全なパケットの後、不正なヘッダーCRC、または不正なデータCRCの後に HUNT_FOR_HEADER に戻ります。ヘッダーに一致しないライン上のゴミは黙って破棄され、次の有効なヘッダーが受信側を再同期させます。重要な安全特性はヘッダーCRCから得られます。LEN フィールドが破損した場合、パーサーは、とんでもなく誤った数のペイロードバイトを読み取ることにコミットした後ではなく、ヘッダーCRCチェックの後(数バイト)でそれを検出します。

3.20.4. 基本を超えて

上記のフレーミングは、シリアルリンクがライン上のノイズから回復するために必要な 最小限 のものです。ヘッダーマジック、長さ、コマンド、そして2つのCRCです。破損を検出し、文字化けしたバイトの後で再同期しますが、損傷したパケットを通過させるのではなく諦めてしまい、また送信側には受信側が実際に何を受け取ったのかが分からないままです。

現実世界のシリアルプロトコルは、その基本の上に機能を積み重ねます。すべての組み込みリンクがそれらすべてを必要とするわけではありません。アプリケーションが実際に必要とするものを選んでください。

  • シーケンス番号。 送信のたびにインクリメントされる小さなカウンターです。受信側は、ギャップ(パケットが失われた)、重複(送信側が再送したが、受信側はすでに最初のコピーを受け入れていた)、そして、チャネルが順序を入れ替えうる場合には、順序の入れ替わった到着を検出します。

  • 確認応答(ACK)。 受信側が各パケットを確認するために送り返す専用のACKパケット(または応答にピギーバックさせたビット)です。ACKがなければ、送信側はデータが到着したかどうかを知る術がありません。

  • 否定確認応答(NACK)。 受信側がCRC失敗やシーケンスのギャップを検出したときに送られるNACKです。送信側は、ACKタイムアウトが発動するのを待つのではなく、即座に再送します。

  • 再送。 送信側は、確認応答されていない各パケットを小さなキューに保持し、タイムアウト後(またはNACK時)に再送します。リトライ回数の上限と、リトライ間のバックオフがあれば、恒久的に壊れたリンクが永遠にループするのを防げます。

  • スライディングウィンドウ。 ACKを要求する前に複数のパケットを送信中の状態にしておくことで、1パケットあたりの送信時間に比べて往復時間が長いリンクでもスループットを維持できます。コストは送信側の状態が増えることです。送信中のパケットごとに1スロットが必要になります。

  • フロー制御。 バッファがいっぱいになりつつあるときに、減速または一時停止するよう受信側が送信側に伝える信号です。実装はさまざまです。明示的な XON / XOFF バイト、受信側が一度にN個追加のパケットを許可するクレジットベースの付与、あるいはワイヤ上の RTS / CTS ハードウェアラインそのものなどがあります。フロー制御がなければ、速い送信側はいずれ遅い受信側を追い越し、パケットが破棄されます。

  • プロトコルバージョン。 パケットの先頭近くにあるバージョンフィールドにより、形式を進化させることができます。各側は起動時に双方がサポートする最も高いバージョンをネゴシエートしたり、互換性のないピアからのパケットを拒否したりできます。

  • フラグメンテーションと再構成。 2バイトの LEN はパケットを64 KiBに制限します。それより大きいメッセージは複数のパケットに分割され、反対側で再構成されます。フラグメンテーションのメタデータ(フラグメントインデックス、総数、または「さらにフラグメントあり」フラグ)はペイロード内に格納されます。

  • ハートビート。 「まだここにいます」と伝える小さな周期的なパケットです。反対側は、ハートビートが止まったことに気づくと、黙ってハングするのではなく再接続します(あるいは目立つ形で失敗します)。

  • チャネル。 ヘッダー内のチャネルまたはストリームIDにより、1本の物理リンクが複数の論理ストリームを運びます。制御チャネル、テレメトリチャネル、ログチャネルなどが、そのフィールドだけで区別されます。

  • 認証。 正規の送信側と受信側だけが知っているペイロードと秘密値から計算された短いタグです。受信側は受信したバイトから再びタグを計算し、両者が一致しなければパケットを拒否します。これは改ざん(攻撃者がバイトを変更した)と、タグがカバーする対象にシーケンス番号やタイムスタンプが含まれている場合はリプレイ(攻撃者がワイヤ上から本物のパケットを記録し、受信側に二度動作させるために後で再送する)の両方を検出します。

  • 暗号化。 共有された秘密鍵でペイロードバイトをスクランブルし、その鍵を持たずにラインを読む者にはノイズしか見えないようにします。通常は上記の認証タグと組み合わせます。認証タグがなければ、攻撃者はたまたまCRCを通過するゴミを送り込むことができ、受信側は無意味なものを復号しようとしてサイクルを浪費します。

産業機器向けの典型的な「良い」プロトコルは、最終的にフレーミング、デュアルCRC、シーケンス番号、再送付きのACK / NACK、そしてハートビートを備えることになります。一見の価値がある現実世界の例として、MAVLink(ドローンのテレメトリ。シーケンス番号、システム / コンポーネントID、オプションのパケット署名を備える)、Modbus(産業用PLC。ファンクションコードとCRCを備える)、NMEA 0183(あらゆる民生用GPS受信機が話すASCIIプロトコル。星形の区切り文字の後にチェックサムを置く行ベースのメッセージ)があります。