diff --git a/hamming_7_4_codec.py b/hamming_7_4_codec.py new file mode 100644 index 0000000..ebcb296 --- /dev/null +++ b/hamming_7_4_codec.py @@ -0,0 +1,97 @@ +import numpy as np +import numpy.typing as npt + +GEN_MATRIX = np.asarray([(1, 1, 1, 0, 0, 0, 0), + (1, 0, 0, 1, 1, 0, 0), + (0, 1, 0, 1, 0, 1, 0), + (1, 1, 0, 1, 0, 0, 1), + ]) +PARITY_MATRIX = np.asarray([(0, 0, 0, 1, 1, 1, 1), + (0, 1, 1, 0, 0, 1, 1), + (1, 0, 1, 0, 1, 0, 1), + ]) +DECODER_MATRIX = np.asarray([(0, 0, 1, 0, 0, 0, 0,), + (0, 0, 0, 0, 1, 0, 0,), + (0, 0, 0, 0, 0, 1, 0,), + (0, 0, 0, 0, 0, 0, 1,), + ]) + + +def number_to_list(number: int, N: int) -> npt.NDArray: + """Return the last N bits of a number as an MSB-first array""" + return np.array([(number >> (N-1-i)) % 2 for i in range(0, N)]) + + +def list_to_number(array: npt.NDArray, N: int) -> int: + """Reconstruct a number from the first N elements of an MSB-first array""" + output = 0 + for i in range(0, N): + output += array[i] << (N-1-i) + return output + + +def encode_nibble(data: int) -> int: + """Encode the lower 4 bits of data as hamming(7,4)""" + data_asarr = number_to_list(data, 4) + output_asarr = (data_asarr @ GEN_MATRIX) % 2 + return list_to_number(output_asarr, 7) + + +def hamming_7_4_encode(data: list[int]) -> list[int]: + """Encode a list of ints using Hamming(7,4) + The returned list is twice as long as the list in the argument""" + data_encoded = [] + for element in data: + data_encoded += [encode_nibble(element >> 4), + encode_nibble(element % 16)] + return data_encoded + + +def decode_nibble(data: int) -> int: + """decode a single nibble using the parity check matrix of Hamming(7,4). + This should correct 2 errors maximum.""" + data_asarr = number_to_list(data, 7) + syndrome = (PARITY_MATRIX @ data_asarr) % 2 + error = list_to_number(syndrome, 3) + if error == 0: + return list_to_number(data_asarr, 4) + data_asarr[error - 1] ^= 1 + data_decoded = (DECODER_MATRIX @ data_asarr) % 2 + return list_to_number(data_decoded, 4) + + +def hamming_7_4_decode(data: list[int]) -> list[int]: + """Decode a list of ints using Hamming(7,4) + The returned list is half as long as the list in the argument""" + return [(decode_nibble(data[i]) << 4) + (decode_nibble(data[i + 1])) for i in range(0, len(data), 2)] + + +if __name__ == "__main__": + msg = 'TEST DE HA5PLS' + data_in = [byte for byte in bytearray(msg.encode())] + data_encoded = hamming_7_4_encode(data_in) + print(f"Data sent: {msg}") + + data_corrupted_1bit = [data ^ (1 << (idx % 7)) + for idx, data in enumerate(data_encoded)] + # print(f"Corrupt data with 1-bit errors: {data_corrupted_1bit}") + data_corrected_1bit = hamming_7_4_decode(data_corrupted_1bit) + data_corrected_1bit_str = [int.to_bytes(int(i), 1, "big").decode( + encoding="utf-8", errors="ignore") for i in data_corrected_1bit] + print(f"Recovered data from 1-bit errors: {data_corrected_1bit_str}") + + data_corrupted_2bit = [data ^ (5 << (idx % 7)) + for idx, data in enumerate(data_encoded)] + # print(f"Corrupt data with 2-bit errors: {data_corrupted_2bit}") + data_corrected_2bit = hamming_7_4_decode(data_corrupted_2bit) + data_corrected_2bit_str = [int.to_bytes(int(i), 1, "big").decode( + encoding="utf-8", errors="ignore") for i in data_corrected_2bit] + print(f"Recovered data from 1-bit errors: {data_corrected_2bit_str}") + + data_corrupted_3bit = [data ^ (7 << (idx % 7)) + for idx, data in enumerate(data_encoded)] + # print(f"Corrupt data with 3-bit errors: {data_corrupted_3bit}") + data_corrected_3bit = hamming_7_4_decode(data_corrupted_3bit) + data_corrected_3bit_str = [int.to_bytes(int(i), 1, "big").decode( + encoding="utf-8", errors="ignore") for i in data_corrected_3bit] + print(f"Recovered data from 1-bit errors: {data_corrected_3bit_str}") diff --git a/header.py b/header.py new file mode 100644 index 0000000..d185dc4 --- /dev/null +++ b/header.py @@ -0,0 +1,65 @@ +from hamming_8_4_codec import hamming_8_4_encode, hamming_8_4_decode +from crc import Configuration, Crc16 +PROTOCOL_IDENTIFIER = 0x0 +# CRC_POLY = (0xed2f << 1) + 1 # from https://users.ece.cmu.edu/~koopman/crc/index.html as "best 16-bit CRC" on 2025-10-24 +ONBEAT_CRC = Configuration(Crc16.IBM_3740) + + +class Onbeat_Header: + """This class represents a header for a single packet of PLSTV.""" + + def __init__(self, protocol_id, protocol_configuration: int, callsign: str, pkt_len: int, pkt_sequence_id: int): + if protocol_id >= 2 << 4: + raise OverflowError( + f"protocol identifier should be confined to 4 bits, got {protocol_id}") + self.protocol_id = protocol_id + + if protocol_configuration >= 2 << 4: + raise OverflowError( + f"protocol configuration should be confined to 4 bits, got {protocol_configuration}") + self.protocol_configuration = protocol_configuration + + call_len = len(callsign.encode()) + if call_len >= 10: + raise OverflowError( + f"Callsign must be confined to 10 bytes, got {callsign} with length {call_len} (using UTF-8)") + self.callsign = callsign + + if pkt_len >= 2 << 16: + raise OverflowError( + f"Maximum allowed packet size is {2 << 16 - 1} got {pkt_len}") + self.pkt_len = pkt_len + + if pkt_sequence_id >= 2 << 8: + raise OverflowError( + f"Packet sequence ID must be confined to 8 bits, got {pkt_sequence_id}") + self.pkt_sequence_id = pkt_sequence_id + + def encode(self) -> list[int]: + header_asints = [] + header_asints += [(PROTOCOL_IDENTIFIER << 4) + + self.protocol_configuration] + header_asints += [(int.from_bytes(self.callsign.encode()) + >> 8*i) % 2 << 8 for i in range(0, 10)] + header_asints += [(self.pkt_len >> 8), self.pkt_len % 2 << 8] + header_asints += [self.pkt_sequence_id] + header_crc = ONBEAT_CRC.checksum(bytes(header_asints)) + header_asints += [header_crc] + return hamming_8_4_encode(header_asints) + + def decode(self, header_encoded: list[int]): + header_corrected = hamming_8_4_decode(header_encoded) + checksum = ONBEAT_CRC.checksum( + bytes(header_corrected[14] << 8 + header_corrected[15])) + if checksum != 0: + raise ValueError( + "Checksum of header is non-zero, packet is invalid") + self.protocol_id = header_corrected[0] >> 4 + self.protocol_configuration = header_corrected[0] % 16 + callsign_numeric = 0 + for i in range(1, 11): + callsign_numeric += header_corrected[i] + callsign_numeric <<= 8 + self.callsign = int.to_bytes(callsign_numeric, byteorder="big").decode() + self.pkt_len = header_corrected[11] << 8 + header_corrected[12] + self.pkt_sequence_id = header_corrected[13]