refactor(framing): re-name hamming codec to 7,4 to reflect algorithm used
47 lines
2.1 KiB
Python
47 lines
2.1 KiB
Python
from header import Onbeat_Header, PROTOCOL_IDENTIFIER
|
|
from reedsolo import RSCodec
|
|
import numpy as np
|
|
|
|
# Values stored in the header's ``protocol_configuration`` field.
PROTOCOL_V1_NOFEC = 1  # payload is carried unmodified
PROTOCOL_V1_FEC = 2  # payload is Reed-Solomon coded with RSC

# Shared Reed-Solomon codec used by both packet functions. The arguments are
# 32 and 255; reedsolo documents them as ``nsym`` (parity symbols per
# codeword) and ``nsize`` (maximum codeword length).
RSC = RSCodec(255 - 223, 255)
|
|
|
|
def decode_packet(packet: list[int]) -> tuple[Onbeat_Header, list[int]]:
    """Split a raw packet into its header and decoded payload.

    The first 32 entries of ``packet`` are passed to ``Onbeat_Header.decode``;
    the next ``header.pkt_len`` entries form the payload. When the header's
    ``protocol_configuration`` is ``PROTOCOL_V1_FEC`` the payload is treated
    as four byte-interleaved Reed-Solomon streams: each stream is decoded with
    ``RSC`` and the results are interleaved again. Any zero padding present in
    the decoded data is returned as part of the payload; nothing is stripped.

    Args:
        packet: Received byte values, header first.

    Returns:
        A ``(header, payload)`` tuple. ``payload`` is ``[]`` when the header
        announces a zero-length payload.

    Raises:
        OverflowError: ``packet`` is shorter than the header, or shorter than
            the header plus the announced payload length.
        RuntimeError: ``Onbeat_Header.decode`` did not return ``True``.
        ValueError: The protocol identifier does not match, the protocol
            configuration is unknown, or an FEC payload has a length that
            cannot be four equally long Reed-Solomon streams.

    Errors raised by ``RSC.decode`` (for example when a stream is too damaged
    to correct) propagate unchanged.
    """
    header_len = 32  # encoded header size at the front of every packet
    interleave = 4  # number of interleaved Reed-Solomon streams
    ecc_len = 32  # parity symbols per codeword (255 - 223, as passed to RSC)

    if len(packet) < header_len:
        raise OverflowError(f"Packet is too small to be valid, len: {len(packet)}")

    header = Onbeat_Header(0, 0, "", 0, 0)
    # Explicit comparison kept on purpose: only a result equal to True counts
    # as a valid header.
    if header.decode(packet[:header_len]) != True:  # noqa: E712
        raise RuntimeError("invalid CRC")
    if header.protocol_id != PROTOCOL_IDENTIFIER:
        raise ValueError(f"Protocol ID mismatch, got{header.protocol_id}, expected {PROTOCOL_IDENTIFIER}")

    available = len(packet) - header_len
    if header.pkt_len > available:
        raise OverflowError(f"Header not fully captured, len:{header.pkt_len}, maximal: {available}")
    if header.pkt_len == 0:
        # Empty packets return before the protocol configuration is inspected.
        return header, []

    payload = packet[header_len:header_len + header.pkt_len]

    if header.protocol_configuration == PROTOCOL_V1_FEC:
        # Every stream must have the same length and at least one codeword's
        # worth of parity; otherwise the frame cannot be de-interleaved.
        if len(payload) % interleave != 0 or len(payload) // interleave < ecc_len:
            raise ValueError(
                f"FEC payload length {len(payload)} is not a valid "
                f"{interleave}-way interleaved Reed-Solomon frame"
            )
        # RSC.decode returns a tuple whose first element is the corrected
        # message. Decoding each stream on its own (rather than into a
        # pre-sized array) also handles streams spanning several codewords.
        streams = [
            RSC.decode(bytearray(payload[i::interleave]))[0]
            for i in range(interleave)
        ]
        # Re-interleave: one byte from each stream in turn.
        payload = [byte for column in zip(*streams) for byte in column]
    elif header.protocol_configuration == PROTOCOL_V1_NOFEC:
        pass  # payload is already the user data
    else:
        raise ValueError(f"Unknown protocol configuration: {header.protocol_configuration}")

    return header, payload
|
|
|
|
def encode_packet(payload: list[int], callsign: str, sequence: int, rs_coding: bool = False) -> list[int]:
    """Build a packet: encoded header followed by the (optionally coded) payload.

    With ``rs_coding`` enabled and a non-empty payload, the payload is
    zero-padded to a multiple of four bytes, split into four byte-interleaved
    streams, each stream is Reed-Solomon encoded with ``RSC``, and the encoded
    streams are interleaved again. The header's length field is set to the
    length of the payload as transmitted (after coding).

    Args:
        payload: Byte values to send. The caller's list is not modified.
        callsign: Passed through to ``Onbeat_Header``.
        sequence: Passed through to ``Onbeat_Header``.
        rs_coding: Select ``PROTOCOL_V1_FEC`` instead of ``PROTOCOL_V1_NOFEC``.

    Returns:
        ``header.encode()`` concatenated with the payload to transmit.
    """
    interleave = 4  # number of interleaved Reed-Solomon streams

    protocol_configuration = PROTOCOL_V1_NOFEC
    if rs_coding:
        protocol_configuration = PROTOCOL_V1_FEC
        if len(payload) != 0:
            # Pad a copy up to the next multiple of `interleave` so all
            # streams have equal length. (-n) % k is the distance to the next
            # multiple; n % k is not.
            padding = (-len(payload)) % interleave
            padded = list(payload) + [0] * padding
            streams = [
                RSC.encode(bytearray(padded[i::interleave]))
                for i in range(interleave)
            ]
            # Interleave one byte from each encoded stream in turn. Building
            # the list directly avoids assuming a fixed encoded length, so
            # streams longer than a single codeword also work.
            payload = [byte for column in zip(*streams) for byte in column]

    header = Onbeat_Header(0, protocol_configuration, callsign, len(payload), sequence)
    return header.encode() + payload
|