Files
dougal-software/bin/delimited.py
D. Berge 7e1023f6e8 Support import of delimited formats.
This supports CSV and similar formats, as well as sailline
imports: CSV files with a specific set of column
definitions.

Does not yet support P111 import (for which there is an
implementation already).
2024-05-03 11:42:20 +02:00

164 lines
3.3 KiB
Python

#!/usr/bin/python3
"""
Delimited record importing functions.
"""
import csv
import builtins
def to_bool (v):
    """Coerce a delimited-file cell to a boolean.

    Values that parse as integers are truthy when non-zero; otherwise a
    string is True iff (after trimming, case-insensitively) it starts
    with "t" (e.g. "true", "T"). Anything else — including None, which
    previously raised an uncaught TypeError — is False.
    """
    try:
        return bool(int(v))
    except (ValueError, TypeError):
        # Not numeric: fall back to a textual "true"-ish check.
        if isinstance(v, str):
            return v.strip().lower().startswith("t")
        return False
# Maps a field "type" name from an import spec to a casting callable.
transform = {
    # Go through float first so values like "3.0" still parse as ints.
    "int": lambda v: int(float(v)),
    "float": float,
    "string": str,
    "bool": to_bool
}
def cast_values (row, fields):
    """Cast the string values of *row* in place according to *fields*.

    row    -- dict produced by a csv reader; values may be strings,
              lists of strings (repeated columns), or dicts (left as-is).
    fields -- mapping of field name -> spec dict with optional "type"
              (a key of ``transform``), "enum" (value remapping table)
              and "default" (fallback when no enum entry matches).

    Returns the mutated *row*.
    """
    def enum_for (key):
        field = fields.get(key, {})
        def enum (val):
            # Only remap when the field declares an enum table.
            if "enum" in field:
                # Unmatched values fall back to "default" (or pass through).
                ret_val = field.get("default", val)
                enums = field.get("enum", [])
                for enum_key in enums:
                    if enum_key == val:
                        ret_val = enums[enum_key]
                return ret_val
            return val
        return enum
    # csv.DictReader collects overflow columns under the None key; drop them.
    if None in row:
        del row[None]
    for key in row:
        val = row[key]
        enum = enum_for(key)
        # Unknown/untyped fields are kept as plain strings.
        transformer = transform.get(fields.get(key, {}).get("type"), str)
        if isinstance(val, list):
            # Repeated column: cast each entry in place.
            for i, v in enumerate(val):
                row[key][i] = transformer(enum(v))
        elif isinstance(val, dict):
            # Nested objects (dotted headers) are left untouched here.
            continue
        else:
            row[key] = transformer(enum(val))
    return row
def build_fieldnames (spec):
    """Build a positional list of field names from an import spec.

    Each entry in spec["fields"] carries a "column" index; the returned
    list has that field's name at that index, with gaps padded by None
    (suitable as csv.DictReader's *fieldnames* argument).

    Raises ValueError for a negative column index. (The previous
    assert-based check only fired on short lists; a negative index on a
    longer list silently overwrote an entry from the end.)
    """
    fieldnames = []
    for key, field in spec.get("fields", {}).items():
        index = field["column"]
        if index < 0:
            raise ValueError("column index must be >= 0, got %r" % (index,))
        if index >= len(fieldnames):
            # Pad with None up to and including the target column.
            fieldnames.extend([None] * (index + 1 - len(fieldnames)))
        fieldnames[index] = key
    return fieldnames
def from_file_delimited (path, spec):
    """Read a delimited file described by *spec* into a list of records.

    spec keys: "fields" (column definitions), "delimiter" (default ","),
    "firstRow" (0-based index of the first data row), "headerRow" (True
    when the file carries a header line), "type" ("x-sl+csv" files pick
    their field names from the header row instead of the spec).
    """
    fieldnames = build_fieldnames(spec)
    # Default must be a dict: cast_values calls fields.get(), which
    # failed with AttributeError when this defaulted to a list.
    fields = spec.get("fields", {})
    delimiter = spec.get("delimiter", ",")
    firstRow = spec.get("firstRow", 0)
    if spec.get("headerRow", False):
        # The header line still counts as a row; skip one more.
        firstRow += 1
    records = []
    with open(path, "r", errors="ignore") as fd:
        if spec.get("type") == "x-sl+csv":
            # Sail-line CSVs name their own columns in the header row.
            reader = csv.DictReader(fd, delimiter=delimiter)
            firstRow = 0
        else:
            reader = csv.DictReader(fd, fieldnames=fieldnames, delimiter=delimiter)
        for row, line in enumerate(reader):
            if row >= firstRow:
                records.append(cast_values(dict(line), fields))
    return records
def remap (line, headers):
    """Turn a raw csv row (*line*) into a dict keyed by *headers*.

    A header with an interior dot ("obj.attr") becomes a nested dict;
    repeated header names collect their values into a list; everything
    else maps straight through.
    """
    row = dict()
    for i, key in enumerate(headers):
        if "." in key[1:-1]:
            # Object-style header. Split on the first dot only, so
            # "a.b.c" yields ("a", "b.c") instead of raising ValueError.
            k, attr = key.split(".", 1)
            row.setdefault(k, dict())[attr] = line[i]
        elif key in row:
            # Repeated column: promote to / extend a list of values.
            if isinstance(row[key], list):
                row[key].append(line[i])
            else:
                row[key] = [row[key], line[i]]
        else:
            row[key] = line[i]
    return row
def from_file_saillines (path, spec):
    """Read a sail-line CSV file into a list of records.

    The well-known sail-line columns get typed defaults; a spec may
    override them by supplying its own "fields" mapping (backward
    compatible: absent "fields" behaves exactly as before).
    """
    default_fields = {
        "sail_line": { "type": "int" },
        "source_line": { "type": "int" },
        "incr": { "type": "bool" },
        "ntba": { "type": "bool" }
    }
    fields = spec.get("fields", default_fields)
    delimiter = spec.get("delimiter", ",")
    firstRow = spec.get("firstRow", 0)
    records = []
    with open(path, "r", errors="ignore") as fd:
        reader = csv.reader(fd, delimiter=delimiter)
        # Skip leading junk rows before the header.
        for _ in range(firstRow):
            next(reader)
        # Header row: keep the non-empty, whitespace-trimmed names.
        headers = [ h.strip() for h in next(reader) if len(h.strip()) ]
        for line in reader:
            records.append(cast_values(remap(line, headers), fields))
    return records
def from_file_p111 (path, spec):
    """Import a P1/11 positioning file — not yet implemented.

    Placeholder so the importer surface matches the other from_file_*
    functions; currently returns None for every input.
    """
    pass
def from_file (path, spec):
    """Dispatch *path* to the importer matching spec["type"].

    "x-sl+csv" goes to the sail-line importer; everything else is
    treated as a generic delimited file.
    """
    if spec.get("type") == "x-sl+csv":
        return from_file_saillines(path, spec)
    return from_file_delimited(path, spec)