3.20. פרוטוקולים טוריים, מסגור (framing) ו-CRC

UART בקוד העביר בייטים בין שני קצוות. כשלעצמו, זה לא מספיק כדי לבנות קישור אמין. שלוש בעיות צצות ברגע שהתקן אמיתי נמצא בקצה השני של החוט:

  • היכן מתחילה ומסתיימת הודעה? הבייטים מגיעים בזרם ללא תוחם מובנה. אם הצד המקבל מפספס את הבייט הראשון (מופעל לאחר השולח; הפרעה חשמלית קצרה על הקו), כל בייט שאחריו מוסט באחד עד שהצד המקבל מוצא נקודת סנכרון מחדש (resync) טרייה.

  • מה אורכה של כל הודעה? קריאת חיישן בגודל 32 בייט ותשובת סטטוס בגודל 4 בייט נראות זהות ברמת הבייט. הצד המקבל זקוק לדרך לדעת כמה בייטים שייכים להודעה הנוכחית.

  • האם הבייטים הגיעו שלמים? רעש יכול להפוך ביטים בודדים. ללא בדיקה, הצד המקבל פועל בשמחה על נתונים פגומים.

התשובה הסטנדרטית לכל שלוש הבעיות היא לעטוף את הנתונים במסגרת מנה (packet frame): רצף בייטים ידוע בהתחלה, שדה אורך, המטען (payload) עצמו, וסכום ביקורת (checksum) בסוף.

3.20.1. מסגור מנות (Packet framing)

פורמט מסגור טיפוסי:

Six fields drawn in sequence: a two-byte HEADER labelled 0xAA 0x55, a one-byte CMD field selecting which command this packet carries, a two-byte LEN field giving the payload size, a one-byte HCRC field covering HEADER plus CMD plus LEN, a variable-length PAYLOAD of LEN bytes whose format depends on the CMD, and a four-byte DCRC field covering the payload.

מנה ממוסגרת עם CRC נפרדים לכותרת ולנתונים: כותרת (בייטי קסם), פקודה, אורך, CRC של הכותרת, מטען, CRC של הנתונים.

כל שדה ממלא תפקיד אחד:

  • כותרת (בייטי קסם). רצף בייטים קבוע ויוצא דופן – לרוב שני בייטים כמו 0xAA 0x55 – שהצד המקבל סורק עבורו בזרם הנכנס. כשהוא מוצא את הרצף, הוא יודע שמנה חדשה מתחילה ויכול להשליך כל זבל שהגיע לפני כן.

  • פקודה. בייט יחיד שאומר מה היא המנה. ערכי פקודה שונים משתמשים בפורמטי מטען שונים – פקודה אחת עשויה להיות ”הגדר זווית סרבו“ עם שני בייטי מטען, אחרת עשויה להיות ”קרא חיישן“ ללא מטען, ואחרת עשויה להיות ”הודעת יומן“ עם מחרוזת. הצד המקבל מנתב לפי בייט הפקודה כדי לדעת כיצד לפרש את שאר המנה.

  • אורך. שני בייטים הנותנים את גודל המטען בבייטים (little-endian כאן), המאפשרים מטענים עד כ-64 KiB. הצד המקבל קורא בדיוק כמות בייטים זו ברגע ש-CRC הכותרת אומת.

  • CRC של הכותרת. סכום ביקורת בן בייט אחד מעל שדות HEADER, CMD ו-LEN. הצד המקבל בודק אותו לפני קריאת מטען כלשהו, כך ש-LEN פגום נתפס לאחר קומץ בייטים בלבד (ראו את חלק ה-CRC להלן להסבר מדוע זה חשוב).

  • מטען. נתוני יישום ספציפיים לפקודה, באורך של בדיוק LEN בייטים. הפורמט נקבע על ידי בייט הפקודה: רשומה ארוזה בעזרת struct של שדות ברוחב קבוע, מחרוזת, זיכרון גולמי – מה ששני הצדדים מסכימים עליו עבור אותה פקודה.

  • CRC של הנתונים. CRC בן ארבעה בייטים מעל בייטי המטען. הצד המקבל מחשב אותו מחדש מהבייטים שזה עתה קרא ומשליך את המנה אם הוא אינו תואם.

