3.20. Последовательные протоколы, кадрирование и CRC

UART в коде показал, как передавать байты между двумя сторонами. Сама по себе эта передача недостаточна для построения надёжного канала. Как только на другом конце провода оказывается реальное устройство, возникают три проблемы:

  • Где начинается и заканчивается сообщение? Байты поступают потоком без встроенного разделителя. Если приёмник пропустит первый байт (например, был включён позже передатчика или из-за кратковременного электрического сбоя на линии), все последующие байты будут смещены на один, пока приёмник не найдёт новую точку повторной синхронизации.

  • Какова длина каждого сообщения? 32-байтовое показание датчика и 4-байтовый ответ о состоянии на уровне байтов выглядят одинаково. Приёмнику нужен способ узнать, сколько байтов относится к текущему сообщению.

  • Дошли ли байты в целости? Шум может изменить отдельные биты. Без проверки приёмник спокойно начнёт действовать на основе повреждённых данных.

Стандартный ответ на все три вопроса — обернуть данные в пакетный кадр: известная последовательность байтов в начале, поле длины, сама полезная нагрузка и контрольная сумма в конце.

3.20.1. Кадрирование пакетов

Типичный формат кадрирования:

Шесть полей, изображённых последовательно: двухбайтовый HEADER с меткой 0xAA 0x55, однобайтовое поле CMD, выбирающее, какую команду несёт этот пакет, двухбайтовое поле LEN, задающее размер полезной нагрузки, однобайтовое поле HCRC, охватывающее HEADER, CMD и LEN, полезная нагрузка PAYLOAD переменной длины из LEN байтов, формат которой зависит от CMD, и четырёхбайтовое поле DCRC, охватывающее полезную нагрузку.

Кадрированный пакет с отдельными CRC для заголовка и данных: заголовок (магические байты), команда, длина, CRC заголовка, полезная нагрузка, CRC данных.

Каждое поле выполняет одну задачу:

  • Заголовок (магические байты). Фиксированная, необычная последовательность байтов — часто два байта, например 0xAA 0x55 — которую приёмник ищет во входящем потоке. Найдя эту последовательность, он понимает, что начинается новый пакет, и может отбросить любой мусор, поступивший до неё.

  • Команда. Один байт, указывающий, чем является пакет. Разные значения команды используют разные форматы полезной нагрузки — одна команда может означать «задать угол сервопривода» с двумя байтами полезной нагрузки, другая — «прочитать датчик» без полезной нагрузки, ещё одна — «записать сообщение в журнал» со строкой. Приёмник выбирает обработку по байту команды, чтобы понять, как интерпретировать остальную часть пакета.

  • Длина. Два байта, задающие размер полезной нагрузки в байтах (здесь — в формате little-endian), что позволяет иметь полезную нагрузку примерно до 64 КиБ. Приёмник считывает ровно столько байтов после того, как CRC заголовка проверен.

  • CRC заголовка. Однобайтовая контрольная сумма по полям HEADER, CMD и LEN. Приёмник проверяет её перед чтением какой-либо полезной нагрузки, поэтому повреждённое поле LEN обнаруживается уже после нескольких байтов (о том, почему это важно, см. раздел о CRC ниже).

  • Полезная нагрузка. Прикладные данные, специфичные для команды, длиной ровно LEN байтов. Формат определяется байтом команды: упакованная через struct запись из полей фиксированной ширины, строка, необработанная память — что угодно, о чём договорились обе стороны для этой команды.

  • CRC данных. Четырёхбайтовый CRC по байтам полезной нагрузки. Приёмник заново вычисляет его из только что прочитанных байтов и отбрасывает пакет, если значение не совпадает.

3.20.2. CRC

Простейшая «контрольная сумма» — это сумма всех байтов по модулю 256 или 65536. Она ловит большинство одиночных битовых ошибок, но пропускает многие многобитовые ошибки и игнорирует порядок байтов.

Циклический избыточный код (CRC) — это стандартное улучшение. Он рассматривает входные данные как одно длинное двоичное число и делит его (особым образом) на фиксированный полином; остаток от деления и есть CRC. Разные полиномы ловят разные классы ошибок; распространённые 8-, 16- и 32-битные полиномы каждый ловят все пакеты ошибок короче своей разрядности плюс значительную долю более длинных пакетов.

