mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 08:17:09 +00:00
129 lines
3.1 KiB
Python
129 lines
3.1 KiB
Python
#!/usr/bin/python3
|
|
|
|
"""
|
|
Fixed width record importing functions.
|
|
"""
|
|
|
|
import builtins
|
|
|
|
def to_bool(v):
    """
    Convert a raw field value to a boolean.

    Anything `int()` accepts is true when non-zero. A string that is not
    an integer is true only if, ignoring surrounding whitespace and case,
    it starts with "t" (e.g. "T", "true"). Every other value, including
    `None`, is false.
    """
    try:
        return bool(int(v))
    except (TypeError, ValueError):
        # ValueError: text that is not an integer.
        # TypeError: values int() cannot handle at all, such as None.
        if isinstance(v, str):
            return v.strip().lower().startswith("t")
        return False
|
|
|
|
def _to_int(v):
    """
    Convert `v` to an integer, truncating any fractional part.

    Exact integer parsing is tried first so that long digit strings are
    not rounded by a detour through `float`, which is only exact up to
    2**53. Text such as "12.7" or "1e3" still goes via `float`.
    """
    try:
        return builtins.int(v)
    except ValueError:
        return builtins.int(float(v))


# Converters from raw field text to Python values, keyed by the "type"
# name used in a field specification.
transform = {
    "int": _to_int,
    "float": float,
    "string": str,
    "str": str,
    "bool": to_bool,
}
|
|
|
|
def parse_line(line, fields, fixed=None):
    """
    Extract typed field values from one fixed-width record.

    Arguments:
        line: the text of the record.
        fields: mapping of field name to a spec holding "offset" and
            "length" and, optionally, "type" (a key of `transform`,
            default "str"), "enum" (mapping of raw text to value) and
            "default" (value used when an enum field's text is not one
            of the enum keys).
        fixed: optional iterable of {"offset", "text"} entries naming
            literal text that must be present in the line.

    Returns:
        A dict mapping each field name to its converted value or, if a
        `fixed` entry does not match, a string describing the mismatch.
    """
    for marker in fixed or ():
        start = marker["offset"]
        expected = marker["text"]
        found = line[start:start + len(expected)]
        if found != expected:
            return f"Expected text `{expected}` at position {start} but found `{found}` instead."

    data = {}
    for key, spec in fields.items():
        transformer = transform[spec.get("type", "str")]
        start = spec["offset"]
        text = line[start:start + spec["length"]]

        # Resolve enumerations before converting the raw text: the text
        # of an enum field is a lookup key and need not be convertible
        # to the field's type itself (e.g. "A" for an "int" field).
        if "enum" in spec and text in spec["enum"]:
            value = transformer(spec["enum"][text])
        elif "enum" in spec and "default" in spec:
            value = spec["default"]
        else:
            value = transformer(text)

        data[key] = value

    return data
|
|
|
|
|
|
# Built-in field layouts for the formats that can be selected by name.
# "offset" is the zero-based position of a field's first character in
# the record and "length" its width in characters.
specfields = {
    "sps1": {
        "line_name": dict(offset=1, length=16, type="int"),
        "point_number": dict(offset=17, length=8, type="int"),
        "easting": dict(offset=46, length=9, type="float"),
        "northing": dict(offset=55, length=10, type="float"),
    },
    "sps21": {
        "line_name": dict(offset=1, length=7, type="int"),
        "point_number": dict(offset=11, length=7, type="int"),
        "easting": dict(offset=46, length=9, type="float"),
        "northing": dict(offset=55, length=10, type="float"),
    },
    "p190": {
        "line_name": dict(offset=1, length=12, type="int"),
        "point_number": dict(offset=19, length=6, type="int"),
        "easting": dict(offset=46, length=9, type="float"),
        "northing": dict(offset=55, length=9, type="float"),
    },
}
|
|
|
|
def from_file(path, spec):
    """
    Read a fixed-width text file and return its parsed records.

    Arguments:
        path: path of the file to read.
        spec: import specification. Either "fields" (a field layout as
            accepted by `parse_line`) or "type" (a key of `specfields`)
            must be present; "fields" wins if both are. "firstRow"
            (default 0) is the zero-based index of the first line to
            consider.

    For the "sps1", "sps21" and "p190" types, lines starting with "H"
    and lines consisting of "EOF" are skipped.

    Returns:
        A list with the result of `parse_line` for every line that was
        not skipped, or an error string if the layout cannot be
        determined from `spec`.
    """
    # If spec.fields is not present, deduce it from spec.type ("sps1", "sps21", "p190", etc.)
    if "fields" in spec:
        fields = spec["fields"]
    elif "type" in spec and spec["type"] in specfields:
        fields = specfields[spec["type"]]
    else:
        # TODO: Should default to looking for spec.format and doing a legacy import on it
        return "Neither 'type' nor 'fields' given. I don't know how to import this fixed-width dataset."

    first_row = spec.get("firstRow", 0)

    skip_start = ()  # Skip lines starting with any of these values
    skip_match = ()  # Skip lines matching any of these values

    if spec.get("type") in ("sps1", "sps21", "p190"):
        # These must be tuples of whole values: a bare string would be
        # taken apart into single characters by the checks below.
        skip_start = ("H",)
        skip_match = ("EOF",)

    records = []
    with open(path, "r", errors="ignore") as fd:
        for row, line in enumerate(fd):
            if row < first_row:
                continue
            if line.startswith(skip_start):
                continue
            # Lines still carry their newline (and possibly padding), so
            # compare without trailing whitespace.
            if line.rstrip() in skip_match:
                continue
            records.append(parse_line(line, fields))

    return records
|