from header import Onbeat_Header, PROTOCOL_IDENTIFIER from reedsolo import RSCodec import numpy as np PROTOCOL_V1_NOFEC = 1 PROTOCOL_V1_FEC = 2 RSC = RSCodec(255-223,255) def decode_packet(packet: list[int]) -> tuple[Onbeat_Header, list[int]]: if len(packet) < 32: raise OverflowError(f"Packet is too small to be valid, len: {len(packet)}") header_raw = packet[0:32] header = Onbeat_Header(0, 0, "", 0, 0) if header.decode(header_raw) != True: raise RuntimeError("invalid CRC") if header.protocol_id != PROTOCOL_IDENTIFIER: raise ValueError(f"Protocol ID mismatch, got{header.protocol_id}, expected {PROTOCOL_IDENTIFIER}") if header.pkt_len > len(packet) - 32: raise OverflowError(f"Header not fully captured, len:{header.pkt_len}, maximal: {len(packet) - 32}") elif header.pkt_len == 0: return header, [] payload = packet[32:32+header.pkt_len] if header.protocol_configuration == PROTOCOL_V1_FEC: payload_decoded = np.empty((4, min(len(payload)//4-32, 255)), dtype=int) for i in range(0,4): payload_decoded[i] = RSC.decode(bytearray(payload[i::4]))[0] payload = payload_decoded.flatten(order='F').tolist() elif header.protocol_configuration == PROTOCOL_V1_NOFEC: pass else: raise ValueError(f"Unknown protocol configuration: {header.protocol_configuration}") return header, payload def encode_packet(payload: list[int], callsign: str, sequence: int, rs_coding: bool = False) -> list[int]: protocol_configuration = PROTOCOL_V1_NOFEC if rs_coding == True: protocol_configuration = PROTOCOL_V1_FEC if len(payload) != 0: for _ in range(0, len(payload)%4): payload.append(0) payload_encoded = np.empty((4, min((len(payload))//4+32, 255)), dtype=int) for i in range(0,4): payload_encoded[i] = RSC.encode(bytearray(payload[i::4])) payload = payload_encoded.flatten(order='F').tolist() header = Onbeat_Header(0, protocol_configuration, callsign, len(payload), sequence) return header.encode() + payload