3.20. 序列協定、訊框與 CRC

程式碼中的 UART 已能在兩端之間搬移位元組。但光是這樣還不足以建立一條可靠的連結。當電線另一端真的接上一個裝置時,立刻會出現三個問題:

  • 一則訊息從哪裡開始、到哪裡結束? 位元組是以串流方式抵達,本身並沒有內建的分隔符。如果接收端漏掉了第一個位元組(在傳送端啟動之後才上電;或是線路上出現短暫的電氣干擾),那麼其後的每個位元組都會錯位一格,直到接收端找到新的重新同步點為止。

  • 每則訊息有多長? 一筆 32 位元組的感測器讀數和一則 4 位元組的狀態回覆,在位元組層級上看起來一模一樣。接收端需要某種方式來得知有多少位元組屬於目前這則訊息。

  • 位元組是否完整無誤地抵達? 雜訊可能翻轉個別的位元。少了檢查機制,接收端就會欣然地對損毀的資料採取動作。

針對這三個問題的標準解答,就是把資料包裝進一個封包訊框:開頭放一段已知的位元組序列、一個長度欄位、酬載本身,以及結尾的一個檢查碼。

3.20.1. 封包訊框

一種典型的訊框格式:

依序繪出六個欄位:一個兩位元組的 HEADER 標示為 0xAA 0x55、一個一位元組的 CMD 欄位用以選擇 本封包所攜帶的命令、一個兩位元組的 LEN 欄位給出酬載大小、一個一位元組的 HCRC 欄位 涵蓋 HEADER 加 CMD 加 LEN、一個長度可變的 PAYLOAD 共 LEN 個位元組其格式取決於 CMD, 以及一個四位元組的 DCRC 欄位涵蓋酬載。

一個具有獨立標頭與資料 CRC 的訊框封包:標頭(魔術位元組)、命令、長度、標頭 CRC、酬載、資料 CRC。

每個欄位各司其職:

  • 標頭(魔術位元組)。 一段固定、不尋常的位元組序列——通常是兩個位元組,例如 0xAA 0x55——接收端會在傳入的串流中掃描它。當找到這段序列時,它便知道一個新封包正要開始,並且可以丟棄先前出現的任何垃圾資料。

  • 命令。 一個位元組,說明這個封包是什麼。不同的命令值會使用不同的酬載格式——某個命令可能代表「設定伺服角度」並帶兩個酬載位元組,另一個可能代表「讀取感測器」而沒有酬載,又一個可能是「記錄訊息」並帶一個字串。接收端會依命令位元組進行分派,以得知該如何解讀封包的其餘部分。

  • 長度。 兩個位元組,給出酬載的大小(單位為位元組,此處為小端序),允許酬載大小最多約 64 KiB。一旦標頭 CRC 通過驗證,接收端便會精確地讀取這麼多個位元組。

  • 標頭 CRC。 一個一位元組的檢查碼,涵蓋 HEADER、CMD 與 LEN 欄位。接收端會在讀取任何酬載之前先檢查它,因此一個損毀的 LEN 只要經過寥寥數個位元組就會被抓出來(為何這很重要,請見下方的 CRC 段落)。

  • 酬載。 命令專屬的應用資料,長度恰為 LEN 個位元組。其格式由命令位元組決定:可以是一筆以 struct 打包、由固定寬度欄位組成的記錄、一個字串、原始記憶體——只要雙方對該命令有所共識即可。

  • 資料 CRC。 一個四位元組的 CRC,涵蓋酬載位元組。接收端會根據它剛讀入的位元組重新計算一次,若不相符便丟棄該封包。

3.20.2. CRC

最簡單的「檢查碼」就是所有位元組的總和,再對 256 或 65536 取模。它能抓出大多數的單一位元翻轉,但會漏掉許多多位元錯誤,而且忽略位元組的順序。

循環冗餘檢查(CRC)是標準的進階做法。它把輸入視為一個很長的二進位數字,並(以一種特殊方式)將它除以一個固定的多項式;相除的餘數就是 CRC。不同的多項式能抓出不同類別的錯誤;常見的 8、16 與 32 位元多項式,各自都能抓出所有短於其位寬的錯誤叢發,外加很大比例的較長叢發。

