3.20. Protocolli seriali, framing e CRC¶
UART nel codice ha spostato byte tra due estremità. Da solo, questo non basta per costruire un collegamento affidabile. Tre problemi emergono nel momento in cui all’altro capo del filo c’è un dispositivo reale:
Dove inizia e dove finisce un messaggio? I byte arrivano in un flusso senza alcun delimitatore integrato. Se il ricevitore perde il primo byte (acceso dopo il mittente; un breve disturbo elettrico sulla linea), ogni byte successivo risulta sfasato di uno finché il ricevitore non trova un nuovo punto di risincronizzazione.
Quanto è lungo ciascun messaggio? Una lettura del sensore da 32 byte e una risposta di stato da 4 byte appaiono identiche a livello di byte. Il ricevitore ha bisogno di un modo per sapere quanti byte appartengono al messaggio corrente.
I byte sono arrivati intatti? Il rumore può invertire singoli bit. Senza un controllo, il ricevitore agisce tranquillamente su dati corrotti.
La risposta standard a tutti e tre i problemi è racchiudere i dati in un frame di pacchetto: una sequenza di byte nota all’inizio, un campo di lunghezza, il payload stesso e una checksum alla fine.
3.20.1. Framing dei pacchetti¶
Un tipico formato di framing:
Un pacchetto con framing dotato di CRC separati per header e dati: header (byte magici), comando, lunghezza, CRC dell’header, payload, CRC dei dati.¶
Ogni campo svolge un compito:
Header (byte magici). Una sequenza di byte fissa e insolita – spesso due byte come
0xAA 0x55– che il ricevitore cerca nel flusso in ingresso. Quando trova la sequenza, sa che sta iniziando un nuovo pacchetto e può scartare qualsiasi spazzatura arrivata prima.Comando. Un singolo byte che indica cosa è il pacchetto. Valori di comando diversi usano formati di payload diversi – un comando potrebbe significare «imposta angolo servo» con due byte di payload, un altro potrebbe significare «leggi sensore» senza payload, un altro ancora potrebbe essere «messaggio di log» con una stringa. Il ricevitore smista in base al byte di comando per sapere come interpretare il resto del pacchetto.
Lunghezza. Due byte che indicano la dimensione del payload in byte (qui little-endian), consentendo payload fino a circa 64 KiB. Il ricevitore legge esattamente questo numero di byte una volta verificato il CRC dell’header.
CRC dell’header. Una checksum di un byte sui campi HEADER, CMD e LEN. Il ricevitore la verifica prima di leggere qualsiasi payload, in modo che un LEN corrotto venga rilevato dopo appena una manciata di byte (vedi la sezione sui CRC più sotto per capire perché questo è importante).
Payload. Dati applicativi specifici del comando, lunghi esattamente LEN byte. Il formato è determinato dal byte di comando: un record impacchettato con
structdi campi a larghezza fissa, una stringa, memoria grezza – qualunque cosa entrambe le parti concordino per quel comando.CRC dei dati. Un CRC di quattro byte sui byte del payload. Il ricevitore lo ricalcola dai byte appena letti e scarta il pacchetto se non corrisponde.
3.20.2. CRC¶
La «checksum» più semplice è la somma di tutti i byte, modulo 256 o 65536. Rileva la maggior parte delle inversioni di singolo bit ma manca molti errori multi-bit e ignora l’ordine dei byte.
Un cyclic redundancy check (CRC) è l’aggiornamento standard. Tratta l’input come un unico lungo numero binario e lo divide (in un modo speciale) per un polinomio fisso; il resto della divisione è il CRC. Polinomi diversi rilevano classi diverse di errori; i comuni polinomi a 8, 16 e 32 bit rilevano ciascuno ogni raffica di errori più corta della loro larghezza più una grande frazione di raffiche più lunghe.
3.20.2.1. Perché due CRC¶
Il diagramma del pacchetto qui sopra trasporta due CRC separati – uno sull’header (HEADER, CMD, LEN) e uno sul payload. Questo è ciò di cui un framing robusto ha effettivamente bisogno, a causa di come un singolo CRC finale fallisce quando il campo LEN stesso viene corrotto durante il transito:
Il ricevitore agisce sul LEN corrotto e legge quel numero di byte dal filo – possibilmente molti più di quanti il mittente intendesse.
Solo il CRC finale alla fine comunica al ricevitore che qualcosa è andato storto, e solo dopo che tutti quei byte sono stati consumati.
Mentre il parser è bloccato in attesa del numero sbagliato di byte, i pacchetti reali che arrivano dietro quello corrotto vengono inghiottiti come payload, e il ricevitore perde diversi pacchetti anziché uno solo.
Suddividere il CRC risolve questo problema:
Il CRC dell’header copre HEADER, CMD e LEN. Il ricevitore lo verifica prima di leggere qualsiasi payload, in modo che un LEN corrotto venga rilevato dopo una manciata di byte e il parser si risincronizzi immediatamente, eliminando solo l’unico pacchetto difettoso.
Il CRC dei dati copre il payload. Una volta superato il CRC dell’header, il ricevitore sa di potersi fidare di LEN, legge esattamente quel numero di byte di payload e li verifica rispetto al CRC dei dati.
Un dimensionamento comune – e quello usato in questa pagina – è un byte per il CRC dell’header (un CRC-8 è più che sufficiente per un header di cinque byte) e quattro byte per il CRC dei dati (un CRC-32 copre molti kilobyte di payload con un tasso di collisione infinitesimale).
3.20.2.2. Funzioni di supporto¶
MicroPython include binascii.crc32() per il CRC a quattro byte direttamente. Per il CRC dell’header a un byte, un piccolo helper che usa il polinomio impiegato dai dispositivi 1-wire di Maxim (0x8C in forma riflessa) è abbastanza breve da scrivere inline:
def crc8(data: bytes) -> int:
crc = 0
for byte in data:
crc ^= byte
for _ in range(8):
crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
return crc & 0xFF
Un encoder completo combina i due CRC in un’unica funzione:
import binascii
import struct
def encode_packet(cmd: int, payload: bytes) -> bytes:
header = b"\xAA\x55" + bytes([cmd]) + struct.pack("<H", len(payload))
hcrc = crc8(header)
dcrc = binascii.crc32(payload)
return header + bytes([hcrc]) + payload + struct.pack("<I", dcrc)
La funzione inversa recupera il comando e il payload da un pacchetto completo, oppure restituisce None se uno dei due controlli CRC fallisce:
def decode_packet(packet: bytes):
# Layout: HEADER(2) + CMD(1) + LEN(2) + HCRC(1) + PAYLOAD(LEN) + DCRC(4)
if len(packet) < 10 or packet[0:2] != b"\xAA\x55":
return None
header = packet[0:5]
if crc8(header) != packet[5]:
return None # header CRC mismatch
cmd = packet[2]
length = struct.unpack("<H", packet[3:5])[0]
if len(packet) != 6 + length + 4:
return None # truncated or oversized
payload = packet[6:6 + length]
received_dcrc = struct.unpack("<I", packet[6 + length:])[0]
if binascii.crc32(payload) != received_dcrc:
return None # data CRC mismatch
return cmd, bytes(payload)
In pratica il ricevitore non riceve un pacchetto intero già pronto – i byte arrivano uno alla volta attraverso la UART, e un mittente che si interrompe a metà pacchetto (o una linea rumorosa che perde un byte) non può semplicemente essere letto con read() in un buffer della dimensione giusta. La prossima sezione esegue la stessa logica di decodifica byte per byte come una macchina a stati.
3.20.3. Un ricevitore a macchina a stati¶
Il ricevitore non può semplicemente chiamare uart.read(N) per un N fisso – non sa quanti byte sarà il prossimo pacchetto, e qualsiasi spazzatura sulla linea altera l’allineamento. La soluzione è una piccola macchina a stati che consuma i byte uno alla volta e reagisce in base a dove si trova nel pacchetto. Il ciclo principale interroga any() per vedere quanti byte sono nel buffer, li svuota in un’unica chiamata read() e passa ciascun byte attraverso la macchina a stati:
import time
import binascii
import struct
from machine import UART
HEADER = b"\xAA\x55"
HEADER_LEN = len(HEADER)
# States, in the order the receiver walks through them per packet.
HUNT_FOR_HEADER = 0
READ_COMMAND = 1
READ_LENGTH = 2
READ_HEADER_CRC = 3
READ_PAYLOAD = 4
READ_DATA_CRC = 5
uart = UART(3, baudrate=115200)
# Receiver state plus partial-field buffers.
state = HUNT_FOR_HEADER
matched = 0 # bytes of HEADER matched so far
cmd = 0 # CMD captured in READ_COMMAND
length_bytes = bytearray() # raw LEN bytes (kept for the header CRC)
length = 0 # unpacked LEN
payload = bytearray() # payload bytes accumulated in READ_PAYLOAD
crc_bytes = bytearray() # DCRC bytes accumulated in READ_DATA_CRC
def handle_packet(cmd: int, payload: bytes) -> None:
print("cmd", cmd, "payload", payload)
while True:
# Drain whatever bytes have arrived since the last poll. Idle
# briefly when the line is quiet so the loop is not a busy spin.
n = uart.any()
if not n:
time.sleep_ms(1)
continue
for b in uart.read(n):
if state == HUNT_FOR_HEADER:
# Walk the magic bytes. On a mismatch, back off by one
# so a stray HEADER[0] in the noise still counts as a
# possible start.
if b == HEADER[matched]:
matched += 1
if matched == HEADER_LEN:
state = READ_COMMAND
matched = 0
else:
matched = 1 if b == HEADER[0] else 0
elif state == READ_COMMAND:
cmd = b
length_bytes = bytearray()
state = READ_LENGTH
elif state == READ_LENGTH:
# LEN is two bytes little-endian.
length_bytes.append(b)
if len(length_bytes) == 2:
length = struct.unpack("<H", length_bytes)[0]
state = READ_HEADER_CRC
elif state == READ_HEADER_CRC:
# Verify the CRC over HEADER + CMD + LEN before
# committing to read LEN payload bytes. A mismatch
# aborts here, after just five header bytes -- the
# next valid header re-syncs quickly.
expected = crc8(HEADER + bytes([cmd]) + length_bytes)
if b == expected:
payload = bytearray()
state = READ_PAYLOAD
else:
state = HUNT_FOR_HEADER
elif state == READ_PAYLOAD:
payload.append(b)
if len(payload) == length:
crc_bytes = bytearray()
state = READ_DATA_CRC
elif state == READ_DATA_CRC:
# Verify the CRC over the payload and either deliver
# the packet or drop it. Either way, go back to
# looking for the next header.
crc_bytes.append(b)
if len(crc_bytes) == 4:
expected = binascii.crc32(payload)
received = struct.unpack("<I", crc_bytes)[0]
if expected == received:
handle_packet(cmd, bytes(payload))
state = HUNT_FOR_HEADER
Ogni byte fa avanzare la macchina a stati di un passo, oppure la riporta a HUNT_FOR_HEADER dopo un pacchetto completo, un CRC dell’header errato o un CRC dei dati errato. La spazzatura sulla linea che non corrisponde all’header viene scartata silenziosamente; il successivo header valido risincronizza il ricevitore. La proprietà di sicurezza chiave deriva dal CRC dell’header: se il campo LEN è corrotto, il parser lo rileva dopo il controllo del CRC dell’header (una manciata di byte), non dopo essersi impegnato a leggere un numero di byte di payload selvaggiamente sbagliato.
3.20.4. Oltre il livello base¶
Il framing descritto sopra è il minimo di cui un collegamento seriale ha bisogno per recuperare dal rumore di linea: byte magici dell’header, lunghezza, comando e due CRC. Rileva la corruzione e si risincronizza dopo byte alterati, ma rinuncia ai pacchetti danneggiati anziché farli passare, e lascia il mittente senza alcuna idea di cosa il ricevitore abbia effettivamente sentito.
I protocolli seriali del mondo reale aggiungono funzionalità a strati sopra quel livello base. Non tutti i collegamenti embedded hanno bisogno di tutte – scegli ciò che l’applicazione richiede effettivamente:
Numeri di sequenza. Un piccolo contatore che si incrementa a ogni invio. Il ricevitore rileva lacune (un pacchetto è andato perso), duplicati (il mittente ha ritrasmesso ma il ricevitore aveva già accettato la prima copia) e – dove il canale può riordinare – arrivi fuori ordine.
Riscontri (ACK). Un pacchetto ACK dedicato (o un bit incorporato in una risposta) che il ricevitore rimanda indietro per confermare ciascun pacchetto. Senza ACK il mittente non ha modo di sapere se i suoi dati sono arrivati.
Riscontri negativi (NACK). Un NACK inviato quando il ricevitore rileva un errore di CRC o una lacuna nella sequenza. Il mittente ritrasmette immediatamente, invece di attendere lo scadere di un timeout di ACK.
Ritrasmissione. Il mittente mantiene ogni pacchetto non riscontrato in una piccola coda e lo reinvia dopo un timeout (o su un NACK). Un limite di tentativi e un certo backoff tra i tentativi impediscono a un collegamento permanentemente guasto di ripetersi all’infinito.
Finestre scorrevoli. Consentire diversi pacchetti in transito prima di richiedere un ACK mantiene alto il throughput sui collegamenti in cui il round-trip è lungo rispetto al tempo di invio per pacchetto. Il costo è un maggiore stato lato mittente – uno slot per ogni pacchetto in transito.
Controllo di flusso. Un segnale dal ricevitore che dice al mittente di rallentare o fermarsi quando il suo buffer si sta riempiendo. Le implementazioni variano – byte espliciti XON / XOFF, concessioni basate su crediti in cui il ricevitore autorizza N pacchetti aggiuntivi alla volta, oppure le linee hardware RTS / CTS sul filo stesso. Senza controllo di flusso un mittente veloce finisce per sovraccaricare un ricevitore lento e i pacchetti vengono persi.
Versione del protocollo. Un campo di versione all’inizio del pacchetto consente al formato di evolversi. Ciascuna parte può negoziare all’avvio la versione più alta supportata da entrambi, oppure rifiutare pacchetti da peer incompatibili.
Frammentazione e riassemblaggio. Un LEN di due byte limita il pacchetto a 64 KiB; i messaggi più grandi vengono suddivisi in più pacchetti e riassemblati dall’altra parte. I metadati di frammentazione (indice del frammento, conteggio totale o un flag «altri frammenti») risiedono all’interno del payload.
Heartbeat. Un piccolo pacchetto periodico che dice «sono ancora qui». L’altra parte si accorge quando gli heartbeat si fermano e si riconnette (o fallisce in modo evidente) invece di rimanere bloccata in silenzio.
Canali. Un ID di canale o flusso nell’header in modo che un singolo collegamento fisico trasporti diversi flussi logici – un canale di controllo, un canale di telemetria, un canale di log – distinti solo da quel campo.
Autenticazione. Un breve tag calcolato dal payload e da un valore segreto che solo il mittente e il ricevitore legittimi conoscono. Il ricevitore ricalcola il tag dai byte ricevuti e rifiuta il pacchetto se i due non corrispondono. Questo rileva sia la manomissione (un aggressore ha modificato i byte) sia – se un numero di sequenza o un timestamp fa parte di ciò che il tag copre – il replay, in cui un aggressore registra un pacchetto reale dal filo e lo reinvia in seguito per far sì che il ricevitore agisca su di esso due volte.
Cifratura. Mescolare i byte del payload con una chiave segreta condivisa in modo che chiunque legga la linea senza quella chiave veda solo rumore. Solitamente combinata con il tag di autenticazione sopra – senza di esso, un aggressore può immettere spazzatura che per caso supera il CRC e il ricevitore spreca cicli cercando di decifrare cose senza senso.
Un tipico «buon» protocollo per apparecchiature industriali finisce per avere framing, doppio CRC, numeri di sequenza, ACK / NACK con ritrasmissione e heartbeat. Esempi del mondo reale degni di nota: MAVLink (telemetria dei droni, con numeri di sequenza, ID di sistema / componente e firme di pacchetto opzionali), Modbus (PLC industriali, con codici funzione e CRC) e NMEA 0183 (il protocollo ASCII parlato da ogni ricevitore GPS di consumo – messaggi basati su righe con una checksum dopo un delimitatore asterisco).