3.20. 串行协议、分帧与 CRC

代码中的 UART 实现了在两端之间传输字节。但仅凭这一点还不足以构建可靠的链路。一旦线路另一端连接的是真实设备,就会立刻出现三个问题:

  • 一条消息从哪里开始、到哪里结束? 字节是以流的形式到达的,没有内置的分隔符。如果接收方漏掉了第一个字节(例如在发送方上电之后才上电,或线路上出现短暂的电气干扰),那么此后的每个字节都会错位一个,直到接收方找到新的重新同步点为止。

  • 每条消息有多长? 一段 32 字节的传感器读数和一条 4 字节的状态回复,在字节层面看起来完全一样。接收方需要某种方法来知道当前消息包含多少字节。

  • 字节是否完好无损地到达? 噪声可能翻转个别比特位。如果没有校验,接收方就会照单全收地处理被破坏的数据。

针对这三个问题的标准答案,是把数据包装进一个数据包帧(packet frame)中:开头是一段已知的字节序列,接着是长度字段、有效载荷本身,末尾是校验和。

3.20.1. 数据包分帧

一种典型的分帧格式:

依次绘制的六个字段:两字节的 HEADER 标记为 0xAA 0x55;一字节的 CMD 字段,用于选择 该数据包承载的是哪条命令;两字节的 LEN 字段,给出有效载荷的大小;一字节的 HCRC 字段, 覆盖 HEADER 加 CMD 加 LEN;长度可变的 PAYLOAD,共 LEN 字节,其格式取决于 CMD; 以及一个四字节的 DCRC 字段,覆盖有效载荷。

一个带有独立头部 CRC 与数据 CRC 的分帧数据包:头部(魔数字节)、命令、长度、头部 CRC、有效载荷、数据 CRC。

每个字段各司其职:

  • 头部(魔数字节)。 一段固定且不寻常的字节序列——通常是像 0xAA 0x55 这样的两个字节——接收方会在传入流中扫描它。一旦找到该序列,它就知道一个新数据包正在开始,并可以丢弃此前到达的任何垃圾数据。

  • 命令。 一个字节,说明该数据包是什么。不同的命令值使用不同的有效载荷格式——某个命令可能表示“设置舵机角度”,带两个有效载荷字节;另一个可能表示“读取传感器”,没有有效载荷;还有一个可能是“日志消息”,带一个字符串。接收方根据命令字节进行分派,以便知道如何解释数据包的其余部分。

  • 长度。 两个字节,给出有效载荷的字节大小(这里采用小端序),允许有效载荷最大约为 64 KiB。一旦头部 CRC 验证通过,接收方就精确读取这么多字节。

  • 头部 CRC。 一个字节的校验和,覆盖 HEADER、CMD 和 LEN 字段。接收方在读取任何有效载荷之前先检查它,因此被破坏的 LEN 仅需几个字节就能被发现(关于这一点为何重要,请参见下文的 CRC 一节)。

  • 有效载荷。 与命令相关的应用数据,长度恰好为 LEN 字节。其格式由命令字节决定:可以是一条 struct 打包的定宽字段记录、一个字符串、原始内存——只要双方对该命令达成一致即可。

  • 数据 CRC。 覆盖有效载荷字节的四字节 CRC。接收方根据刚读取的字节重新计算它,如果不匹配就丢弃该数据包。

3.20.2. CRC

最简单的“校验和”就是所有字节之和,对 256 或 65536 取模。它能捕捉大多数单比特翻转,但会漏掉很多多比特错误,并且忽略了字节顺序。

循环冗余校验(CRC)是标准的升级方案。它把输入当作一个长长的二进制数,并(以一种特殊的方式)将其除以一个固定的多项式;除法的余数就是 CRC。不同的多项式能捕捉不同类别的错误;常见的 8 位、16 位和 32 位多项式各自都能捕捉到所有短于其位宽的突发错误,以及很大一部分更长的突发错误。