3.20.2.1. 為何要兩個 CRC

上方的封包圖攜帶了兩個獨立的 CRC——一個涵蓋標頭(HEADER、CMD、LEN),另一個涵蓋酬載。這正是一個健全的訊框真正需要的,原因在於當 LEN 欄位本身在傳輸途中損毀時,單一的結尾 CRC 會如何失效:

  • 接收端會依照損毀的 LEN 採取動作,從線路上讀取那麼多個位元組——很可能遠多於傳送端原本想要的數量。

  • 只有結尾的那個 CRC 最終才會告訴接收端出了問題,而且還要等到所有那些位元組都被消耗完之後。

  • 在剖析器卡住等待錯誤數量的位元組的同時,緊接在損毀封包後抵達的真實封包會被當成酬載吞掉,於是接收端會損失好幾個封包,而不只是那一個。

把 CRC 拆開就能解決這個問題:

  • 標頭 CRC 涵蓋 HEADER、CMD 與 LEN。接收端會在讀取任何酬載之前先檢查它,因此一個損毀的 LEN 只要經過寥寥數個位元組就會被抓出來,剖析器立即重新同步,只會犧牲那一個壞封包。

  • 資料 CRC 涵蓋酬載。一旦標頭 CRC 通過,接收端便知道可以信任 LEN,於是精確地讀取那麼多個酬載位元組,並對照資料 CRC 加以驗證。

一種常見的尺寸配置——也是本頁所採用的——是標頭 CRC 用一個位元組(對於五位元組的標頭,一個 CRC-8 綽綽有餘),資料 CRC 用四個位元組(一個 CRC-32 能涵蓋好幾 KB 的酬載,碰撞率低到幾乎可以忽略)。

3.20.2.2. 輔助函式

MicroPython 直接內附 binascii.crc32() 可用於四位元組的 CRC。至於一位元組的標頭 CRC,使用 Maxim 1-wire 裝置所採用的多項式(反射形式為 0x8C)寫成一個小巧的輔助函式,短到足以直接寫進程式中:

