Files
dougal-software/bin/smsrc.py

96 lines
2.3 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python3
"""
SmartSource parsing functions.
"""
import mmap
import struct
from collections import namedtuple
def _str (v):
return str(v, 'ascii').strip()
def _tstamp (v):
return str(v) # TODO
def _f10 (v):
return float(v)/10
def _ignore (v):
return None
# Record header layout, one row per field in on-disk order:
# (attribute name, width in bytes, converter applied to the raw bytes).
_smartsource_header_fields = (
    ("header", 6, _str),
    ("blk_siz", 4, int),
    ("line", 30, _str),
    ("shot", 10, int),
    ("mask", 2, int),
    ("trg_mode", 1, _str),
    ("time", 17, _tstamp),
    ("src_number", 1, int),
    ("num_subarray", 1, int),
    ("num_guns", 2, int),
    ("num_active", 2, int),
    ("num_delta", 2, int),
    ("num_auto", 2, int),
    ("num_nofire", 2, int),
    ("spread", 4, int),
    ("volume", 6, int),
    ("avg_delta", 5, float),
    ("std_delta", 5, float),
    ("baro_press", 6, float),
    ("manifold", 4, int),
    ("spare", 88, _str),
)

# Pre-compiled unpacker: every field is a fixed-width byte string ("Ns").
st_smartsource_header = struct.Struct(
    ">" + " ".join("%ds" % _width for _name, _width, _fn in _smartsource_header_fields)
)

# Converters, positionally aligned with the values the unpacker returns.
fn_smartsource_header = tuple(_fn for _name, _width, _fn in _smartsource_header_fields)

# Container for the converted header values, in the same field order.
SmartsourceHeader = namedtuple(
    "SmartsourceHeader",
    [_name for _name, _width, _fn in _smartsource_header_fields],
)
# Layout of one per-gun entry; a record carries `num_guns` of these after its
# header. Every field is a fixed-width byte string.
st_smartsource_gun = struct.Struct(">1s 2s 1s 1s 1s 1s 1s 6s 6s 4s 4s 4s 4s 4s")

# Converters, positionally aligned with the fields of `SmartsourceGun`.
fn_smartsource_gun = (
    int, int, int, _str, _str,
    # struct's "s" code unpacks to bytes, so the flag has to be compared with
    # a bytes literal; comparing with the str "Y" is always False on Python 3.
    lambda v: v == b"Y",
    _str,
    # aim_point, firetime, delay and depth are stored in tenths.
    _f10, _f10, _f10, _f10,
    int, int, int
)

# Container for the converted values of a single gun entry.
SmartsourceGun = namedtuple("SmartsourceGun", "string gun source mode detect autofire spare aim_point firetime delay depth pressure volume filltime")

# Header / guns pair.
SmartSourceRecord = namedtuple("SmartSourceRecord", "header guns")
def safe_apply(iter):
    """
    Run each converter over its paired raw value.

    Args:
        iter: iterable of ``(function, value)`` pairs.

    Returns:
        A list with ``function(value)`` for every pair, in order. A pair
        whose converter raises ``ValueError`` contributes ``None``; any
        other exception propagates.
    """
    converted = []
    for pair in iter:
        try:
            result = pair[0](pair[1])
        except ValueError:
            result = None
        converted.append(result)
    return converted
def _check_chunk_size(chunk, size):
return len(chunk) == size
def from_file(path):
    """
    Read every SmartSource record found in a file.

    The file is scanned for the ``*SMSRC`` marker. Each hit is handed to
    ``read_smartsource``; on success the scan resumes right after the
    decoded record, otherwise it resumes one byte past the marker.

    Args:
        path: path of the file to read.

    Returns:
        A list of the records returned by ``read_smartsource``, in file
        order. An empty file, or one without any marker, gives ``[]``.
    """
    records = []
    with open(path, "rb") as fd:
        # mmap cannot map a zero-length file (it raises ValueError); such a
        # file simply contains no records. Whence 2 means "end of file".
        if fd.seek(0, 2) == 0:
            return records
        fd.seek(0)

        with mmap.mmap(fd.fileno(), length=0, access=mmap.ACCESS_READ) as buffer:
            # Walk the mapping with a moving offset and hand out zero-copy
            # views, instead of re-slicing the mmap (which copies the whole
            # remainder of the file on every iteration).
            with memoryview(buffer) as view:
                position = 0
                while True:
                    position = buffer.find(b"*SMSRC", position)
                    if position == -1:
                        break

                    # The view must be released before the mmap is closed;
                    # the `with` guarantees that even if parsing raises.
                    with view[position:] as chunk:
                        record, length = read_smartsource(chunk)

                    if record is not None:
                        records.append(record)

                    # A zero length means nothing was consumed: step over
                    # this marker so the search cannot loop forever.
                    position += length if length != 0 else 1
    return records
def read_smartsource(buffer):
    """
    Decode the SmartSource record that starts at the beginning of *buffer*.

    A record is one fixed-size header (``st_smartsource_header``) followed by
    ``num_guns`` fixed-size gun entries (``st_smartsource_gun``). Individual
    fields that fail conversion are stored as ``None`` (see ``safe_apply``).

    Args:
        buffer: bytes-like object whose first byte is the first byte of the
            record header.

    Returns:
        A ``(record, length)`` tuple. On success ``record`` is a dict with
        every ``SmartsourceHeader`` field plus a ``"guns"`` list of
        ``SmartsourceGun`` tuples, and ``length`` is the number of bytes the
        record occupies. If the buffer is too short to hold the whole record,
        or the gun count cannot be parsed, ``(None, 0)`` is returned.
    """
    header_size = st_smartsource_header.size
    if len(buffer) < header_size:
        # Truncated header: nothing usable here.
        return (None, 0)

    raw_header = st_smartsource_header.unpack_from(buffer, 0)
    header = SmartsourceHeader(*safe_apply(zip(fn_smartsource_header, raw_header)))

    num_guns = header.num_guns
    if num_guns is None:
        # The gun count did not parse as an integer, so the extent of the
        # record is unknown (typically a spurious marker inside other data).
        return (None, 0)

    gun_size = st_smartsource_gun.size
    # A negative count is treated as "no guns", as range() would.
    length = header_size + max(num_guns, 0) * gun_size
    if len(buffer) < length:
        # Truncated gun table: reject the record rather than raise.
        return (None, 0)

    record = dict(header._asdict())
    record["guns"] = [
        SmartsourceGun(*safe_apply(zip(
            fn_smartsource_gun,
            st_smartsource_gun.unpack_from(buffer, offset),
        )))
        for offset in range(header_size, length, gun_size)
    ]
    return (record, length)