3.20.2. CRC

ה“סכום ביקורת“ הפשוט ביותר הוא סכום כל הבייטים, מודולו 256 או 65536. הוא תופס את רוב היפוכי הביט הבודדים אך מפספס הרבה שגיאות מרובות-ביט ומתעלם מסדר הבייטים.

בדיקת יתירות מחזורית (cyclic redundancy check, CRC) היא השדרוג הסטנדרטי. היא מתייחסת לקלט כאל מספר בינארי ארוך אחד ומחלקת אותו (בדרך מיוחדת) בפולינום קבוע; שארית החלוקה היא ה-CRC. פולינומים שונים תופסים מחלקות שונות של שגיאות; הפולינומים הנפוצים בני 8, 16 ו-32 הביט תופסים כל מקבץ שגיאות (burst) קצר מרוחבם בתוספת חלק גדול ממקבצים ארוכים יותר.

3.20.2.1. מדוע שני CRC

תרשים המנה לעיל נושא שני CRC נפרדים – אחד מעל הכותרת (HEADER, CMD, LEN) ואחד מעל המטען. זהו מה שמסגור עמיד באמת זקוק לו, בגלל האופן שבו CRC נגרר יחיד נכשל כאשר שדה ה-LEN עצמו נפגם בתעבורה:

  • הצד המקבל פועל על ה-LEN הפגום וקורא כמות זו של בייטים מהקו – אולי הרבה יותר ממה שהשולח התכוון.

  • רק ה-CRC הנגרר אומר לבסוף לצד המקבל שמשהו השתבש, ורק לאחר שכל אותם בייטים נצרכו.

  • בעוד שהמנתח (parser) תקוע בהמתנה למספר הבייטים השגוי, מנות אמיתיות המגיעות מאחורי הפגומה נבלעות כמטען, והצד המקבל מאבד מספר מנות במקום רק את האחת.

פיצול ה-CRC פותר זאת:

  • CRC הכותרת מכסה את HEADER, CMD ו-LEN. הצד המקבל בודק אותו לפני קריאת מטען כלשהו, כך ש-LEN פגום נתפס לאחר קומץ בייטים והמנתח מסתנכרן מחדש מיד, ומפיל רק את המנה הפגומה האחת.

  • CRC הנתונים מכסה את המטען. ברגע ש-CRC הכותרת עבר, הצד המקבל יודע שהוא יכול לסמוך על LEN, קורא בדיוק כמות זו של בייטי מטען, ומאמת אותם מול CRC הנתונים.

קביעת גודל נפוצה – וזו שבה עמוד זה משתמש – היא בייט אחד עבור CRC הכותרת (CRC-8 מספיק בהחלט לכותרת בת חמישה בייטים) וארבעה בייטים עבור CRC הנתונים (CRC-32 מכסה קילובייטים רבים של מטען בקצב התנגשויות נמוך עד אפסי).

3.20.2.2. פונקציות עזר

MicroPython כוללת את binascii.crc32() עבור ה-CRC בן ארבעת הבייטים ישירות. עבור CRC הכותרת בן הבייט הבודד, פונקציית עזר קטנה המשתמשת בפולינום שבו משתמשים התקני 1-wire של Maxim (0x8C בצורה משוקפת) קצרה מספיק כדי לכתוב אותה בשורה:

