mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 06:57:07 +00:00
Support import of various fixed-width formats.
This supports reading: SPSv1, SPSv2.1, P190 and custom fixed-width formats. Supports skipping lines by startswith() matching or by complete match (e.g., "EOF"). Closes #300 (SPS v1) Closes #301 (SPS v2.1) Closes #302 (P1/90)
This commit is contained in:
108
bin/fwr.py
Normal file
108
bin/fwr.py
Normal file
@@ -0,0 +1,108 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
"""
|
||||
Fixed width record importing functions.
|
||||
"""
|
||||
|
||||
import builtins
|
||||
|
||||
transform = {
|
||||
"int": lambda v: builtins.int(float(v)),
|
||||
"float": float,
|
||||
"string": str,
|
||||
"bool": bool
|
||||
}
|
||||
|
||||
def parse_line (line, fields):
|
||||
data = dict()
|
||||
|
||||
for key in fields:
|
||||
spec = fields[key]
|
||||
transformer = transform[spec["type"]]
|
||||
pos_from = spec["offset"]
|
||||
pos_to = pos_from + spec["length"]
|
||||
text = line[pos_from:pos_to]
|
||||
value = transformer(text)
|
||||
if "enum" in spec:
|
||||
if "default" in spec:
|
||||
value = spec["default"]
|
||||
for enum_key in spec["enum"]:
|
||||
if enum_key == value:
|
||||
enum_value = transformer(spec["enum"][enum_key])
|
||||
value = enum_value
|
||||
break
|
||||
|
||||
data[key] = value
|
||||
|
||||
return data
|
||||
|
||||
|
||||
specfields = {
|
||||
"sps1": {
|
||||
"line_name": { "offset": 1, "length": 16, "type": "int" },
|
||||
"point_number": { "offset": 17, "length": 8, "type": "int" },
|
||||
"easting": { "offset": 46, "length": 9, "type": "float" },
|
||||
"northing": { "offset": 55, "length": 10, "type": "float" }
|
||||
},
|
||||
"sps21": {
|
||||
"line_name": { "offset": 1, "length": 7, "type": "int" },
|
||||
"point_number": { "offset": 11, "length": 7, "type": "int" },
|
||||
"easting": { "offset": 46, "length": 9, "type": "float" },
|
||||
"northing": { "offset": 55, "length": 10, "type": "float" }
|
||||
},
|
||||
"p190": {
|
||||
"line_name": { "offset": 1, "length": 12, "type": "int" },
|
||||
"point_number": { "offset": 19, "length": 6, "type": "int" },
|
||||
"easting": { "offset": 46, "length": 9, "type": "float" },
|
||||
"northing": { "offset": 55, "length": 9, "type": "float" }
|
||||
},
|
||||
}
|
||||
|
||||
def from_file(path, spec):
|
||||
|
||||
# If spec.fields is not present, deduce it from spec.type ("sps1", "sps21", "p190", etc.)
|
||||
if "fields" in spec:
|
||||
fields = spec["fields"]
|
||||
elif "type" in spec and spec["type"] in specfields:
|
||||
fields = specfields[spec["type"]]
|
||||
else:
|
||||
return "Neither 'type' nor 'fields' given. I don't know how to import this fixed-width dataset."
|
||||
|
||||
firstRow = spec.get("firstRow", 0)
|
||||
|
||||
skipStart = [] # Skip lines starting with any of these values
|
||||
skipMatch = [] # Skip lines matching any of these values
|
||||
|
||||
if "type" in spec:
|
||||
if spec["type"] == "sps1" or spec["type"] == "sps21" or spec["type"] == "p190":
|
||||
skipStart = "H"
|
||||
skipMatch = "EOF"
|
||||
|
||||
records = []
|
||||
with open(path, "r", errors="ignore") as fd:
|
||||
row = 0
|
||||
line = fd.readline()
|
||||
|
||||
while line:
|
||||
skip = False
|
||||
|
||||
if row < firstRow:
|
||||
skip = True
|
||||
|
||||
if not skip:
|
||||
for v in skipStart:
|
||||
if line.startswith(v):
|
||||
skip = True
|
||||
break
|
||||
for v in skipMatch:
|
||||
if line == v:
|
||||
skip = True
|
||||
break
|
||||
|
||||
if not skip:
|
||||
records.append(parse_line(line, fields))
|
||||
|
||||
row += 1
|
||||
line = fd.readline()
|
||||
|
||||
return records
|
||||
@@ -1,68 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
"""
|
||||
Legacy fixed width record importing functions.
|
||||
"""
|
||||
|
||||
import builtins
|
||||
|
||||
def parse_fwr (string, widths, start=0):
|
||||
"""Parse a fixed-width record.
|
||||
|
||||
string: the string to parse.
|
||||
widths: a list of record widths. A negative width denotes a field to be skipped.
|
||||
start: optional start index.
|
||||
|
||||
Returns a list of strings.
|
||||
"""
|
||||
results = []
|
||||
current_index = start
|
||||
for width in widths:
|
||||
if width > 0:
|
||||
results.append(string[current_index : current_index + width])
|
||||
current_index += width
|
||||
else:
|
||||
current_index -= width
|
||||
|
||||
return results
|
||||
|
||||
def int (v):
|
||||
return builtins.int(float(v))
|
||||
|
||||
def parse_line (string, spec):
|
||||
"""Parse a line from an SPS file."""
|
||||
names = spec["names"]
|
||||
widths = spec["widths"]
|
||||
normalisers = spec["normalisers"]
|
||||
record = [ t[0](t[1]) for t in zip(normalisers, parse_fwr(string, widths)) ]
|
||||
return dict(zip(names, record))
|
||||
|
||||
def from_file(path, spec = None):
|
||||
if spec is None:
|
||||
spec = {
|
||||
"names": [ "line_name", "point_number", "easting", "northing" ],
|
||||
"widths": [ -1, 10, 10, -25, 10, 10 ],
|
||||
"normalisers": [ int, int, float, float ]
|
||||
}
|
||||
else:
|
||||
normaliser_tokens = [ "int", "float", "str", "bool" ]
|
||||
spec["normalisers"] = [ eval(t) for t in spec["types"] if t in normaliser_tokens ]
|
||||
|
||||
records = []
|
||||
with open(path) as fd:
|
||||
cnt = 0
|
||||
line = fd.readline()
|
||||
while line:
|
||||
cnt = cnt+1
|
||||
|
||||
if line == "EOF":
|
||||
break
|
||||
|
||||
record = parse_line(line, spec)
|
||||
if record is not None:
|
||||
records.append(record)
|
||||
|
||||
line = fd.readline()
|
||||
|
||||
del spec["normalisers"]
|
||||
return records
|
||||
@@ -1,15 +1,33 @@
|
||||
import legacy_fwr
|
||||
import fwr
|
||||
|
||||
"""
|
||||
Preplot importing functions.
|
||||
"""
|
||||
|
||||
def from_file (file, realpath = None):
|
||||
"""
|
||||
Return a list of dicts, where each dict has the structure:
|
||||
{
|
||||
"line_name": <int>,
|
||||
"points": [
|
||||
{
|
||||
"line_name": <int>,
|
||||
"point_number": <int>,
|
||||
"easting": <float>,
|
||||
"northing": <float>
|
||||
},
|
||||
…
|
||||
]
|
||||
}
|
||||
On error, return a string describing the error condition.
|
||||
"""
|
||||
|
||||
filepath = realpath or file["path"]
|
||||
if not "type" in file or file["type"] == "sps":
|
||||
records = legacy_fwr.from_file(filepath, file["format"] if "format" in file else None )
|
||||
else:
|
||||
return "Not an SPS file"
|
||||
records = fwr.from_file(filepath, file)
|
||||
|
||||
if type(records) == str:
|
||||
# This is an error message
|
||||
return records
|
||||
|
||||
lines = []
|
||||
line_names = set([r["line_name"] for r in records])
|
||||
|
||||
Reference in New Issue
Block a user