feat(framing): framing done

refactor(framing): re-name hamming codec to 7,4 to reflect algorithm used
This commit is contained in:
2025-11-30 13:44:41 +01:00
parent 0489d8fc31
commit 5bf5bfec5d
4 changed files with 77 additions and 75 deletions

View File

@@ -1,12 +1,12 @@
from hamming_8_4_codec import hamming_8_4_encode, hamming_8_4_decode
from crc import Configuration, Crc16
from hamming_7_4_codec import hamming_7_4_encode, hamming_7_4_decode
from crc import Calculator, Crc16
PROTOCOL_IDENTIFIER = 0x0
# CRC_POLY = (0xed2f << 1) + 1 # from https://users.ece.cmu.edu/~koopman/crc/index.html as "best 16-bit CRC" on 2025-10-24
ONBEAT_CRC = Configuration(Crc16.IBM_3740)
ONBEAT_CRC = Calculator(Crc16.IBM_3740)
class Onbeat_Header:
"""This class represents a header for a single packet of PLSTV."""
"""This class represents a header for a single packet of ONBEAT."""
def __init__(self, protocol_id, protocol_configuration: int, callsign: str, pkt_len: int, pkt_sequence_id: int):
if protocol_id >= 2 << 4:
@@ -20,9 +20,11 @@ class Onbeat_Header:
self.protocol_configuration = protocol_configuration
call_len = len(callsign.encode())
if call_len >= 10:
if call_len > 10:
raise OverflowError(
f"Callsign must be confined to 10 bytes, got {callsign} with length {call_len} (using UTF-8)")
for _ in range(call_len, 10):
callsign += "\0"
self.callsign = callsign
if pkt_len >= 2 << 16:
@@ -30,7 +32,7 @@ class Onbeat_Header:
f"Maximum allowed packet size is {2 << 16 - 1} got {pkt_len}")
self.pkt_len = pkt_len
if pkt_sequence_id >= 2 << 8:
if pkt_sequence_id >= 256:
raise OverflowError(
f"Packet sequence ID must be confined to 8 bits, got {pkt_sequence_id}")
self.pkt_sequence_id = pkt_sequence_id
@@ -39,27 +41,18 @@ class Onbeat_Header:
header_asints = []
header_asints += [(PROTOCOL_IDENTIFIER << 4) +
self.protocol_configuration]
header_asints += [(int.from_bytes(self.callsign.encode())
>> 8*i) % 2 << 8 for i in range(0, 10)]
header_asints += [(self.pkt_len >> 8), self.pkt_len % 2 << 8]
header_asints += [ord(c) for c in self.callsign]
header_asints += [(self.pkt_len >> 8), self.pkt_len % 256]
header_asints += [self.pkt_sequence_id]
header_crc = ONBEAT_CRC.checksum(bytes(header_asints))
header_asints += [header_crc]
return hamming_8_4_encode(header_asints)
header_asints += [header_crc >> 8, header_crc % 256]
return hamming_7_4_encode(header_asints)
def decode(self, header_encoded: list[int]):
header_corrected = hamming_8_4_decode(header_encoded)
checksum = ONBEAT_CRC.checksum(
bytes(header_corrected[14] << 8 + header_corrected[15]))
if checksum != 0:
raise ValueError(
"Checksum of header is non-zero, packet is invalid")
self.protocol_id = header_corrected[0] >> 4
self.protocol_configuration = header_corrected[0] % 16
callsign_numeric = 0
for i in range(1, 11):
callsign_numeric += header_corrected[i]
callsign_numeric <<= 8
self.callsign = int.to_bytes(callsign_numeric, byteorder="big").decode()
self.pkt_len = header_corrected[11] << 8 + header_corrected[12]
self.pkt_sequence_id = header_corrected[13]
def decode(self, header_encoded: list[int]) -> bool:
header_corrected = hamming_7_4_decode(header_encoded)
self.protocol_id = int(header_corrected[0] >> 4)
self.protocol_configuration = int(header_corrected[0] % 16)
self.callsign = bytes(header_corrected[1:11]).decode(errors="ignore")
self.pkt_len = int((header_corrected[11] << 8) + header_corrected[12])
self.pkt_sequence_id = int(header_corrected[13])
return ONBEAT_CRC.verify(bytearray(header_corrected[0:14]), (header_corrected[14] << 8) + header_corrected[15])