mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 11:27:09 +00:00
Provided that the SmartSource headers are being saved to file, and that the path to those files is present in the survey configuration, we now import SmartSource information as metadata in raw_shots.meta->'smsrc'. Closes #19.
96 lines
2.3 KiB
Python
96 lines
2.3 KiB
Python
#!/usr/bin/python3
|
|
|
|
"""
|
|
SmartSource parsing functions.
|
|
"""
|
|
|
|
import mmap
|
|
import struct
|
|
from collections import namedtuple
|
|
|
|
def _str (v):
|
|
return str(v, 'ascii').strip()
|
|
|
|
def _tstamp (v):
|
|
return str(v) # TODO
|
|
|
|
def _f10 (v):
|
|
return float(v)/10
|
|
|
|
def _ignore (v):
|
|
return None
|
|
|
|
# Fixed-width layout of a record header: 21 byte-string fields, in the same
# order as the SmartsourceHeader field names.
st_smartsource_header = struct.Struct(
    ">6s 4s 30s 10s 2s 1s 17s "   # header blk_siz line shot mask trg_mode time
    "1s 1s 2s 2s 2s 2s 2s "       # src_number num_subarray num_guns num_active num_delta num_auto num_nofire
    "4s 6s 5s 5s 6s 4s 88s"       # spread volume avg_delta std_delta baro_press manifold spare
)
|
|
|
|
# Converters applied positionally to the raw header fields; each entry lines
# up with the SmartsourceHeader field named in the trailing comment.
fn_smartsource_header = (
    _str,     # header
    int,      # blk_siz
    _str,     # line
    int,      # shot
    int,      # mask
    _str,     # trg_mode
    _tstamp,  # time
    int,      # src_number
    int,      # num_subarray
    int,      # num_guns
    int,      # num_active
    int,      # num_delta
    int,      # num_auto
    int,      # num_nofire
    int,      # spread
    int,      # volume
    float,    # avg_delta
    float,    # std_delta
    float,    # baro_press
    int,      # manifold
    _str,     # spare
)
|
|
|
|
# Parsed record header; one field per entry of st_smartsource_header.
SmartsourceHeader = namedtuple(
    "SmartsourceHeader",
    "header blk_siz line shot mask trg_mode time "
    "src_number num_subarray num_guns num_active num_delta num_auto num_nofire "
    "spread volume avg_delta std_delta baro_press manifold spare",
)
|
|
|
|
# Fixed-width layout of one per-gun entry: 14 byte-string fields, in the same
# order as the SmartsourceGun field names.
st_smartsource_gun = struct.Struct(
    ">1s 2s 1s 1s 1s 1s 1s "   # string gun source mode detect autofire spare
    "6s 6s 4s 4s "             # aim_point firetime delay depth
    "4s 4s 4s"                 # pressure volume filltime
)
|
|
|
|
# Converters applied positionally to the raw per-gun fields; each entry lines
# up with the SmartsourceGun field named in the trailing comment.
fn_smartsource_gun = (
    int,                   # string
    int,                   # gun
    int,                   # source
    _str,                  # mode
    _str,                  # detect
    # struct's "s" fields unpack to bytes, so the flag must be compared
    # against a bytes literal; comparing with the str "Y" is always False.
    lambda v: v == b"Y",   # autofire
    _str,                  # spare
    _f10,                  # aim_point
    _f10,                  # firetime
    _f10,                  # delay
    _f10,                  # depth
    int,                   # pressure
    int,                   # volume
    int,                   # filltime
)
|
|
|
|
# Parsed per-gun entry; one field per entry of st_smartsource_gun.
SmartsourceGun = namedtuple(
    "SmartsourceGun",
    "string gun source mode detect autofire spare "
    "aim_point firetime delay depth "
    "pressure volume filltime",
)
|
|
|
|
# Pairing of a header with its list of guns.
SmartSourceRecord = namedtuple("SmartSourceRecord", ["header", "guns"])
|
|
|
|
def safe_apply(iter):
    """Apply each converter to its paired value, tolerating bad values.

    *iter* yields ``(function, value)`` pairs.  The result is a list with
    ``function(value)`` for every pair, or ``None`` where the call raised
    ``ValueError``.  Any other exception propagates to the caller.
    """
    results = []
    for pair in iter:
        convert, raw = pair[0], pair[1]
        try:
            converted = convert(raw)
        except ValueError:
            converted = None
        results.append(converted)
    return results
|
|
|
|
def _check_chunk_size(chunk, size):
|
|
return len(chunk) == size
|
|
|
|
def from_file(path):
    """Parse every SmartSource record found in the file at *path*.

    The file is memory-mapped and scanned for the ``*SMSRC`` marker.  Each
    hit is handed to ``read_smartsource``; when it yields a record the
    record is collected and scanning resumes after the consumed bytes.
    When it reports a length of zero, scanning resumes one byte past the
    marker so the loop always makes progress.

    Returns the list of records, which is empty when the file is empty or
    contains no marker.
    """
    records = []
    marker = b"*SMSRC"

    with open(path, "rb") as fd:
        # mmap refuses to map a zero-length file; there is nothing to parse.
        # seek(0, 2) moves to the end of the file and returns its size.
        if fd.seek(0, 2) == 0:
            return records

        with mmap.mmap(fd.fileno(), length=0, access=mmap.ACCESS_READ) as buffer, \
                memoryview(buffer) as view:
            pos = 0
            while True:
                offset = buffer.find(marker, pos)
                if offset == -1:
                    break

                # A memoryview slice gives the parser a zero-based window
                # without copying the rest of the file for every record.
                # It is released before the mapping is closed.
                with view[offset:] as window:
                    record, length = read_smartsource(window)

                if record is not None:
                    records.append(record)

                pos = offset + (length if length != 0 else 1)

    return records
|
|
|
|
def read_smartsource(buffer):
    """Parse one SmartSource record from the start of *buffer*.

    *buffer* is a bytes-like object positioned at the record header.  The
    header is unpacked with ``st_smartsource_header``, followed by
    ``num_guns`` entries unpacked with ``st_smartsource_gun``.  Every raw
    field is converted through ``safe_apply``, so a field that fails
    conversion with ``ValueError`` becomes ``None``.

    Returns ``(record, length)``.  ``record`` is a dict of the header
    fields plus a ``"guns"`` list of ``SmartsourceGun`` tuples, and
    ``length`` is the number of bytes consumed.  Returns ``(None, 0)`` when
    the buffer is too short for the header or for one of the announced
    guns, or when the gun count could not be parsed.
    """
    try:
        raw_header = st_smartsource_header.unpack_from(buffer, 0)
    except struct.error:
        # Truncated header: not a complete record.
        return (None, 0)

    length = st_smartsource_header.size
    header = SmartsourceHeader(*safe_apply(zip(fn_smartsource_header, raw_header)))

    # safe_apply maps an unparsable count to None, which range() rejects.
    if header.num_guns is None:
        return (None, 0)

    guns = []
    for _ in range(header.num_guns):
        try:
            raw_gun = st_smartsource_gun.unpack_from(buffer, length)
        except struct.error:
            # The buffer ends before all announced guns were read.
            return (None, 0)
        guns.append(SmartsourceGun(*safe_apply(zip(fn_smartsource_gun, raw_gun))))
        length += st_smartsource_gun.size

    record = dict(header._asdict())
    record["guns"] = guns

    return (record, length)
|