feat(framing): class for header with hamming(7,4)

This commit is contained in:
2025-10-30 00:26:32 +01:00
parent 70b88d356a
commit 98215f94b8
2 changed files with 162 additions and 0 deletions

97
hamming_7_4_codec.py Normal file
View File

@@ -0,0 +1,97 @@
import numpy as np
import numpy.typing as npt
GEN_MATRIX = np.asarray([(1, 1, 1, 0, 0, 0, 0),
(1, 0, 0, 1, 1, 0, 0),
(0, 1, 0, 1, 0, 1, 0),
(1, 1, 0, 1, 0, 0, 1),
])
PARITY_MATRIX = np.asarray([(0, 0, 0, 1, 1, 1, 1),
(0, 1, 1, 0, 0, 1, 1),
(1, 0, 1, 0, 1, 0, 1),
])
DECODER_MATRIX = np.asarray([(0, 0, 1, 0, 0, 0, 0,),
(0, 0, 0, 0, 1, 0, 0,),
(0, 0, 0, 0, 0, 1, 0,),
(0, 0, 0, 0, 0, 0, 1,),
])
def number_to_list(number: int, N: int) -> npt.NDArray:
"""Return the last N bits of a number as an MSB-first array"""
return np.array([(number >> (N-1-i)) % 2 for i in range(0, N)])
def list_to_number(array: npt.NDArray, N: int) -> int:
"""Reconstruct a number from the first N elements of an MSB-first array"""
output = 0
for i in range(0, N):
output += array[i] << (N-1-i)
return output
def encode_nibble(data: int) -> int:
"""Encode the lower 4 bits of data as hamming(7,4)"""
data_asarr = number_to_list(data, 4)
output_asarr = (data_asarr @ GEN_MATRIX) % 2
return list_to_number(output_asarr, 7)
def hamming_7_4_encode(data: list[int]) -> list[int]:
"""Encode a list of ints using Hamming(7,4)
The returned list is twice as long as the list in the argument"""
data_encoded = []
for element in data:
data_encoded += [encode_nibble(element >> 4),
encode_nibble(element % 16)]
return data_encoded
def decode_nibble(data: int) -> int:
"""decode a single nibble using the parity check matrix of Hamming(7,4).
This should correct 2 errors maximum."""
data_asarr = number_to_list(data, 7)
syndrome = (PARITY_MATRIX @ data_asarr) % 2
error = list_to_number(syndrome, 3)
if error == 0:
return list_to_number(data_asarr, 4)
data_asarr[error - 1] ^= 1
data_decoded = (DECODER_MATRIX @ data_asarr) % 2
return list_to_number(data_decoded, 4)
def hamming_7_4_decode(data: list[int]) -> list[int]:
"""Decode a list of ints using Hamming(7,4)
The returned list is half as long as the list in the argument"""
return [(decode_nibble(data[i]) << 4) + (decode_nibble(data[i + 1])) for i in range(0, len(data), 2)]
if __name__ == "__main__":
msg = 'TEST DE HA5PLS'
data_in = [byte for byte in bytearray(msg.encode())]
data_encoded = hamming_7_4_encode(data_in)
print(f"Data sent: {msg}")
data_corrupted_1bit = [data ^ (1 << (idx % 7))
for idx, data in enumerate(data_encoded)]
# print(f"Corrupt data with 1-bit errors: {data_corrupted_1bit}")
data_corrected_1bit = hamming_7_4_decode(data_corrupted_1bit)
data_corrected_1bit_str = [int.to_bytes(int(i), 1, "big").decode(
encoding="utf-8", errors="ignore") for i in data_corrected_1bit]
print(f"Recovered data from 1-bit errors: {data_corrected_1bit_str}")
data_corrupted_2bit = [data ^ (5 << (idx % 7))
for idx, data in enumerate(data_encoded)]
# print(f"Corrupt data with 2-bit errors: {data_corrupted_2bit}")
data_corrected_2bit = hamming_7_4_decode(data_corrupted_2bit)
data_corrected_2bit_str = [int.to_bytes(int(i), 1, "big").decode(
encoding="utf-8", errors="ignore") for i in data_corrected_2bit]
print(f"Recovered data from 1-bit errors: {data_corrected_2bit_str}")
data_corrupted_3bit = [data ^ (7 << (idx % 7))
for idx, data in enumerate(data_encoded)]
# print(f"Corrupt data with 3-bit errors: {data_corrupted_3bit}")
data_corrected_3bit = hamming_7_4_decode(data_corrupted_3bit)
data_corrected_3bit_str = [int.to_bytes(int(i), 1, "big").decode(
encoding="utf-8", errors="ignore") for i in data_corrected_3bit]
print(f"Recovered data from 1-bit errors: {data_corrected_3bit_str}")

