mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 13:07:08 +00:00
96 lines
2.3 KiB
Python
96 lines
2.3 KiB
Python
|
|
#!/usr/bin/python3
|
||
|
|
|
||
|
|
"""
|
||
|
|
SmartSource parsing functions.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import mmap
|
||
|
|
import struct
|
||
|
|
from collections import namedtuple
|
||
|
|
|
||
|
|
def _str (v):
|
||
|
|
return str(v, 'ascii').strip()
|
||
|
|
|
||
|
|
def _tstamp (v):
|
||
|
|
return str(v) # TODO
|
||
|
|
|
||
|
|
def _f10 (v):
|
||
|
|
return float(v)/10
|
||
|
|
|
||
|
|
def _ignore (v):
|
||
|
|
return None
|
||
|
|
|
||
|
|
st_smartsource_header = struct.Struct(">6s 4s 30s 10s 2s 1s 17s 1s 1s 2s 2s 2s 2s 2s 4s 6s 5s 5s 6s 4s 88s")
|
||
|
|
|
||
|
|
fn_smartsource_header = (
|
||
|
|
_str, int, _str, int, int, _str, _tstamp, int, int, int, int, int, int, int, int, int,
|
||
|
|
float, float, float, int, _str
|
||
|
|
)
|
||
|
|
|
||
|
|
SmartsourceHeader = namedtuple("SmartsourceHeader", "header blk_siz line shot mask trg_mode time src_number num_subarray num_guns num_active num_delta num_auto num_nofire spread volume avg_delta std_delta baro_press manifold spare")
|
||
|
|
|
||
|
|
st_smartsource_gun = struct.Struct(">1s 2s 1s 1s 1s 1s 1s 6s 6s 4s 4s 4s 4s 4s")
|
||
|
|
|
||
|
|
fn_smartsource_gun = (
|
||
|
|
int, int, int, _str, _str, lambda v: v=="Y", _str,
|
||
|
|
_f10, _f10, _f10, _f10,
|
||
|
|
int, int, int
|
||
|
|
)
|
||
|
|
|
||
|
|
SmartsourceGun = namedtuple("SmartsourceGun", "string gun source mode detect autofire spare aim_point firetime delay depth pressure volume filltime")
|
||
|
|
|
||
|
|
SmartSourceRecord = namedtuple("SmartSourceRecord", "header guns")
|
||
|
|
|
||
|
|
def safe_apply (iter):
|
||
|
|
def safe_fn (fn, v):
|
||
|
|
try:
|
||
|
|
return fn(v)
|
||
|
|
except ValueError:
|
||
|
|
return None
|
||
|
|
return [safe_fn(v[0], v[1]) for v in iter]
|
||
|
|
|
||
|
|
def _check_chunk_size(chunk, size):
|
||
|
|
return len(chunk) == size
|
||
|
|
|
||
|
|
def from_file(path):
|
||
|
|
|
||
|
|
records = []
|
||
|
|
|
||
|
|
with open(path, "rb") as fd:
|
||
|
|
with mmap.mmap(fd.fileno(), length=0, access=mmap.ACCESS_READ) as buffer:
|
||
|
|
|
||
|
|
while True:
|
||
|
|
|
||
|
|
offset = buffer.find(b"*SMSRC")
|
||
|
|
|
||
|
|
if offset == -1:
|
||
|
|
break
|
||
|
|
|
||
|
|
buffer = buffer[offset:]
|
||
|
|
record, length = read_smartsource(buffer)
|
||
|
|
|
||
|
|
if record is not None:
|
||
|
|
records.append(record)
|
||
|
|
|
||
|
|
if length != 0:
|
||
|
|
buffer = buffer[length:]
|
||
|
|
else:
|
||
|
|
buffer = buffer[1:]
|
||
|
|
|
||
|
|
return records
|
||
|
|
|
||
|
|
def read_smartsource(buffer):
|
||
|
|
length = 0
|
||
|
|
header = st_smartsource_header.unpack_from(buffer, 0)
|
||
|
|
length += st_smartsource_header.size
|
||
|
|
header = SmartsourceHeader(*safe_apply(zip(fn_smartsource_header, header)))
|
||
|
|
record = dict(header._asdict())
|
||
|
|
record["guns"] = []
|
||
|
|
|
||
|
|
for _ in range(header.num_guns):
|
||
|
|
gun = st_smartsource_gun.unpack_from(buffer, length)
|
||
|
|
record["guns"].append(SmartsourceGun(*safe_apply(zip(fn_smartsource_gun, gun))))
|
||
|
|
length += st_smartsource_gun.size
|
||
|
|
|
||
|
|
return (record, length)
|