3.20.2.1. Почему два CRC

Диаграмма пакета выше несёт два отдельных CRC — один по заголовку (HEADER, CMD, LEN) и один по полезной нагрузке. Именно это нужно надёжному кадрированию, потому что единственный завершающий CRC даёт сбой, когда само поле LEN повреждается при передаче:

  • Приёмник действует на основе повреждённого LEN и считывает с линии столько байтов — возможно, гораздо больше, чем предполагал передатчик.

  • Только завершающий CRC в конце концов сообщает приёмнику, что что-то пошло не так, и только после того, как все эти байты уже поглощены.

  • Пока анализатор застрял в ожидании неправильного количества байтов, реальные пакеты, поступающие за повреждённым, поглощаются как полезная нагрузка, и приёмник теряет несколько пакетов вместо одного.

Разделение CRC решает эту проблему:

  • CRC заголовка охватывает HEADER, CMD и LEN. Приёмник проверяет его перед чтением какой-либо полезной нагрузки, поэтому повреждённое LEN обнаруживается уже после нескольких байтов, и анализатор немедленно повторно синхронизируется, теряя только один испорченный пакет.

  • CRC данных охватывает полезную нагрузку. После того как CRC заголовка прошёл проверку, приёмник знает, что может доверять LEN, считывает ровно столько байтов полезной нагрузки и проверяет их по CRC данных.

Типичный выбор размеров — и именно его использует эта страница — это один байт для CRC заголовка (CRC-8 более чем достаточно для пятибайтового заголовка) и четыре байта для CRC данных (CRC-32 покрывает многие килобайты полезной нагрузки с пренебрежимо малой вероятностью коллизий).

3.20.2.2. Вспомогательные функции

MicroPython предоставляет binascii.crc32() для четырёхбайтового CRC напрямую. Для однобайтового CRC заголовка небольшая вспомогательная функция, использующая полином устройств Maxim 1-wire (0x8C в отражённой форме), достаточно коротка, чтобы написать её прямо в коде:

