3.20. Послідовні протоколи, формати пакетів і CRC

UART у коді переміщував байти між двома кінцями. Самого по собі цього недостатньо для побудови надійного зв’язку. Щойно реальний пристрій опиняється на іншому кінці дроту, виникають три проблеми:

  • Де починається і закінчується повідомлення? Байти надходять потоком без вбудованого роздільника. Якщо приймач пропускає перший байт (увімкнувся пізніше відправника; короткий електричний збій на лінії), кожен наступний байт зміщується на одиницю, доки приймач не знайде нову точку синхронізації.

  • Яка довжина кожного повідомлення? 32-байтове зчитування з датчика і 4-байтова відповідь стану виглядають однаково на рівні байтів. Приймачу потрібен спосіб визначити, скільки байтів належить поточному повідомленню.

  • Чи прийшли байти без пошкоджень? Шуми можуть перевертати окремі біти. Без перевірки приймач без вагань діє на підставі пошкоджених даних.

Стандартна відповідь на всі три проблеми — загорнути дані у кадр пакета: відома послідовність байтів на початку, поле довжини, власне корисне навантаження і контрольна сума в кінці.

3.20.1. Формат кадру пакета

Типовий формат кадру:

Six fields drawn in sequence: a two-byte HEADER labelled 0xAA 0x55, a one-byte CMD field selecting which command this packet carries, a two-byte LEN field giving the payload size, a one-byte HCRC field covering HEADER plus CMD plus LEN, a variable-length PAYLOAD of LEN bytes whose format depends on the CMD, and a four-byte DCRC field covering the payload.

Кадрований пакет з окремими CRC для заголовка і даних: заголовок (магічні байти), команда, довжина, CRC заголовка, корисне навантаження, CRC даних.

Кожне поле виконує одне завдання:

  • Заголовок (магічні байти). Фіксована, нетипова послідовність байтів – зазвичай два байти, наприклад 0xAA 0x55 – яку приймач шукає у вхідному потоці. Коли послідовність знайдено, він знає, що починається новий пакет, і може відкинути весь «сміттєвий» вміст, що надійшов раніше.

  • Команда. Один байт, що вказує, що містить пакет. Різні значення команд використовують різні формати корисного навантаження – одна команда може означати «встановити кут серво» з двома байтами навантаження, інша – «зчитати датчик» без навантаження, ще одна – «записати повідомлення» з рядком. Приймач виконує диспетчеризацію за байтом команди, щоб знати, як інтерпретувати решту пакета.

  • Довжина. Два байти, що задають розмір корисного навантаження у байтах (тут — у форматі little-endian), що дозволяє навантаження до приблизно 64 КіБ. Приймач зчитує рівно стільки байтів після перевірки CRC заголовка.

  • CRC заголовка. Однобайтова контрольна сума полів HEADER, CMD і LEN. Приймач перевіряє її до зчитування будь-якого навантаження, тому пошкоджений LEN виявляється вже після декількох байтів (чому це важливо — дивіться розділ про CRC нижче).

  • Корисне навантаження. Специфічні для команди дані застосунку, рівно LEN байтів. Формат визначається байтом команди: запакований за допомогою struct запис полів фіксованої ширини, рядок, сирі дані пам’яті – все, про що домовились обидві сторони для цієї команди.

  • CRC даних. Чотирибайтовий CRC над байтами корисного навантаження. Приймач обчислює його повторно з щойно прийнятих байтів і відкидає пакет, якщо значення не збігається.

3.20.2. CRC

Найпростіша «контрольна сума» – це сума всіх байтів за модулем 256 або 65536. Вона відловлює більшість одиночних бітових помилок, але пропускає чимало багатобітових і не враховує порядок байтів.

Циклічний надлишковий код (CRC) є стандартним покращенням. Він розглядає вхідні дані як одне велике двійкове число і ділить його (особливим чином) на фіксований поліном; залишок від ділення і є CRC. Різні поліноми відловлюють різні класи помилок; поширені 8-, 16- і 32-бітові поліноми кожен з них відловлюють будь-який пакет помилок коротший за їхню ширину, а також значну частку довших пакетів.

3.20.2.1. Навіщо два CRC