3.20.2.1. 为什么要用两个 CRC

上面的数据包示意图带有两个独立的 CRC——一个覆盖头部(HEADER、CMD、LEN),另一个覆盖有效载荷。这正是一个健壮的分帧方案真正需要的,原因在于:当 LEN 字段本身在传输途中被破坏时,单个置于末尾的 CRC 会如何失效:

  • 接收方按照被破坏的 LEN 行事,从线路上读取那么多字节——很可能远多于发送方的本意。

  • 只有末尾的 CRC 最终才会告诉接收方出了问题,而且要等到所有这些字节都被消耗完之后。

  • 在解析器卡住、等待错误数量的字节期间,紧随被破坏数据包之后到达的真实数据包会被当作有效载荷吞掉,于是接收方丢失的是好几个数据包,而不仅仅是那一个。

把 CRC 拆开就能解决这个问题:

  • 头部 CRC 覆盖 HEADER、CMD 和 LEN。接收方在读取任何有效载荷之前先检查它,因此被破坏的 LEN 仅需几个字节就能被发现,解析器立即重新同步,只损失那一个坏数据包。

  • 数据 CRC 覆盖有效载荷。一旦头部 CRC 通过,接收方就知道可以信任 LEN,于是精确读取那么多有效载荷字节,并用数据 CRC 对它们进行校验。

一种常见的尺寸搭配——也是本页所采用的——是头部 CRC 用一个字节(对于五字节的头部,CRC-8 已绰绰有余),数据 CRC 用四个字节(CRC-32 能以极低的碰撞率覆盖数千字节的有效载荷)。

3.20.2.2. 辅助函数

MicroPython 直接提供了 binascii.crc32() 用于四字节 CRC。对于一字节的头部 CRC,使用 Maxim 的 1-wire 设备所用多项式(反射形式为 0x8C)编写的一个小辅助函数足够简短,可以内联写出:

