mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 10:07:08 +00:00
204 lines
5.1 KiB
Python
204 lines
5.1 KiB
Python
#!/usr/bin/python3
|
|
|
|
"""
|
|
P1/11 parsing functions.
|
|
"""
|
|
|
|
import math
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
from parse_fwr import parse_fwr
|
|
|
|
def _int (string):
|
|
return int(float(string))
|
|
|
|
def _date (string):
|
|
return datetime.strptime(string, "%Y:%m:%d").replace(tzinfo=timezone.utc)
|
|
|
|
def _time (string):
|
|
try:
|
|
return datetime.strptime(string, "%H:%M:%S.%f").replace(tzinfo=timezone.utc).time()
|
|
except ValueError:
|
|
return datetime.strptime(string, "%H:%M:%S").replace(tzinfo=timezone.utc).time()
|
|
|
|
def _datetime (string):
|
|
try:
|
|
return datetime.strptime(string, "%Y:%m:%d:%H:%M:%S.%f").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
try:
|
|
return datetime.strptime(string, "%Y:%m:%d:%H:%M:%S").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
try:
|
|
return datetime.strptime(string, "%Y:%j:%H:%M:%S.%f").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
return datetime.strptime(string, "%Y:%j:%H:%M:%S").replace(tzinfo=timezone.utc)
|
|
|
|
def floatOrNone (string):
|
|
try:
|
|
return float(string)
|
|
except ValueError:
|
|
return None
|
|
|
|
def parse_p111_file_info (string):
|
|
"""
|
|
Parse OGP File Identification Record
|
|
|
|
Returns a dictionary of fields
|
|
"""
|
|
|
|
defs = [
|
|
("Contents Description", str),
|
|
("Format Code", int),
|
|
("Format Version Number", float),
|
|
("File Issue Number", int),
|
|
("Date File Written", _date),
|
|
("Time File Written", _time),
|
|
("Name of File", str),
|
|
("Prepared By", str)
|
|
]
|
|
|
|
record = string.split(",")
|
|
|
|
# i[0][0] is the name
|
|
# i[0][1] is the type converter
|
|
# i[1] is the record value
|
|
data = dict([(i[0][0], i[0][1](i[1])) for i in zip(defs, record[1:])])
|
|
data["tstamp"] = datetime.combine(data["Date File Written"], data["Time File Written"]).replace(tzinfo=timezone.utc)
|
|
data["type"] = "File Identification Record"
|
|
|
|
return data
|
|
|
|
|
|
def parse_p111_header (string):
|
|
"""
|
|
Parse any HC, H1 header record.
|
|
|
|
This function does not interpret the values of individual header types.
|
|
|
|
Returns dictionary of fields.
|
|
"""
|
|
|
|
record = string.split(",")
|
|
data = dict(enumerate([",".join(record[0:4])] + [record[4].rstrip()] + record[5:]))
|
|
data["type"] = "Common Header" if data[0] == "HC" else "P1/11 Header"
|
|
|
|
return data
|
|
|
|
def parse_p111_position (string):
|
|
"""
|
|
Parse a P1 position record.
|
|
|
|
NOTE: Utterly ignores all of the header record definitions.
|
|
NOTE: It also ignores “Additional Quality Measures” and
|
|
“Additional Data Fields”.
|
|
|
|
Returns dictionary of fields.
|
|
"""
|
|
|
|
defs = [
|
|
("Record Identifier", str),
|
|
("Record Version", int),
|
|
("Acquisition Line Name", str),
|
|
("Preplot Line Name", _int),
|
|
("Acquisition Point Number", _int),
|
|
("Preplot Point Number", _int),
|
|
("Index Number", int),
|
|
("Time", _datetime),
|
|
("Object Ref. Number", int),
|
|
("Object Short Name", str),
|
|
("Record Type Number", int),
|
|
("(Dummy field)", str),
|
|
("CRS A Coordinate 1", floatOrNone),
|
|
("CRS A Coordinate 2", floatOrNone),
|
|
("CRS A Coordinate 3", floatOrNone),
|
|
("CRS B Coordinate 1", floatOrNone),
|
|
("CRS B Coordinate 2", floatOrNone),
|
|
("CRS B Coordinate 3", floatOrNone),
|
|
("CRS C Coordinate 1", floatOrNone),
|
|
("CRS C Coordinate 2", floatOrNone),
|
|
("CRS C Coordinate 3", floatOrNone),
|
|
("Error Ellipse Horizontal Semimajor Axis or Radial Error Estimate", floatOrNone),
|
|
("Error Ellipse Horizontal Semiminor Axis", floatOrNone),
|
|
("Error Ellipse Horizontal Azimuth", floatOrNone),
|
|
("Error Ellipse Vertical Axis or Height Error Estimate", floatOrNone)
|
|
]
|
|
|
|
record = string.split(",", 24)
|
|
|
|
# i[0][0] is the name
|
|
# i[0][1] is the type converter
|
|
# i[1] is the record value
|
|
data = dict([(i[0][0], i[0][1](i[1])) for i in zip(defs, record)])
|
|
data["tstamp"] = data["Time"]
|
|
data["type"] = "S" if record[0] == "S1" else "P"
|
|
|
|
return data
|
|
|
|
|
|
def parse_line (string):
|
|
type = string[:3]
|
|
|
|
if type == "OGP":
|
|
return parse_p111_file_info(string)
|
|
elif type == "HC,":
|
|
return parse_p111_header(string)
|
|
elif type == "H1,":
|
|
return parse_p111_header(string)
|
|
elif type == "P1,":
|
|
return parse_p111_position(string)
|
|
elif type == "S1,":
|
|
return parse_p111_position(string)
|
|
else:
|
|
# We ignore other records
|
|
return None
|
|
|
|
|
|
def line_name(records):
|
|
return set([ r['Acquisition Line Name'] for r in p111_type("S", records) ]).pop()
|
|
|
|
def p111_type(type, records):
|
|
return [ r for r in records if r["type"] == type ]
|
|
|
|
def point_number(record):
|
|
if "Preplot Point Number" in record:
|
|
return record["Preplot Point Number"]
|
|
return None
|
|
|
|
def line(record):
|
|
if "Preplot Line Name" in record:
|
|
return record["Preplot Line Name"]
|
|
return None
|
|
|
|
def easting(record):
|
|
if "CRS A Coordinate 1" in record:
|
|
return record["CRS A Coordinate 1"]
|
|
return None
|
|
|
|
def northing(record):
|
|
if "CRS A Coordinate 2" in record:
|
|
return record["CRS A Coordinate 2"]
|
|
return None
|
|
|
|
def from_file(path, only_records=None, shot_range=None):
|
|
records = []
|
|
with open(path) as fd:
|
|
cnt = 0
|
|
line = fd.readline()
|
|
while line:
|
|
cnt = cnt + 1
|
|
|
|
record = parse_line(line)
|
|
if record is not None:
|
|
if only_records:
|
|
if not record["record_type"] in only_records:
|
|
continue
|
|
if shot_range:
|
|
shot = int(record["point_number"])
|
|
if not (shot >= min(shot_range) and shot <= max(shot_range)):
|
|
continue
|
|
|
|
records.append(record)
|
|
line = fd.readline()
|
|
|
|
return records
|