У схемі пакета вище є два окремих CRC – один для заголовка (HEADER, CMD, LEN) і один для корисного навантаження. Саме це й потрібно надійному формату кадру, оскільки єдиний завершальний CRC не справляється, коли під час передачі пошкоджується поле LEN:

  • Приймач діє на підставі пошкодженого LEN і зчитує стільки байтів з лінії – можливо, значно більше, ніж відправляв відправник.

  • Лише завершальний CRC з часом повідомляє приймачу, що щось пішло не так, і лише після того, як усі ці байти вже спожито.

  • Поки аналізатор очікує неправильну кількість байтів, реальні пакети, що надходять за пошкодженим, поглинаються як навантаження, і приймач втрачає кілька пакетів замість одного.

Розподіл CRC вирішує цю проблему:

  • CRC заголовка охоплює HEADER, CMD і LEN. Приймач перевіряє його до зчитування будь-якого навантаження, тому пошкоджений LEN виявляється вже після кількох байтів, і аналізатор негайно повторно синхронізується, «збиваючи» лише один поганий пакет.

  • CRC даних охоплює корисне навантаження. Після проходження перевірки CRC заголовка приймач знає, що може довіряти LEN, зчитує рівно стільки байтів навантаження і звіряє їх з CRC даних.

Поширений вибір розмірів – і те, що використовується на цій сторінці – один байт для CRC заголовка (CRC-8 цілком достатньо для п’ятибайтового заголовка) і чотири байти для CRC даних (CRC-32 покриває багато кілобайтів навантаження з надзвичайно низькою ймовірністю колізії).

3.20.2.2. Допоміжні функції

MicroPython постачає binascii.crc32() для чотирибайтового CRC безпосередньо. Для однобайтового CRC заголовка невелика допоміжна функція, що використовує поліном пристроїв Maxim 1-wire (0x8C у відображеній формі), достатньо коротка, щоб написати її прямо в коді:

