mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 12:17:08 +00:00
This is a super-simple library that does the minimum required to get things going for the specific operations where this code is foreseen to be used in the immediate future. It is not and it does not aim to be a complete, generic or universal P1/11 parsing solution.
201 lines
5.0 KiB
Python
201 lines
5.0 KiB
Python
#!/usr/bin/python3
|
|
|
|
"""
|
|
P1/11 parsing functions.
|
|
"""
|
|
|
|
import math
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
from parse_fwr import parse_fwr
|
|
|
|
def _int (string):
|
|
return int(float(string))
|
|
|
|
def _date (string):
|
|
return datetime.strptime(string, "%Y:%m:%d").replace(tzinfo=timezone.utc)
|
|
|
|
def _time (string):
|
|
try:
|
|
return datetime.strptime(string, "%H:%M:%S.%f").replace(tzinfo=timezone.utc).time()
|
|
except ValueError:
|
|
return datetime.strptime(string, "%H:%M:%S").replace(tzinfo=timezone.utc).time()
|
|
|
|
def _datetime (string):
|
|
try:
|
|
return datetime.strptime(string, "%Y:%m:%d:%H:%M:%S.%f").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
try:
|
|
return datetime.strptime(string, "%Y:%m:%d:%H:%M:%S").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
try:
|
|
return datetime.strptime(string, "%Y:%j:%H:%M:%S.%f").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
return datetime.strptime(string, "%Y:%j:%H:%M:%S").replace(tzinfo=timezone.utc)
|
|
|
|
def floatOrNone (string):
|
|
try:
|
|
return float(string)
|
|
except ValueError:
|
|
return None
|
|
|
|
def parse_p111_file_info (string):
|
|
"""
|
|
Parse OGP File Identification Record
|
|
|
|
Returns a dictionary of fields
|
|
"""
|
|
|
|
defs = [
|
|
("Contents Description", str),
|
|
("Format Code", int),
|
|
("Format Version Number", float),
|
|
("File Issue Number", int),
|
|
("Date File Written", _date),
|
|
("Time File Written", _time),
|
|
("Name of File", str),
|
|
("Prepared By", str)
|
|
]
|
|
|
|
record = string.split(",")
|
|
|
|
# i[0][0] is the name
|
|
# i[0][1] is the type converter
|
|
# i[1] is the record value
|
|
data = dict([(i[0][0], i[0][1](i[1])) for i in zip(defs, record[1:])])
|
|
data["tstamp"] = datetime.combine(data["Date File Written"], data["Time File Written"]).replace(tzinfo=timezone.utc)
|
|
data["type"] = "File Identification Record"
|
|
|
|
return data
|
|
|
|
|
|
def parse_p111_header (string):
|
|
"""
|
|
Parse any HC, H1 header record.
|
|
|
|
This function does not interpret the values of individual header types.
|
|
|
|
Returns dictionary of fields.
|
|
"""
|
|
|
|
record = string.split(",")
|
|
data = dict(enumerate([",".join(record[0:4])] + [record[4].rstrip()] + record[5:]))
|
|
data["type"] = "Common Header" if data[0] == "HC" else "P1/11 Header"
|
|
|
|
return data
|
|
|
|
def parse_p111_position (string):
|
|
"""
|
|
Parse a P1 position record.
|
|
|
|
NOTE: Utterly ignores all of the header record definitions.
|
|
NOTE: It also ignores “Additional Quality Measures” and
|
|
“Additional Data Fields”.
|
|
|
|
Returns dictionary of fields.
|
|
"""
|
|
|
|
defs = [
|
|
("Record Identifier", str),
|
|
("Record Version", int),
|
|
("Acquisition Line Name", str),
|
|
("Preplot Line Name", _int),
|
|
("Acquisition Point Number", _int),
|
|
("Preplot Point Number", _int),
|
|
("Index Number", int),
|
|
("Time", _datetime),
|
|
("Object Ref. Number", int),
|
|
("Object Short Name", str),
|
|
("Record Type Number", int),
|
|
("(Dummy field)", str),
|
|
("CRS A Coordinate 1", floatOrNone),
|
|
("CRS A Coordinate 2", floatOrNone),
|
|
("CRS A Coordinate 3", floatOrNone),
|
|
("CRS B Coordinate 1", floatOrNone),
|
|
("CRS B Coordinate 2", floatOrNone),
|
|
("CRS B Coordinate 3", floatOrNone),
|
|
("CRS C Coordinate 1", floatOrNone),
|
|
("CRS C Coordinate 2", floatOrNone),
|
|
("CRS C Coordinate 3", floatOrNone),
|
|
("Error Ellipse Horizontal Semimajor Axis or Radial Error Estimate", floatOrNone),
|
|
("Error Ellipse Horizontal Semiminor Axis", floatOrNone),
|
|
("Error Ellipse Horizontal Azimuth", floatOrNone),
|
|
("Error Ellipse Vertical Axis or Height Error Estimate", floatOrNone)
|
|
]
|
|
|
|
record = string.split(",", 24)
|
|
|
|
# i[0][0] is the name
|
|
# i[0][1] is the type converter
|
|
# i[1] is the record value
|
|
data = dict([(i[0][0], i[0][1](i[1])) for i in zip(defs, record)])
|
|
data["tstamp"] = data["Time"]
|
|
data["type"] = "S" if record[0] == "S1" else "P"
|
|
|
|
return data
|
|
|
|
|
|
def parse_line (string):
|
|
type = string[:3]
|
|
|
|
if type == "OGP":
|
|
return parse_p111_file_info(string)
|
|
elif type == "HC,":
|
|
return parse_p111_header(string)
|
|
elif type == "H1,":
|
|
return parse_p111_header(string)
|
|
elif type == "P1,":
|
|
return parse_p111_position(string)
|
|
elif type == "S1,":
|
|
return parse_p111_position(string)
|
|
else:
|
|
# We ignore other records
|
|
return None
|
|
|
|
|
|
def p111_type(type, records):
|
|
return [ r for r in records if r["type"] == type ]
|
|
|
|
def point_number(record):
|
|
if "Preplot Point Number" in record:
|
|
return record["Preplot Point Number"]
|
|
return None
|
|
|
|
def line(record):
|
|
if "Preplot Line Name" in record:
|
|
return record["Preplot Line Name"]
|
|
return None
|
|
|
|
def easting(record):
|
|
if "CRS A Coordinate 1" in record:
|
|
return record["CRS A Coordinate 1"]
|
|
return None
|
|
|
|
def northing(record):
|
|
if "CRS A Coordinate 2" in record:
|
|
return record["CRS A Coordinate 2"]
|
|
return None
|
|
|
|
def from_file(path, only_records=None, shot_range=None):
|
|
records = []
|
|
with open(path) as fd:
|
|
cnt = 0
|
|
line = fd.readline()
|
|
while line:
|
|
cnt = cnt + 1
|
|
|
|
record = parse_line(line)
|
|
if record is not None:
|
|
if only_records:
|
|
if not record["record_type"] in only_records:
|
|
continue
|
|
if shot_range:
|
|
shot = int(record["point_number"])
|
|
if not (shot >= min(shot_range) and shot <= max(shot_range)):
|
|
continue
|
|
|
|
records.append(record)
|
|
line = fd.readline()
|
|
|
|
return records
|