def crc8(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
    return crc & 0xFF

Полный кодировщик объединяет оба CRC в одной функции:

import binascii
import struct

def encode_packet(cmd: int, payload: bytes) -> bytes:
    header = b"\xAA\x55" + bytes([cmd]) + struct.pack("<H", len(payload))
    hcrc   = crc8(header)
    dcrc   = binascii.crc32(payload)
    return header + bytes([hcrc]) + payload + struct.pack("<I", dcrc)

Обратная функция извлекает команду и полезную нагрузку из полного пакета или возвращает None, если хотя бы одна из проверок CRC не прошла:

def decode_packet(packet: bytes):
    # Layout: HEADER(2) + CMD(1) + LEN(2) + HCRC(1) + PAYLOAD(LEN) + DCRC(4)
    if len(packet) < 10 or packet[0:2] != b"\xAA\x55":
        return None

    header = packet[0:5]
    if crc8(header) != packet[5]:
        return None                       # header CRC mismatch

    cmd    = packet[2]
    length = struct.unpack("<H", packet[3:5])[0]
    if len(packet) != 6 + length + 4:
        return None                       # truncated or oversized

    payload       = packet[6:6 + length]
    received_dcrc = struct.unpack("<I", packet[6 + length:])[0]
    if binascii.crc32(payload) != received_dcrc:
        return None                       # data CRC mismatch

    return cmd, bytes(payload)

На практике приёмник не получает целый пакет сразу — байты поступают по одному через UART, и передатчик, делающий паузу посреди пакета (или зашумлённая линия, теряющая байт), нельзя просто прочитать через read() в буфер нужного размера. В следующем разделе та же логика декодирования выполняется байт за байтом в виде конечного автомата.

3.20.3. Приёмник на конечном автомате

Приёмник не может просто вызвать uart.read(N) для некоторого фиксированного N — он не знает, сколько байтов будет в следующем пакете, и любой мусор на линии сбивает выравнивание. Решение — небольшой конечный автомат, который потребляет байты по одному и реагирует в зависимости от того, на каком месте пакета он находится. Главный цикл опрашивает any(), чтобы узнать, сколько байтов в буфере, считывает их одним вызовом read() и пропускает каждый байт через конечный автомат:

import time
import binascii
import struct
from machine import UART

HEADER = b"\xAA\x55"
HEADER_LEN = len(HEADER)

# States, in the order the receiver walks through them per packet.
HUNT_FOR_HEADER = 0
READ_COMMAND    = 1
READ_LENGTH     = 2
READ_HEADER_CRC = 3
READ_PAYLOAD    = 4
READ_DATA_CRC   = 5

uart = UART(3, baudrate=115200)

# Receiver state plus partial-field buffers.
state        = HUNT_FOR_HEADER
matched      = 0              # bytes of HEADER matched so far
cmd          = 0              # CMD captured in READ_COMMAND
length_bytes = bytearray()    # raw LEN bytes (kept for the header CRC)
length       = 0              # unpacked LEN
payload      = bytearray()    # payload bytes accumulated in READ_PAYLOAD
crc_bytes    = bytearray()    # DCRC bytes accumulated in READ_DATA_CRC

def handle_packet(cmd: int, payload: bytes) -> None:
    print("cmd", cmd, "payload", payload)

while True:
    # Drain whatever bytes have arrived since the last poll. Idle
    # briefly when the line is quiet so the loop is not a busy spin.
    n = uart.any()
    if not n:
        time.sleep_ms(1)
        continue

    for b in uart.read(n):
        if state == HUNT_FOR_HEADER:
            # Walk the magic bytes. On a mismatch, back off by one
            # so a stray HEADER[0] in the noise still counts as a
            # possible start.
            if b == HEADER[matched]:
                matched += 1
                if matched == HEADER_LEN:
                    state = READ_COMMAND
                    matched = 0
            else:
                matched = 1 if b == HEADER[0] else 0

        elif state == READ_COMMAND:
            cmd = b
            length_bytes = bytearray()
            state = READ_LENGTH

        elif state == READ_LENGTH:
            # LEN is two bytes little-endian.
            length_bytes.append(b)
            if len(length_bytes) == 2:
                length = struct.unpack("<H", length_bytes)[0]
                state = READ_HEADER_CRC

        elif state == READ_HEADER_CRC:
            # Verify the CRC over HEADER + CMD + LEN before
            # committing to read LEN payload bytes. A mismatch
            # aborts here, after just five header bytes -- the
            # next valid header re-syncs quickly.
            expected = crc8(HEADER + bytes([cmd]) + length_bytes)
            if b == expected:
                payload = bytearray()
                state = READ_PAYLOAD
            else:
                state = HUNT_FOR_HEADER

        elif state == READ_PAYLOAD:
            payload.append(b)
            if len(payload) == length:
                crc_bytes = bytearray()
                state = READ_DATA_CRC

        elif state == READ_DATA_CRC:
            # Verify the CRC over the payload and either deliver
            # the packet or drop it. Either way, go back to
            # looking for the next header.
            crc_bytes.append(b)
            if len(crc_bytes) == 4:
                expected = binascii.crc32(payload)
                received = struct.unpack("<I", crc_bytes)[0]
                if expected == received:
                    handle_packet(cmd, bytes(payload))
                state = HUNT_FOR_HEADER

Каждый байт продвигает конечный автомат на один шаг или возвращает его в состояние HUNT_FOR_HEADER после полного пакета, неверного CRC заголовка или неверного CRC данных. Мусор на линии, не совпадающий с заголовком, молча отбрасывается; следующий действительный заголовок повторно синхронизирует приёмник. Ключевое свойство безопасности обеспечивается CRC заголовка: если поле LEN повреждено, анализатор обнаруживает это после проверки CRC заголовка (несколько байтов), а не после того, как обяжется прочитать совершенно неправильное количество байтов полезной нагрузки.

3.20.4. За пределами базового уровня

Кадрирование выше — это минимум, необходимый последовательному каналу для восстановления после шума на линии: магические байты заголовка, длина, команда и два CRC. Оно обнаруживает повреждения и повторно синхронизируется после искажённых байтов, но отказывается от повреждённых пакетов, а не доставляет их, и оставляет передатчик в неведении относительно того, что именно услышал приёмник.

Реальные последовательные протоколы добавляют поверх этого базового уровня дополнительные возможности. Не каждому встраиваемому каналу нужны все они — выбирайте то, что действительно требуется приложению:

  • Порядковые номера. Небольшой счётчик, увеличивающийся при каждой отправке. Приёмник обнаруживает пропуски (пакет был потерян), дубликаты (передатчик повторно передал, но приёмник уже принял первую копию) и — там, где канал может менять порядок, — поступление пакетов не по порядку.

  • Подтверждения. Специальный пакет ACK (или совмещённый бит в ответе), который приёмник отправляет обратно для подтверждения каждого пакета. Без ACK у передатчика нет способа узнать, что его данные дошли.

  • Отрицательные подтверждения. NACK, отправляемый, когда приёмник видит сбой CRC или пропуск в порядковых номерах. Передатчик немедленно повторяет передачу, вместо того чтобы ждать срабатывания тайм-аута ACK.

  • Повторная передача. Передатчик хранит каждый неподтверждённый пакет в небольшой очереди и повторно отправляет его по истечении тайм-аута (или по NACK). Ограничение числа попыток и некоторая задержка между повторами не дают навсегда сломанному каналу зациклиться.

  • Скользящие окна. Разрешение нескольким пакетам находиться в пути до требования ACK поддерживает пропускную способность на каналах, где время кругового обхода велико по сравнению со временем отправки одного пакета. Цена — больше состояния на стороне передатчика: по одной ячейке на каждый пакет в пути.

  • Управление потоком. Сигнал от приёмника, указывающий передатчику замедлиться или приостановиться, когда его буфер заполняется. Реализации различаются — явные байты XON / XOFF, кредитная схема, при которой приёмник выдаёт разрешение ещё на N пакетов за раз, или аппаратные линии RTS / CTS на самом проводе. Без управления потоком быстрый передатчик в конце концов переполняет медленный приёмник, и пакеты теряются.

  • Версия протокола. Поле версии в начале пакета позволяет формату развиваться. Каждая сторона может при запуске согласовать наивысшую версию, поддерживаемую обеими, или отвергать пакеты от несовместимых узлов.

  • Фрагментация и сборка. Двухбайтовое поле LEN ограничивает пакет 64 КиБ; сообщения больше этого размера разбиваются на несколько пакетов и собираются на другой стороне. Метаданные фрагментации (индекс фрагмента, общее количество или флаг «есть ещё фрагменты») находятся внутри полезной нагрузки.

  • Сигналы присутствия (heartbeat). Небольшой периодический пакет, который сообщает «я всё ещё здесь». Другая сторона замечает, когда сигналы присутствия прекращаются, и переподключается (или явно сообщает об ошибке), вместо того чтобы молча зависнуть.

  • Каналы. Идентификатор канала или потока в заголовке, чтобы один физический канал нёс несколько логических потоков — канал управления, канал телеметрии, канал журналирования, — различаемых только по этому полю.

  • Аутентификация. Короткий тег, вычисляемый из полезной нагрузки и секретного значения, которое знают только законные передатчик и приёмник. Приёмник снова вычисляет тег из полученных байтов и отвергает пакет, если оба значения не совпадают. Это ловит как подделку (злоумышленник изменил байты), так и — если порядковый номер или метка времени входят в то, что покрывает тег, — повторное воспроизведение, когда злоумышленник записывает реальный пакет с линии и повторно отправляет его позже, чтобы приёмник отреагировал на него дважды.

  • Шифрование. Перемешивание байтов полезной нагрузки с помощью общего секретного ключа, чтобы любой, кто читает линию без этого ключа, видел только шум. Обычно сочетается с тегом аутентификации, описанным выше — без него злоумышленник может подсунуть мусор, который случайно проходит проверку CRC, и приёмник тратит ресурсы, пытаясь расшифровать бессмыслицу.

Типичный «хороший» протокол для промышленного оборудования в итоге включает кадрирование, двойной CRC, порядковые номера, ACK / NACK с повторной передачей и сигналы присутствия. Примеры из реального мира, на которые стоит взглянуть: MAVLink (телеметрия дронов, с порядковыми номерами, идентификаторами системы / компонента и необязательными подписями пакетов), Modbus (промышленные ПЛК, с кодами функций и CRC) и NMEA 0183 (ASCII-протокол, на котором говорит каждый потребительский GPS-приёмник — построчные сообщения с контрольной суммой после разделителя-звёздочки).