Files
dougal-software/bin/fwr.py

129 lines
3.1 KiB
Python
Raw Normal View History

#!/usr/bin/python3
"""
Fixed width record importing functions.
"""
import builtins
def to_bool(v):
    """
    Convert a value to a boolean.

    Anything `int()` accepts is treated numerically (zero is False,
    everything else is True). Strings that are not integers are True
    only when, ignoring surrounding whitespace and case, they start
    with "t" (e.g. "true", "T"). Every other value is False.
    """
    try:
        return bool(int(v))
    except (ValueError, TypeError):
        # TypeError covers values int() cannot take at all (e.g. None),
        # which are treated as False rather than aborting the import.
        if isinstance(v, str):
            return v.strip().lower().startswith("t")
        return False
# Converters from the raw text of a field to its final value, keyed by
# the name given in a field specification's "type" entry.
transform = {
    # Going through float() first means text such as "12.0" is accepted
    # (and truncated) instead of being rejected by int().
    "int": lambda v: builtins.int(float(v)),
    "float": float,
    # "string" and "str" are interchangeable spellings.
    "string": str,
    "str": str,
    "bool": to_bool,
}
def parse_line(line, fields, fixed=None):
    """
    Parse one fixed-width record into a dictionary.

    Arguments:
        line:   the text of the record.
        fields: mapping of field name to a specification dictionary with
                "offset" (zero-based start position), "length", and
                optionally "type" (a key of `transform`, default "str"),
                "enum" (mapping of raw text to replacement value) and
                "default" (value used when an enum has no match).
        fixed:  optional list of {"offset", "text"} dictionaries
                describing literal text that must be present in the line.

    Returns a dictionary of field name to converted value, or an error
    message string if one of the `fixed` texts is not found where
    expected.

    Raises KeyError for an unknown "type", and whatever the converter
    raises when the text cannot be converted.
    """
    if fixed:
        for marker in fixed:
            start = marker["offset"]
            expected = marker["text"]
            found = line[start:start + len(expected)]
            if found != expected:
                return f"Expected text `{expected}` at position {start} but found `{found}` instead."

    data = {}
    for key, spec in fields.items():
        transformer = transform[spec.get("type", "str")]
        pos_from = spec["offset"]
        text = line[pos_from:pos_from + spec["length"]]

        enum = spec.get("enum")
        if enum is not None and text in enum:
            # The enum supplies the value; it still goes through the
            # converter so the result has the declared type.
            value = transformer(enum[text])
        elif enum is not None and "default" in spec:
            # No enum entry matched: the default is used as given.
            value = spec["default"]
        else:
            # Only convert the raw text when it is actually the value.
            # Converting it unconditionally would reject enum codes
            # (e.g. "A" in an "int" field) before the lookup happened.
            value = transformer(text)

        data[key] = value

    return data
# Built-in field layouts, selected by the "type" of an import spec when
# the spec does not carry its own "fields". Offsets are zero-based
# character positions within the line.
# NOTE(review): the keys presumably stand for the SPS rev. 1, SPS rev. 2.1
# and P1/90 exchange formats -- confirm against the format documents.
specfields = {
    "sps1": {
        "line_name": {"offset": 1, "length": 16, "type": "int"},
        "point_number": {"offset": 17, "length": 8, "type": "int"},
        "easting": {"offset": 46, "length": 9, "type": "float"},
        "northing": {"offset": 55, "length": 10, "type": "float"},
    },

    "sps21": {
        "line_name": {"offset": 1, "length": 7, "type": "int"},
        "point_number": {"offset": 11, "length": 7, "type": "int"},
        "easting": {"offset": 46, "length": 9, "type": "float"},
        "northing": {"offset": 55, "length": 10, "type": "float"},
    },

    "p190": {
        "line_name": {"offset": 1, "length": 12, "type": "int"},
        "point_number": {"offset": 19, "length": 6, "type": "int"},
        "easting": {"offset": 46, "length": 9, "type": "float"},
        # Unlike the two layouts above, northing is 9 characters wide here.
        "northing": {"offset": 55, "length": 9, "type": "float"},
    },
}
def from_file(path, spec):
    """
    Import a fixed-width text file as a list of parsed records.

    Arguments:
        path: path of the file to read.
        spec: import specification. Recognised keys:
              "fields"   - field layout passed to `parse_line`;
              "type"     - name of a built-in layout in `specfields`,
                           used when "fields" is absent;
              "firstRow" - zero-based index of the first line to
                           consider (default 0).

    For the "sps1", "sps21" and "p190" types, lines starting with "H"
    and lines consisting only of "EOF" are skipped.

    Returns a list with the result of `parse_line` for every line that
    was not skipped, or an error message string if the layout cannot be
    determined from `spec`.
    """
    # If spec.fields is not present, deduce it from spec.type ("sps1", "sps21", "p190", etc.)
    if "fields" in spec:
        fields = spec["fields"]
    elif "type" in spec and spec["type"] in specfields:
        fields = specfields[spec["type"]]
    else:
        # TODO: Should default to looking for spec.format and doing a legacy import on it
        return "Neither 'type' nor 'fields' given. I don't know how to import this fixed-width dataset."

    firstRow = spec.get("firstRow", 0)

    # These must be tuples, not bare strings: iterating "EOF" would
    # compare lines against "E", "O" and "F" instead of the whole marker.
    skipStart = ()  # Skip lines starting with any of these values
    skipMatch = ()  # Skip lines matching any of these values
    if spec.get("type") in ("sps1", "sps21", "p190"):
        skipStart = ("H",)
        skipMatch = ("EOF",)

    records = []
    with open(path, "r", errors="ignore") as fd:
        for row, line in enumerate(fd):
            if row < firstRow:
                continue
            # startswith() accepts a tuple; an empty tuple never matches.
            if line.startswith(skipStart):
                continue
            # Lines keep their line terminator (and may be padded), so
            # compare without trailing whitespace.
            if line.rstrip() in skipMatch:
                continue
            records.append(parse_line(line, fields))

    return records