def crc8(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
    return crc & 0xFF

一个完整的编码器把这两个 CRC 组合到一个函数中:

import binascii
import struct

def encode_packet(cmd: int, payload: bytes) -> bytes:
    header = b"\xAA\x55" + bytes([cmd]) + struct.pack("<H", len(payload))
    hcrc   = crc8(header)
    dcrc   = binascii.crc32(payload)
    return header + bytes([hcrc]) + payload + struct.pack("<I", dcrc)

其逆函数从一个完整的数据包中恢复出命令和有效载荷,如果任一 CRC 校验失败则返回 None

def decode_packet(packet: bytes):
    # Layout: HEADER(2) + CMD(1) + LEN(2) + HCRC(1) + PAYLOAD(LEN) + DCRC(4)
    if len(packet) < 10 or packet[0:2] != b"\xAA\x55":
        return None

    header = packet[0:5]
    if crc8(header) != packet[5]:
        return None                       # header CRC mismatch

    cmd    = packet[2]
    length = struct.unpack("<H", packet[3:5])[0]
    if len(packet) != 6 + length + 4:
        return None                       # truncated or oversized

    payload       = packet[6:6 + length]
    received_dcrc = struct.unpack("<I", packet[6 + length:])[0]
    if binascii.crc32(payload) != received_dcrc:
        return None                       # data CRC mismatch

    return cmd, bytes(payload)

实际上接收方不会一次性拿到整个数据包——字节是通过 UART 一个接一个地到达的,而一个在数据包中途暂停的发送方(或一条丢失了字节的嘈杂线路)无法被简单地 read() 进一个大小正好合适的缓冲区。下一节将以状态机的形式,逐字节地运行同样的解码逻辑。

3.20.3. 状态机接收方

接收方不能简单地用某个固定的 N 去调用 uart.read(N)——它并不知道下一个数据包会有多少字节,而且线路上的任何垃圾数据都会打乱对齐。解决办法是一个小型状态机,它一次消耗一个字节,并根据自己在数据包中所处的位置做出反应。主循环轮询 any() 以查看缓冲了多少字节,用一次 read() 调用把它们全部取出,然后让每个字节通过状态机:

import time
import binascii
import struct
from machine import UART

HEADER = b"\xAA\x55"
HEADER_LEN = len(HEADER)

# States, in the order the receiver walks through them per packet.
HUNT_FOR_HEADER = 0
READ_COMMAND    = 1
READ_LENGTH     = 2
READ_HEADER_CRC = 3
READ_PAYLOAD    = 4
READ_DATA_CRC   = 5

uart = UART(3, baudrate=115200)

# Receiver state plus partial-field buffers.
state        = HUNT_FOR_HEADER
matched      = 0              # bytes of HEADER matched so far
cmd          = 0              # CMD captured in READ_COMMAND
length_bytes = bytearray()    # raw LEN bytes (kept for the header CRC)
length       = 0              # unpacked LEN
payload      = bytearray()    # payload bytes accumulated in READ_PAYLOAD
crc_bytes    = bytearray()    # DCRC bytes accumulated in READ_DATA_CRC

def handle_packet(cmd: int, payload: bytes) -> None:
    print("cmd", cmd, "payload", payload)

while True:
    # Drain whatever bytes have arrived since the last poll. Idle
    # briefly when the line is quiet so the loop is not a busy spin.
    n = uart.any()
    if not n:
        time.sleep_ms(1)
        continue

    for b in uart.read(n):
        if state == HUNT_FOR_HEADER:
            # Walk the magic bytes. On a mismatch, back off by one
            # so a stray HEADER[0] in the noise still counts as a
            # possible start.
            if b == HEADER[matched]:
                matched += 1
                if matched == HEADER_LEN:
                    state = READ_COMMAND
                    matched = 0
            else:
                matched = 1 if b == HEADER[0] else 0

        elif state == READ_COMMAND:
            cmd = b
            length_bytes = bytearray()
            state = READ_LENGTH

        elif state == READ_LENGTH:
            # LEN is two bytes little-endian.
            length_bytes.append(b)
            if len(length_bytes) == 2:
                length = struct.unpack("<H", length_bytes)[0]
                state = READ_HEADER_CRC

        elif state == READ_HEADER_CRC:
            # Verify the CRC over HEADER + CMD + LEN before
            # committing to read LEN payload bytes. A mismatch
            # aborts here, after just five header bytes -- the
            # next valid header re-syncs quickly.
            expected = crc8(HEADER + bytes([cmd]) + length_bytes)
            if b == expected:
                payload = bytearray()
                state = READ_PAYLOAD
            else:
                state = HUNT_FOR_HEADER

        elif state == READ_PAYLOAD:
            payload.append(b)
            if len(payload) == length:
                crc_bytes = bytearray()
                state = READ_DATA_CRC

        elif state == READ_DATA_CRC:
            # Verify the CRC over the payload and either deliver
            # the packet or drop it. Either way, go back to
            # looking for the next header.
            crc_bytes.append(b)
            if len(crc_bytes) == 4:
                expected = binascii.crc32(payload)
                received = struct.unpack("<I", crc_bytes)[0]
                if expected == received:
                    handle_packet(cmd, bytes(payload))
                state = HUNT_FOR_HEADER

每个字节让状态机前进一步,或者在一个完整数据包之后、头部 CRC 错误之后、或数据 CRC 错误之后回退到 HUNT_FOR_HEADER。线路上与头部不匹配的垃圾数据会被静默丢弃;下一个有效的头部会让接收方重新同步。关键的安全特性来自头部 CRC:如果 LEN 字段被破坏,解析器会在头部 CRC 检查之后(仅几个字节)就抓到它,而不是在已经决定去读取一个荒谬错误数量的有效载荷字节之后才发现。

3.20.4. 超越基础方案

上面的分帧方案是一条串行链路从线路噪声中恢复所需的最低限度:头部魔数、长度、命令以及两个 CRC。它能检测损坏并在字节出错后重新同步,但它对受损的数据包是直接放弃,而不是设法让它们通过;而且它让发送方完全不知道接收方究竟收到了什么。

现实世界中的串行协议会在这个基础之上叠加各种功能。并非每条嵌入式链路都需要全部这些功能——挑选应用真正需要的即可:

  • 序列号。 一个小计数器,每次发送时递增。接收方据此检测出间隙(有数据包丢失)、重复(发送方重传了,但接收方已经接受了第一份副本),以及——在信道可能重排顺序的情况下——乱序到达。

  • 确认(ACK)。 一个专门的 ACK 数据包(或回复中搭载的一个比特位),接收方将其发回以确认每个数据包。如果没有 ACK,发送方就无从知晓它的数据是否到达。

  • 否定确认(NACK)。 当接收方发现 CRC 失败或序列号间隙时发送的一个 NACK。发送方据此立即重传,而不必等待 ACK 超时触发。

  • 重传。 发送方把每个未确认的数据包保存在一个小队列中,并在超时后(或收到 NACK 时)重新发送。重试次数上限以及重试之间的某种退避机制,可以避免一条永久故障的链路无休止地循环下去。

  • 滑动窗口。 允许在要求 ACK 之前有若干个数据包处于在途状态,可以在往返时间相对于每个数据包的发送时间较长的链路上维持吞吐量。其代价是发送方需要更多状态——每个在途数据包占用一个槽位。

  • 流量控制。 接收方发出的一种信号,当其缓冲区即将填满时告诉发送方放慢或暂停。具体实现各不相同——显式的 XON / XOFF 字节、基于信用额度的授予(接收方每次许可 N 个数据包),或者线路本身上的 RTS / CTS 硬件信号线。如果没有流量控制,一个快速的发送方最终会压垮一个缓慢的接收方,从而导致数据包被丢弃。

  • 协议版本。 在数据包靠前位置放置一个版本字段,可以让格式不断演进。双方可以在启动时协商出二者都支持的最高版本,或者拒绝来自不兼容对端的数据包。

  • 分片与重组。 两字节的 LEN 把数据包上限限制在 64 KiB;大于此值的消息会被拆分成多个数据包,并在另一端重新组装。分片元数据(分片索引、总片数,或一个“还有更多分片”的标志)保存在有效载荷内部。

  • 心跳。 一个小型的周期性数据包,表示“我还在线”。当心跳停止时,另一端会注意到并重新连接(或大声报错),而不是悄无声息地挂起。

  • 信道。 在头部中放置一个信道或流 ID,使一条物理链路可以承载多条逻辑流——一条控制信道、一条遥测信道、一条日志信道——它们仅靠该字段加以区分。

  • 认证。 一个由有效载荷和一个秘密值计算得出的短标签,只有合法的发送方和接收方才知道该秘密值。接收方根据收到的字节再次计算该标签,如果两者不匹配就拒绝该数据包。这既能抓出篡改(攻击者修改了字节),也能——如果序列号或时间戳是该标签所覆盖内容的一部分——抓出重放攻击,即攻击者从线路上记录下一个真实的数据包,稍后再重新发送它,以使接收方对其重复处理两次。

  • 加密。 用一个共享密钥打乱有效载荷字节,使任何没有该密钥而读取线路的人只会看到噪声。通常与上面的认证标签结合使用——如果没有它,攻击者就可以喂入恰好能通过 CRC 的垃圾数据,让接收方白白浪费时间去尝试解密无意义的内容。

一个面向工业设备的典型“优秀”协议,最终会具备分帧、双 CRC、序列号、带重传的 ACK / NACK 以及心跳。一些值得一看的现实世界例子:MAVLink(无人机遥测,带有序列号、系统 / 组件 ID 以及可选的数据包签名)、Modbus(工业 PLC,带有功能码和 CRC),以及 NMEA 0183(每台消费级 GPS 接收机都使用的 ASCII 协议——基于行的消息,在星号分隔符之后带一个校验和)。