def crc8(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
    return crc & 0xFF

一個完整的編碼器在單一函式中結合了這兩個 CRC:

import binascii
import struct

def encode_packet(cmd: int, payload: bytes) -> bytes:
    header = b"\xAA\x55" + bytes([cmd]) + struct.pack("<H", len(payload))
    hcrc   = crc8(header)
    dcrc   = binascii.crc32(payload)
    return header + bytes([hcrc]) + payload + struct.pack("<I", dcrc)

其反向函式則從一個完整封包中還原出命令與酬載,或在任一個 CRC 檢查失敗時回傳 None

def decode_packet(packet: bytes):
    # Layout: HEADER(2) + CMD(1) + LEN(2) + HCRC(1) + PAYLOAD(LEN) + DCRC(4)
    if len(packet) < 10 or packet[0:2] != b"\xAA\x55":
        return None

    header = packet[0:5]
    if crc8(header) != packet[5]:
        return None                       # header CRC mismatch

    cmd    = packet[2]
    length = struct.unpack("<H", packet[3:5])[0]
    if len(packet) != 6 + length + 4:
        return None                       # truncated or oversized

    payload       = packet[6:6 + length]
    received_dcrc = struct.unpack("<I", packet[6 + length:])[0]
    if binascii.crc32(payload) != received_dcrc:
        return None                       # data CRC mismatch

    return cmd, bytes(payload)

實務上,接收端並不會拿到一個完整的封包——位元組是透過 UART 一次一個地抵達,而一個在封包中途暫停的傳送端(或是一條會遺失位元組的雜訊線路),無法就這樣直接被 read() 讀進一個大小恰好的緩衝區中。下一節會以狀態機的形式,把同樣的解碼邏輯逐位元組地執行一遍。

3.20.3. 狀態機式接收端

接收端無法只是針對某個固定的 N 去呼叫 uart.read(N)——它並不知道下一個封包會有多少位元組,而且線路上的任何垃圾都會打亂對齊。解法是用一個小型狀態機,一次消耗一個位元組,並依據它目前位於封包中的哪個位置來做出反應。主迴圈會輪詢 any() 以查看緩衝了多少位元組,在單一次 read() 呼叫中把它們抽乾,並把每個位元組餵進狀態機:

import time
import binascii
import struct
from machine import UART

HEADER = b"\xAA\x55"
HEADER_LEN = len(HEADER)

# States, in the order the receiver walks through them per packet.
HUNT_FOR_HEADER = 0
READ_COMMAND    = 1
READ_LENGTH     = 2
READ_HEADER_CRC = 3
READ_PAYLOAD    = 4
READ_DATA_CRC   = 5

uart = UART(3, baudrate=115200)

# Receiver state plus partial-field buffers.
state        = HUNT_FOR_HEADER
matched      = 0              # bytes of HEADER matched so far
cmd          = 0              # CMD captured in READ_COMMAND
length_bytes = bytearray()    # raw LEN bytes (kept for the header CRC)
length       = 0              # unpacked LEN
payload      = bytearray()    # payload bytes accumulated in READ_PAYLOAD
crc_bytes    = bytearray()    # DCRC bytes accumulated in READ_DATA_CRC

def handle_packet(cmd: int, payload: bytes) -> None:
    print("cmd", cmd, "payload", payload)

while True:
    # Drain whatever bytes have arrived since the last poll. Idle
    # briefly when the line is quiet so the loop is not a busy spin.
    n = uart.any()
    if not n:
        time.sleep_ms(1)
        continue

    for b in uart.read(n):
        if state == HUNT_FOR_HEADER:
            # Walk the magic bytes. On a mismatch, back off by one
            # so a stray HEADER[0] in the noise still counts as a
            # possible start.
            if b == HEADER[matched]:
                matched += 1
                if matched == HEADER_LEN:
                    state = READ_COMMAND
                    matched = 0
            else:
                matched = 1 if b == HEADER[0] else 0

        elif state == READ_COMMAND:
            cmd = b
            length_bytes = bytearray()
            state = READ_LENGTH

        elif state == READ_LENGTH:
            # LEN is two bytes little-endian.
            length_bytes.append(b)
            if len(length_bytes) == 2:
                length = struct.unpack("<H", length_bytes)[0]
                state = READ_HEADER_CRC

        elif state == READ_HEADER_CRC:
            # Verify the CRC over HEADER + CMD + LEN before
            # committing to read LEN payload bytes. A mismatch
            # aborts here, after just five header bytes -- the
            # next valid header re-syncs quickly.
            expected = crc8(HEADER + bytes([cmd]) + length_bytes)
            if b == expected:
                payload = bytearray()
                state = READ_PAYLOAD
            else:
                state = HUNT_FOR_HEADER

        elif state == READ_PAYLOAD:
            payload.append(b)
            if len(payload) == length:
                crc_bytes = bytearray()
                state = READ_DATA_CRC

        elif state == READ_DATA_CRC:
            # Verify the CRC over the payload and either deliver
            # the packet or drop it. Either way, go back to
            # looking for the next header.
            crc_bytes.append(b)
            if len(crc_bytes) == 4:
                expected = binascii.crc32(payload)
                received = struct.unpack("<I", crc_bytes)[0]
                if expected == received:
                    handle_packet(cmd, bytes(payload))
                state = HUNT_FOR_HEADER

每個位元組會讓狀態機前進一步,或是在一個完整封包、一個錯誤的標頭 CRC 或一個錯誤的資料 CRC 之後退回 HUNT_FOR_HEADER。線路上不符合標頭的垃圾會被默默丟棄;下一個有效的標頭會讓接收端重新同步。關鍵的安全特性來自標頭 CRC:如果 LEN 欄位損毀,剖析器會在標頭 CRC 檢查之後(寥寥數個位元組)就抓到它,而不是在已經決定要讀取一個錯得離譜的酬載位元組數量之後。

3.20.4. 超越基礎

上述的訊框是一條序列連結要從線路雜訊中復原所需的最低限度:標頭魔術位元組、長度、命令,以及兩個 CRC。它能偵測損毀並在亂碼位元組之後重新同步,但它是放棄受損的封包,而不是想辦法讓它們通過;而且它讓傳送端對接收端究竟收到了什麼毫無頭緒。

現實世界的序列協定會在那條基礎之上層層堆疊功能。並非每一條嵌入式連結都需要全部的功能——挑選應用程式真正需要的即可:

  • 序號。 一個小型計數器,每次傳送時遞增。接收端可藉此偵測缺口(某個封包遺失了)、重複(傳送端重傳了,但接收端早已接受第一份副本),以及——在通道可能重新排序的情況下——亂序抵達。

  • 確認(ACK)。 一個專用的 ACK 封包(或回覆中夾帶的一個位元),由接收端送回,用以確認每個封包。少了 ACK,傳送端就無從得知它的資料是否抵達。

  • 否定確認(NACK)。 當接收端看到一次 CRC 失敗或一個序號缺口時送出的 NACK。傳送端會立即重傳,而不必等到 ACK 逾時觸發。

  • 重傳。 傳送端會把每個尚未被確認的封包保留在一個小型佇列中,並在逾時之後(或收到 NACK 時)重新送出。一個重試上限以及重試之間的某種退避,可避免一條永久損壞的連結無止盡地迴圈下去。

  • 滑動視窗。 在要求 ACK 之前允許多個封包同時在傳輸途中,可在來回時間相較於每封包傳送時間較長的連結上維持吞吐量。代價是傳送端需要更多狀態——每個傳輸途中的封包各占一個槽位。

  • 流量控制。 一個來自接收端的訊號,在其緩衝區快要被填滿時告訴傳送端放慢或暫停。實作方式各異——明確的 XON / XOFF 位元組、以額度為基礎的授予(接收端一次核准再多 N 個封包),或是電線本身上的 RTS / CTS 硬體線路。少了流量控制,一個快速的傳送端最終會壓垮一個緩慢的接收端,封包就會被丟棄。

  • 協定版本。 在封包前段放一個版本欄位,可讓格式得以演進。雙方可以在啟動時協商出兩者皆支援的最高版本,或拒絕來自不相容對端的封包。

  • 分片與重組。 兩位元組的 LEN 把封包上限定在 64 KiB;大於此值的訊息會被切分成多個封包,並在另一端重組。分片的中介資料(分片索引、總數,或一個「還有更多分片」旗標)位於酬載內部。

  • 心跳。 一個小型的週期性封包,說著「我還在這裡」。當心跳停止時,另一端會注意到並重新連線(或大聲報錯),而不是默默地卡住不動。

  • 通道。 在標頭中放一個通道或串流 ID,讓單一條實體連結承載數條邏輯串流——一條控制通道、一條遙測通道、一條記錄通道——僅憑該欄位加以區分。

  • 身分驗證。 一個短標籤,由酬載與一個只有合法的傳送端與接收端才知道的祕密值計算而得。接收端會根據它收到的位元組再計算一次該標籤,若兩者不符便拒絕該封包。這能抓出兩種情況:竄改(攻擊者修改了位元組),以及——如果標籤所涵蓋的內容包含序號或時間戳——重放,亦即攻擊者從線路上錄下一個真實封包,稍後再重新送出,以使接收端對它動作兩次。

  • 加密。 用一把共享的祕密金鑰把酬載位元組打亂,使得任何在沒有該金鑰的情況下讀取線路的人,只會看到雜訊。通常會與上述的身分驗證標籤搭配使用——少了它,攻擊者就能餵入剛好能通過 CRC 的垃圾,使接收端浪費運算週期去試圖解密一堆無意義的內容。

一個針對工業設備的典型「優良」協定,最終會具備訊框、雙重 CRC、序號、附帶重傳的 ACK / NACK,以及心跳。值得一看的現實世界範例:MAVLink(無人機遙測,具有序號、系統/元件 ID,以及選用的封包簽章)、Modbus(工業 PLC,具有功能碼與 CRC),以及 NMEA 0183(每一台消費級 GPS 接收器都會使用的 ASCII 協定——以行為單位的訊息,在一個星號分隔符之後接一個檢查碼)。