This supports CSV and similar formats, as well as sailline imports: a sailline import is a CSV file with a specific set of column definitions. It does not yet support P111 import (for which an implementation already exists).
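A minimal usage sketch (the module name, file path, and example field names are hypothetical; the spec keys ("fields", "column", "type", "delimiter", "headerRow") are the ones the code below reads):

    from records import from_file  # hypothetical module name

    spec = {
        "delimiter": ",",
        "headerRow": True,  # the first line of the file names the columns; skip it
        "fields": {
            "sail_line": { "column": 0, "type": "int" },
            "remarks":   { "column": 1, "type": "string" },
            "ntba":      { "column": 2, "type": "bool" }
        }
    }
    records = from_file("lines.csv", spec)  # list of dicts, values cast per "type"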
#!/usr/bin/python3

"""
Delimited record importing functions.
"""

import csv
import builtins


def to_bool (v):
    """Coerce a CSV value to a boolean: numeric strings by value,
    other strings by a leading "t" ("true", "T", ...)."""
    try:
        return bool(int(v))
    except (ValueError, TypeError):
        if isinstance(v, str):
            return v.strip().lower().startswith("t")
        return False


# Map a field spec "type" name to its casting function.
transform = {
    "int": lambda v: builtins.int(float(v)),  # via float so "3.0" also parses
    "float": float,
    "string": str,
    "bool": to_bool
}


def cast_values (row, fields):
    """Cast each value in row according to its entry in fields."""

    def enum_for (key):
        # Per-key mapper: if the field spec has an "enum" table, map
        # matching raw values through it, falling back to the spec's
        # "default" (or the raw value itself).
        field = fields.get(key, {})
        def enum (val):
            if "enum" in field:
                ret_val = field.get("default", val)
                enums = field.get("enum", {})
                for enum_key in enums:
                    if enum_key == val:
                        ret_val = enums[enum_key]
                return ret_val
            return val
        return enum

    # Get rid of any unwanted data (csv.DictReader collects extra
    # columns under the None key)
    if None in row:
        del row[None]

    for key in row:
        val = row[key]
        enum = enum_for(key)
        transformer = transform.get(fields.get(key, {}).get("type"), str)

        if isinstance(val, list):
            # Repeated columns arrive as lists; cast each element
            for i, v in enumerate(val):
                row[key][i] = transformer(enum(v))
        elif isinstance(val, dict):
            # Nested objects (from dotted headers) are left as-is
            continue
        else:
            row[key] = transformer(enum(val))
    return row


def build_fieldnames (spec):
    """Build a positional list of field names from each field's
    "column" index, padding unassigned columns with None."""
    fieldnames = []

    if "fields" in spec:
        for key in spec["fields"]:
            index = spec["fields"][key]["column"]
            try:
                fieldnames[index] = key
            except IndexError:
                assert index >= 0
                # Grow the list up to the required index, then assign
                fieldnames.extend(((index + 1) - len(fieldnames)) * [None])
                fieldnames[index] = key

    return fieldnames


def from_file_delimited (path, spec):
    """Read a delimited file into a list of records cast per spec."""

    fieldnames = build_fieldnames(spec)
    fields = spec.get("fields", {})
    delimiter = spec.get("delimiter", ",")
    firstRow = spec.get("firstRow", 0)
    headerRow = spec.get("headerRow", False)
    if headerRow:
        # The header line itself also has to be skipped
        firstRow += 1

    records = []
    with open(path, "r", errors="ignore") as fd:

        if spec.get("type") == "x-sl+csv":
            fieldnames = None  # Pick from header row
            firstRow = 0
            reader = csv.DictReader(fd, delimiter=delimiter)
        else:
            reader = csv.DictReader(fd, fieldnames=fieldnames, delimiter=delimiter)

        row = 0
        for line in reader:
            # Skip any leading rows (including a header row, if present)
            if row >= firstRow:
                records.append(cast_values(dict(line), fields))
            row += 1

    return records


def remap (line, headers):
    """Zip a raw CSV line with its headers: a header with a single
    embedded dot ("point.x") becomes a nested dict, and a repeated
    header collects its values into a list."""
    row = dict()
    for i, key in enumerate(headers):
        if "." in key[1:-1]:
            # This is an object: a dot that is neither the first nor
            # the last character separates key from attribute
            k, attr = key.split(".")
            if k not in row:
                row[k] = dict()
            row[k][attr] = line[i]
        elif key in row:
            # Repeated header: collect the values into a list
            if isinstance(row[key], list):
                row[key].append(line[i])
            else:
                row[key] = [ row[key], line[i] ]
        else:
            row[key] = line[i]
    return row


def from_file_saillines (path, spec):
    """Read a sailline CSV: a header row names the columns, and the
    known columns are cast to the fixed field types below."""

    fields = {
        "sail_line": { "type": "int" },
        "source_line": { "type": "int" },
        "incr": { "type": "bool" },
        "ntba": { "type": "bool" }
    }

    # fields = spec.get("fields", sl_fields)
    delimiter = spec.get("delimiter", ",")
    firstRow = spec.get("firstRow", 0)

    records = []
    with open(path, "r", errors="ignore") as fd:
        row = 0
        reader = csv.reader(fd, delimiter=delimiter)
        # Skip any leading rows before the header row
        while row < firstRow:
            next(reader)
            row += 1
        # Note: this assumes empty header cells only occur at the end
        # of the row; an empty cell mid-row would misalign remap()
        headers = [ h.strip() for h in next(reader) if len(h.strip()) ]

        for line in reader:
            records.append(cast_values(remap(line, headers), fields))

    return records


def from_file_p111 (path, spec):
    # Not implemented here yet; a P111 importer already exists elsewhere
    pass


def from_file (path, spec):
    """Dispatch on the spec's "type" to the matching importer."""
    if spec.get("type") == "x-sl+csv":
        return from_file_saillines(path, spec)
    else:
        return from_file_delimited(path, spec)