65
header.py Normal file
View File

@@ -0,0 +1,65 @@
from hamming_8_4_codec import hamming_8_4_encode, hamming_8_4_decode
from crc import Configuration, Crc16
PROTOCOL_IDENTIFIER = 0x0
# CRC_POLY = (0xed2f << 1) + 1 # from https://users.ece.cmu.edu/~koopman/crc/index.html as "best 16-bit CRC" on 2025-10-24
ONBEAT_CRC = Configuration(Crc16.IBM_3740)
class Onbeat_Header:
"""This class represents a header for a single packet of PLSTV."""
def __init__(self, protocol_id, protocol_configuration: int, callsign: str, pkt_len: int, pkt_sequence_id: int):
if protocol_id >= 2 << 4:
raise OverflowError(
f"protocol identifier should be confined to 4 bits, got {protocol_id}")
self.protocol_id = protocol_id
if protocol_configuration >= 2 << 4:
raise OverflowError(
f"protocol configuration should be confined to 4 bits, got {protocol_configuration}")
self.protocol_configuration = protocol_configuration
call_len = len(callsign.encode())
if call_len >= 10:
raise OverflowError(
f"Callsign must be confined to 10 bytes, got {callsign} with length {call_len} (using UTF-8)")
self.callsign = callsign
if pkt_len >= 2 << 16:
raise OverflowError(
f"Maximum allowed packet size is {2 << 16 - 1} got {pkt_len}")
self.pkt_len = pkt_len
if pkt_sequence_id >= 2 << 8:
raise OverflowError(
f"Packet sequence ID must be confined to 8 bits, got {pkt_sequence_id}")
self.pkt_sequence_id = pkt_sequence_id
def encode(self) -> list[int]:
header_asints = []
header_asints += [(PROTOCOL_IDENTIFIER << 4) +
self.protocol_configuration]
header_asints += [(int.from_bytes(self.callsign.encode())
>> 8*i) % 2 << 8 for i in range(0, 10)]
header_asints += [(self.pkt_len >> 8), self.pkt_len % 2 << 8]
header_asints += [self.pkt_sequence_id]
header_crc = ONBEAT_CRC.checksum(bytes(header_asints))
header_asints += [header_crc]
return hamming_8_4_encode(header_asints)
def decode(self, header_encoded: list[int]):
header_corrected = hamming_8_4_decode(header_encoded)
checksum = ONBEAT_CRC.checksum(
bytes(header_corrected[14] << 8 + header_corrected[15]))
if checksum != 0:
raise ValueError(
"Checksum of header is non-zero, packet is invalid")
self.protocol_id = header_corrected[0] >> 4
self.protocol_configuration = header_corrected[0] % 16
callsign_numeric = 0
for i in range(1, 11):
callsign_numeric += header_corrected[i]
callsign_numeric <<= 8
self.callsign = int.to_bytes(callsign_numeric, byteorder="big").decode()
self.pkt_len = header_corrected[11] << 8 + header_corrected[12]
self.pkt_sequence_id = header_corrected[13]