def crc8(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8C if crc & 1 else crc >> 1
    return crc & 0xFF

מקודד שלם משלב את שני ה-CRC בפונקציה אחת:

import binascii
import struct

def encode_packet(cmd: int, payload: bytes) -> bytes:
    header = b"\xAA\x55" + bytes([cmd]) + struct.pack("<H", len(payload))
    hcrc   = crc8(header)
    dcrc   = binascii.crc32(payload)
    return header + bytes([hcrc]) + payload + struct.pack("<I", dcrc)

הפונקציה ההפוכה משחזרת את הפקודה ואת המטען ממנה שלמה, או מחזירה None אם אחת מבדיקות ה-CRC נכשלת:

def decode_packet(packet: bytes):
    # Layout: HEADER(2) + CMD(1) + LEN(2) + HCRC(1) + PAYLOAD(LEN) + DCRC(4)
    if len(packet) < 10 or packet[0:2] != b"\xAA\x55":
        return None

    header = packet[0:5]
    if crc8(header) != packet[5]:
        return None                       # header CRC mismatch

    cmd    = packet[2]
    length = struct.unpack("<H", packet[3:5])[0]
    if len(packet) != 6 + length + 4:
        return None                       # truncated or oversized

    payload       = packet[6:6 + length]
    received_dcrc = struct.unpack("<I", packet[6 + length:])[0]
    if binascii.crc32(payload) != received_dcrc:
        return None                       # data CRC mismatch

    return cmd, bytes(payload)

בפועל, הצד המקבל אינו מקבל לידיו מנה שלמה – הבייטים מגיעים אחד בכל פעם דרך ה-UART, ושולח שמשתהה באמצע מנה (או קו רועש המאבד בייט) אינו יכול פשוט להיקרא באמצעות read() לתוך חוצץ (buffer) בגודל הנכון. החלק הבא מריץ את אותה לוגיקת פענוח בייט אחר בייט כמכונת מצבים.

3.20.3. צד מקבל מסוג מכונת מצבים

הצד המקבל אינו יכול פשוט לקרוא ל-uart.read(N) עבור N קבוע כלשהו – הוא אינו יודע כמה בייטים תהיה המנה הבאה, וכל זבל על הקו מפיל את היישור. הפתרון הוא מכונת מצבים קטנה הצורכת את הבייטים אחד בכל פעם ומגיבה בהתאם למיקומה במנה. הלולאה הראשית דוגמת את any() כדי לראות כמה בייטים נמצאים בחוצץ, מרוקנת אותם בקריאת read() אחת, ומזינה כל בייט דרך מכונת המצבים:

import time
import binascii
import struct
from machine import UART

HEADER = b"\xAA\x55"
HEADER_LEN = len(HEADER)

# States, in the order the receiver walks through them per packet.
HUNT_FOR_HEADER = 0
READ_COMMAND    = 1
READ_LENGTH     = 2
READ_HEADER_CRC = 3
READ_PAYLOAD    = 4
READ_DATA_CRC   = 5

uart = UART(3, baudrate=115200)

# Receiver state plus partial-field buffers.
state        = HUNT_FOR_HEADER
matched      = 0              # bytes of HEADER matched so far
cmd          = 0              # CMD captured in READ_COMMAND
length_bytes = bytearray()    # raw LEN bytes (kept for the header CRC)
length       = 0              # unpacked LEN
payload      = bytearray()    # payload bytes accumulated in READ_PAYLOAD
crc_bytes    = bytearray()    # DCRC bytes accumulated in READ_DATA_CRC

def handle_packet(cmd: int, payload: bytes) -> None:
    print("cmd", cmd, "payload", payload)

while True:
    # Drain whatever bytes have arrived since the last poll. Idle
    # briefly when the line is quiet so the loop is not a busy spin.
    n = uart.any()
    if not n:
        time.sleep_ms(1)
        continue

    for b in uart.read(n):
        if state == HUNT_FOR_HEADER:
            # Walk the magic bytes. On a mismatch, back off by one
            # so a stray HEADER[0] in the noise still counts as a
            # possible start.
            if b == HEADER[matched]:
                matched += 1
                if matched == HEADER_LEN:
                    state = READ_COMMAND
                    matched = 0
            else:
                matched = 1 if b == HEADER[0] else 0

        elif state == READ_COMMAND:
            cmd = b
            length_bytes = bytearray()
            state = READ_LENGTH

        elif state == READ_LENGTH:
            # LEN is two bytes little-endian.
            length_bytes.append(b)
            if len(length_bytes) == 2:
                length = struct.unpack("<H", length_bytes)[0]
                state = READ_HEADER_CRC

        elif state == READ_HEADER_CRC:
            # Verify the CRC over HEADER + CMD + LEN before
            # committing to read LEN payload bytes. A mismatch
            # aborts here, after just five header bytes -- the
            # next valid header re-syncs quickly.
            expected = crc8(HEADER + bytes([cmd]) + length_bytes)
            if b == expected:
                payload = bytearray()
                state = READ_PAYLOAD
            else:
                state = HUNT_FOR_HEADER

        elif state == READ_PAYLOAD:
            payload.append(b)
            if len(payload) == length:
                crc_bytes = bytearray()
                state = READ_DATA_CRC

        elif state == READ_DATA_CRC:
            # Verify the CRC over the payload and either deliver
            # the packet or drop it. Either way, go back to
            # looking for the next header.
            crc_bytes.append(b)
            if len(crc_bytes) == 4:
                expected = binascii.crc32(payload)
                received = struct.unpack("<I", crc_bytes)[0]
                if expected == received:
                    handle_packet(cmd, bytes(payload))
                state = HUNT_FOR_HEADER

כל בייט מקדם את מכונת המצבים בצעד אחד, או נסוג ל-HUNT_FOR_HEADER לאחר מנה שלמה, CRC כותרת פגום, או CRC נתונים פגום. זבל על הקו שאינו תואם את הכותרת מושלך בשקט; הכותרת התקפה הבאה מסנכרנת מחדש את הצד המקבל. תכונת הבטיחות המרכזית נובעת מ-CRC הכותרת: אם שדה ה-LEN פגום, המנתח תופס זאת לאחר בדיקת CRC הכותרת (קומץ בייטים), ולא לאחר התחייבות לקרוא מספר שגוי לחלוטין של בייטי מטען.

3.20.4. מעבר לבסיס

המסגור לעיל הוא המינימום שקישור טורי זקוק לו כדי להתאושש מרעש קו: קסם כותרת, אורך, פקודה, ושני CRC. הוא מזהה פגיעה ומסתנכרן מחדש לאחר בייטים מעוותים, אך הוא מוותר על מנות פגומות במקום להעביר אותן, והוא משאיר את השולח ללא מושג מה הצד המקבל שמע בפועל.

פרוטוקולים טוריים מהעולם האמיתי מוסיפים שכבות של תכונות מעל אותו בסיס. לא כל קישור משובץ זקוק לכולן – בחרו את מה שהיישום באמת דורש:

  • מספרי רצף. מונה קטן שמתקדם בכל שליחה. הצד המקבל מזהה פערים (מנה אבדה), כפילויות (השולח שידר מחדש אך הצד המקבל כבר קיבל את העותק הראשון), וכן – כאשר הערוץ יכול לשנות סדר – הגעות מחוץ לסדר.

  • אישורי קבלה. מנת ACK ייעודית (או ביט מוטמע בתשובה) שהצד המקבל שולח בחזרה כדי לאשר כל מנה. ללא ACK אין לשולח דרך לדעת שהנתונים שלו הגיעו.

  • אישורי קבלה שליליים. NACK הנשלח כאשר הצד המקבל רואה כשל CRC או פער ברצף. השולח משדר מחדש מיד, במקום להמתין לפקיעת תפוגת ACK.

  • שידור מחדש. השולח שומר כל מנה לא מאושרת בתור קטן ושולח אותה מחדש לאחר תפוגת זמן (או עם NACK). מגבלת ניסיונות חוזרים והשהיה מסוימת בין הניסיונות מונעת מקישור שבור לצמיתות להיכנס ללולאה לנצח.

  • חלונות הזזה (sliding windows). התרת מספר מנות בתעופה לפני דרישת ACK שומרת על תפוקה גבוהה בקישורים שבהם זמן הלוך-ושוב ארוך בהשוואה לזמן השליחה לכל מנה. המחיר הוא יותר מצב בצד השולח – חריץ אחד לכל מנה בתעופה.

  • בקרת זרימה. אות מהצד המקבל המורה לשולח להאט או להשהות כאשר החוצץ שלו מתמלא. המימושים משתנים – בייטי XON / XOFF מפורשים, הענקות מבוססות אשראי שבהן הצד המקבל מתיר N מנות נוספות בכל פעם, או קווי החומרה RTS / CTS על החוט עצמו. ללא בקרת זרימה, שולח מהיר מציף בסופו של דבר צד מקבל איטי ומנות נושרות.

  • גרסת פרוטוקול. שדה גרסה מוקדם במנה מאפשר לפורמט להתפתח. כל צד יכול לנהל משא ומתן על הגרסה הגבוהה ביותר ששניהם תומכים בה בעת ההפעלה, או לדחות מנות מעמיתים לא תואמים.

  • קיטוע והרכבה מחדש. LEN בן שני בייטים מגביל את המנה ל-64 KiB; הודעות גדולות מכך מפוצלות למספר מנות ומורכבות מחדש בצד השני. מטא-נתוני הקיטוע (אינדקס מקטע, ספירה כוללת, או דגל ”עוד מקטעים“) שוכנים בתוך המטען.

  • פעימות לב (heartbeats). מנה תקופתית קטנה שאומרת ”אני עדיין כאן“. הצד השני שם לב כאשר פעימות הלב נפסקות ומתחבר מחדש (או נכשל ברעש) במקום להיתקע בשקט.

  • ערוצים. מזהה ערוץ או זרם בכותרת כך שקישור פיזי אחד נושא מספר זרמים לוגיים – ערוץ בקרה, ערוץ טלמטריה, ערוץ יומן – הנבדלים זה מזה רק על ידי אותו שדה.

  • אימות. תג קצר המחושב מהמטען ומערך סודי שרק השולח והצד המקבל הלגיטימיים מכירים. הצד המקבל מחשב את התג שוב מהבייטים שקיבל ודוחה את המנה אם השניים אינם תואמים. זה תופס גם חבלה (תוקף שינה את הבייטים) וכן – אם מספר רצף או חותמת זמן הם חלק ממה שהתג מכסה – שחזור (replay), שבו תוקף מקליט מנה אמיתית מהקו ושולח אותה מחדש מאוחר יותר כדי לגרום לצד המקבל לפעול עליה פעמיים.

  • הצפנה. ערבול בייטי המטען עם מפתח סודי משותף כך שכל מי שקורא את הקו ללא אותו מפתח רואה רק רעש. בדרך כלל בשילוב עם תג האימות לעיל – בלעדיו, תוקף יכול להזין זבל שבמקרה עובר את ה-CRC והצד המקבל מבזבז מחזורים בניסיון לפענח שטויות.

פרוטוקול ”טוב“ טיפוסי עבור ציוד תעשייתי מסתיים עם מסגור, CRC כפול, מספרי רצף, ACK / NACK עם שידור מחדש, ופעימות לב. דוגמאות מהעולם האמיתי ששוות מבט: MAVLink (טלמטריית רחפנים, עם מספרי רצף, מזהי מערכת / רכיב, וחתימות מנה אופציונליות), Modbus (בקרי PLC תעשייתיים, עם קודי פונקציה ו-CRC), ו-NMEA 0183 (פרוטוקול ה-ASCII שכל מקלט GPS צרכני מדבר – הודעות מבוססות-שורה עם סכום ביקורת לאחר תוחם כוכבית).