Files
dougal-software/bin/smsrc.py
D. Berge 19ce158329 Import SmartSource header data.
Provided that the SmartSource headers are being
saved to file, and that the path to those files
is present in the survey configuration, we now
import SmartSource information as metadata in
raw_shots.meta->'smsrc'.

Closes #19.
2020-09-06 13:45:56 +02:00

96 lines
2.3 KiB
Python

#!/usr/bin/python3
"""
SmartSource parsing functions.
"""
import mmap
import struct
from collections import namedtuple
def _str (v):
return str(v, 'ascii').strip()
def _tstamp (v):
return str(v) # TODO
def _f10 (v):
return float(v)/10
def _ignore (v):
return None
st_smartsource_header = struct.Struct(">6s 4s 30s 10s 2s 1s 17s 1s 1s 2s 2s 2s 2s 2s 4s 6s 5s 5s 6s 4s 88s")
fn_smartsource_header = (
_str, int, _str, int, int, _str, _tstamp, int, int, int, int, int, int, int, int, int,
float, float, float, int, _str
)
SmartsourceHeader = namedtuple("SmartsourceHeader", "header blk_siz line shot mask trg_mode time src_number num_subarray num_guns num_active num_delta num_auto num_nofire spread volume avg_delta std_delta baro_press manifold spare")
st_smartsource_gun = struct.Struct(">1s 2s 1s 1s 1s 1s 1s 6s 6s 4s 4s 4s 4s 4s")
fn_smartsource_gun = (
int, int, int, _str, _str, lambda v: v=="Y", _str,
_f10, _f10, _f10, _f10,
int, int, int
)
SmartsourceGun = namedtuple("SmartsourceGun", "string gun source mode detect autofire spare aim_point firetime delay depth pressure volume filltime")
SmartSourceRecord = namedtuple("SmartSourceRecord", "header guns")
def safe_apply (iter):
def safe_fn (fn, v):
try:
return fn(v)
except ValueError:
return None
return [safe_fn(v[0], v[1]) for v in iter]
def _check_chunk_size(chunk, size):
return len(chunk) == size
def from_file(path):
records = []
with open(path, "rb") as fd:
with mmap.mmap(fd.fileno(), length=0, access=mmap.ACCESS_READ) as buffer:
while True:
offset = buffer.find(b"*SMSRC")
if offset == -1:
break
buffer = buffer[offset:]
record, length = read_smartsource(buffer)
if record is not None:
records.append(record)
if length != 0:
buffer = buffer[length:]
else:
buffer = buffer[1:]
return records
def read_smartsource(buffer):
length = 0
header = st_smartsource_header.unpack_from(buffer, 0)
length += st_smartsource_header.size
header = SmartsourceHeader(*safe_apply(zip(fn_smartsource_header, header)))
record = dict(header._asdict())
record["guns"] = []
for _ in range(header.num_guns):
gun = st_smartsource_gun.unpack_from(buffer, length)
record["guns"].append(SmartsourceGun(*safe_apply(zip(fn_smartsource_gun, gun))))
length += st_smartsource_gun.size
return (record, length)