def crc8(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
    return crc & 0xFF

Повний кодувальник поєднує два CRC в одній функції:

import binascii
import struct

def encode_packet(cmd: int, payload: bytes) -> bytes:
    header = b"\xAA\x55" + bytes([cmd]) + struct.pack("<H", len(payload))
    hcrc   = crc8(header)
    dcrc   = binascii.crc32(payload)
    return header + bytes([hcrc]) + payload + struct.pack("<I", dcrc)

Зворотна функція відновлює команду і корисне навантаження з повного пакета або повертає None, якщо будь-яка з перевірок CRC не проходить:

def decode_packet(packet: bytes):
    # Layout: HEADER(2) + CMD(1) + LEN(2) + HCRC(1) + PAYLOAD(LEN) + DCRC(4)
    if len(packet) < 10 or packet[0:2] != b"\xAA\x55":
        return None

    header = packet[0:5]
    if crc8(header) != packet[5]:
        return None                       # header CRC mismatch

    cmd    = packet[2]
    length = struct.unpack("<H", packet[3:5])[0]
    if len(packet) != 6 + length + 4:
        return None                       # truncated or oversized

    payload       = packet[6:6 + length]
    received_dcrc = struct.unpack("<I", packet[6 + length:])[0]
    if binascii.crc32(payload) != received_dcrc:
        return None                       # data CRC mismatch

    return cmd, bytes(payload)

На практиці приймач не отримує одразу весь пакет – байти надходять по одному через UART, і відправник, що робить паузу посеред пакета (або шумна лінія, що втрачає байт), не може просто бути зчитаним за допомогою read() у буфер потрібного розміру. Наступний розділ виконує ту саму логіку декодування побайтово у вигляді скінченного автомата.

3.20.3. Приймач на основі скінченного автомата

Приймач не може просто викликати uart.read(N) для якогось фіксованого N – він не знає, скільки байтів буде в наступному пакеті, і будь-який «сміттєвий» вміст на лінії порушує вирівнювання. Рішення – невеликий скінченний автомат, який поглинає байти по одному і реагує залежно від того, де він знаходиться в пакеті. Головний цикл опитує any(), щоб дізнатися, скільки байтів буферовано, зчитує їх за один виклик read() і передає кожен байт через скінченний автомат:

import time
import binascii
import struct
from machine import UART

HEADER = b"\xAA\x55"
HEADER_LEN = len(HEADER)

# States, in the order the receiver walks through them per packet.
HUNT_FOR_HEADER = 0
READ_COMMAND    = 1
READ_LENGTH     = 2
READ_HEADER_CRC = 3
READ_PAYLOAD    = 4
READ_DATA_CRC   = 5

uart = UART(3, baudrate=115200)

# Receiver state plus partial-field buffers.
state        = HUNT_FOR_HEADER
matched      = 0              # bytes of HEADER matched so far
cmd          = 0              # CMD captured in READ_COMMAND
length_bytes = bytearray()    # raw LEN bytes (kept for the header CRC)
length       = 0              # unpacked LEN
payload      = bytearray()    # payload bytes accumulated in READ_PAYLOAD
crc_bytes    = bytearray()    # DCRC bytes accumulated in READ_DATA_CRC

def handle_packet(cmd: int, payload: bytes) -> None:
    print("cmd", cmd, "payload", payload)

while True:
    # Drain whatever bytes have arrived since the last poll. Idle
    # briefly when the line is quiet so the loop is not a busy spin.
    n = uart.any()
    if not n:
        time.sleep_ms(1)
        continue

    for b in uart.read(n):
        if state == HUNT_FOR_HEADER:
            # Walk the magic bytes. On a mismatch, back off by one
            # so a stray HEADER[0] in the noise still counts as a
            # possible start.
            if b == HEADER[matched]:
                matched += 1
                if matched == HEADER_LEN:
                    state = READ_COMMAND
                    matched = 0
            else:
                matched = 1 if b == HEADER[0] else 0

        elif state == READ_COMMAND:
            cmd = b
            length_bytes = bytearray()
            state = READ_LENGTH

        elif state == READ_LENGTH:
            # LEN is two bytes little-endian.
            length_bytes.append(b)
            if len(length_bytes) == 2:
                length = struct.unpack("<H", length_bytes)[0]
                state = READ_HEADER_CRC

        elif state == READ_HEADER_CRC:
            # Verify the CRC over HEADER + CMD + LEN before
            # committing to read LEN payload bytes. A mismatch
            # aborts here, after just five header bytes -- the
            # next valid header re-syncs quickly.
            expected = crc8(HEADER + bytes([cmd]) + length_bytes)
            if b == expected:
                payload = bytearray()
                state = READ_PAYLOAD
            else:
                state = HUNT_FOR_HEADER

        elif state == READ_PAYLOAD:
            payload.append(b)
            if len(payload) == length:
                crc_bytes = bytearray()
                state = READ_DATA_CRC

        elif state == READ_DATA_CRC:
            # Verify the CRC over the payload and either deliver
            # the packet or drop it. Either way, go back to
            # looking for the next header.
            crc_bytes.append(b)
            if len(crc_bytes) == 4:
                expected = binascii.crc32(payload)
                received = struct.unpack("<I", crc_bytes)[0]
                if expected == received:
                    handle_packet(cmd, bytes(payload))
                state = HUNT_FOR_HEADER

Кожен байт просуває скінченний автомат на один крок або повертає його до стану HUNT_FOR_HEADER після повного пакета, поганого CRC заголовка або поганого CRC даних. «Сміттєвий» вміст на лінії, що не відповідає заголовку, мовчки відкидається; наступний дійсний заголовок повторно синхронізує приймач. Ключова властивість безпеки пов’язана з CRC заголовка: якщо поле LEN пошкоджене, аналізатор виявляє це після перевірки CRC заголовка (через кілька байтів), а не після зобов’язання прочитати абсурдно неправильну кількість байтів навантаження.

3.20.4. Поза базовим рівнем

Наведений вище формат кадру є мінімальним для того, щоб послідовний зв’язок міг відновитися від шуму лінії: магічні байти заголовка, довжина, команда і два CRC. Він виявляє пошкодження і повторно синхронізується після зіпсованих байтів, але відмовляється від пошкоджених пакетів, не намагаючись їх доставити, і залишає відправника без інформації про те, що насправді почув приймач.

Реальні послідовні протоколи нашаровують функції поверх цього базового рівня. Не кожен вбудований канал зв’язку потребує всіх з них – обирайте те, що дійсно необхідно для застосунку:

  • Порядкові номери. Невеликий лічильник, що збільшується з кожним надсиланням. Приймач виявляє прогалини (пакет загублено), дублікати (відправник повторно передав, але приймач вже прийняв першу копію), а там, де канал може переставляти пакети, – надходження не в тому порядку.

  • Підтвердження. Окремий ACK-пакет (або прапорець-вставка у відповіді), який приймач надсилає назад для підтвердження кожного пакета. Без ACK відправник не має змоги дізнатися, чи надійшли його дані.

  • Негативні підтвердження. NACK, що надсилається, коли приймач бачить збій CRC або прогалину в послідовності. Відправник повторно передає негайно, замість того щоб чекати спрацювання тайм-ауту ACK.

  • Повторна передача. Відправник зберігає кожен непідтверджений пакет у невеликій черзі і повторно надсилає його після тайм-ауту (або за NACK). Ліміт спроб і деяка затримка між ними запобігають нескінченному циклу при постійно несправному каналі.

  • Ковзаючі вікна. Дозвіл кільком пакетам перебувати в дорозі до отримання ACK підтримує пропускну здатність на каналах, де час кругового обходу значно більший за час надсилання одного пакета. Ціна – більший стан на стороні відправника: один слот на пакет у дорозі.

  • Управління потоком. Сигнал від приймача, що наказує відправнику сповільнитися або зупинитися, коли буфер приймача заповнюється. Реалізації різняться – явні байти XON / XOFF, надання кредитів, де приймач дозволяє N додаткових пакетів за раз, або апаратні лінії RTS / CTS на самому дроті. Без управління потоком швидкий відправник врешті-решт переповнює повільний приймач і пакети відкидаються.

  • Версія протоколу. Поле версії на початку пакета дозволяє формату розвиватися. Кожна сторона може погодити найвищу версію, підтримувану обома, під час запуску або відхиляти пакети від несумісних вузлів.

  • Фрагментація та збирання. Двобайтовий LEN обмежує пакет 64 КіБ; повідомлення більшого розміру розбиваються на кілька пакетів і збираються на іншому кінці. Метадані фрагментації (індекс фрагмента, загальна кількість або прапорець «ще є фрагменти») розміщуються всередині корисного навантаження.

  • Пульси. Невеликий періодичний пакет, що означає «я ще тут». Інша сторона помічає, коли пульси зупиняються, і відновлює з’єднання (або явно повідомляє про помилку) замість того, щоб мовчки зависнути.

  • Канали. Ідентифікатор каналу або потоку в заголовку дозволяє одному фізичному каналу нести кілька логічних потоків – канал управління, канал телеметрії, канал журналювання – які розрізняються лише за цим полем.

  • Автентифікація. Короткий тег, обчислений з корисного навантаження і секретного значення, відомого лише законному відправнику і приймачу. Приймач знову обчислює тег з отриманих байтів і відхиляє пакет, якщо вони не збігаються. Це відловлює як підробку (зловмисник змінив байти), так – якщо порядковий номер або мітка часу входить до того, що охоплює тег – і повтор, де зловмисник записує справжній пакет з лінії і повторно надсилає його пізніше, змушуючи приймача діяти двічі.

  • Шифрування. Перемішування байтів корисного навантаження за допомогою спільного секретного ключа, щоб будь-хто, хто читає лінію без цього ключа, бачив лише шум. Зазвичай поєднується з наведеним вище тегом автентифікації – без нього зловмисник може надсилати «сміттєві» дані, які випадково проходять CRC, і приймач витрачає цикли на спробу розшифрувати безглуздя.

Типовий «хороший» протокол для промислового обладнання включає кадрування, подвійний CRC, порядкові номери, ACK / NACK із повторною передачею та пульси. Варті уваги реальні приклади: MAVLink (телеметрія дронів, із порядковими номерами, ідентифікаторами системи / компонента і необов’язковими підписами пакетів), Modbus (промислові PLC, з кодами функцій і CRC) і NMEA 0183 (ASCII-протокол, яким розмовляє кожен споживчий GPS-приймач – рядкові повідомлення з контрольною сумою після роздільника-зірочки).