Mirror of https://gitlab.com/wgp/dougal/software.git (synced 2025-12-06 11:17:08 +00:00)
Support import of delimited formats.
This supports CSV and similar delimited formats, as well as sailline imports, which are CSV files with a specific set of column definitions. It does not yet support P111 import (for which an implementation already exists).
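For context, a minimal usage sketch of the new module (not part of the commit): the spec keys are the ones read by from_file_delimited in the diff below, while the import path, file name and column layout are assumptions for illustration only.

# Hypothetical caller; assumes bin/ is on the import path so the module loads as "delimited".
from delimited import from_file

# Illustrative spec: type, delimiter, headerRow and fields/column/type are the keys the
# module reads; this particular column layout is made up for the example.
spec = {
    "type": "text/csv",        # anything other than "x-sl+csv" takes the generic delimited path
    "delimiter": ",",
    "headerRow": True,         # header line present, so data starts on the next row
    "fields": {
        "sail_line":   { "type": "int",  "column": 0 },
        "source_line": { "type": "int",  "column": 1 },
        "ntba":        { "type": "bool", "column": 2 }
    }
}

records = from_file("lines.csv", spec)   # list of dicts, values cast per each field's "type"

A spec with "type": "x-sl+csv" is instead routed to from_file_saillines, which picks the column names from the file's own header row.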
bin/delimited.py: 163 additions (new file)
@@ -0,0 +1,163 @@
#!/usr/bin/python3

"""
Delimited record importing functions.
"""

import csv
import builtins

def to_bool (v):
    try:
        return bool(int(v))
    except ValueError:
        if type(v) == str:
            return v.strip().lower().startswith("t")
    return False

transform = {
    "int": lambda v: builtins.int(float(v)),
    "float": float,
    "string": str,
    "bool": to_bool
}

def cast_values (row, fields):

    def enum_for (key):
        field = fields.get(key, {})
        def enum (val):
            if "enum" in field:
                ret_val = field.get("default", val)
                enums = field.get("enum", [])
                for enum_key in enums:
                    if enum_key == val:
                        ret_val = enums[enum_key]
                return ret_val
            return val
        return enum

    # Get rid of any unwanted data
    if None in row:
        del(row[None])

    for key in row:

        val = row[key]
        enum = enum_for(key)
        transformer = transform.get(fields.get(key, {}).get("type"), str)

        if type(val) == list:
            for i, v in enumerate(val):
                row[key][i] = transformer(enum(v))
        elif type(val) == dict:
            continue
        else:
            row[key] = transformer(enum(val))
    return row

def build_fieldnames (spec): #(arr, key, val):
    fieldnames = []

    if "fields" in spec:
        for key in spec["fields"]:
            index = spec["fields"][key]["column"]
            try:
                fieldnames[index] = key
            except IndexError:
                assert index >= 0
                fieldnames.extend(((index + 1) - len(fieldnames)) * [None])
                fieldnames[index] = key

    return fieldnames


def from_file_delimited (path, spec):

    fieldnames = build_fieldnames(spec)
    fields = spec.get("fields", [])
    delimiter = spec.get("delimiter", ",")
    firstRow = spec.get("firstRow", 0)
    headerRow = spec.get("headerRow", False)
    if headerRow:
        firstRow += 1

    records = []
    with open(path, "r", errors="ignore") as fd:

        if spec.get("type") == "x-sl+csv":
            fieldnames = None # Pick from header row
            firstRow = 0
            reader = csv.DictReader(fd, delimiter=delimiter)
        else:
            reader = csv.DictReader(fd, fieldnames=fieldnames, delimiter=delimiter)

        row = 0
        for line in reader:
            skip = False

            if row < firstRow:
                skip = True

            if not skip:
                records.append(cast_values(dict(line), fields))

            row += 1

    return records


def remap (line, headers):
    row = dict()
    for i, key in enumerate(headers):
        if "." in key[1:-1]:
            # This is an object
            k, attr = key.split(".")
            if not k in row:
                row[k] = dict()
            row[k][attr] = line[i]
        elif key in row:
            if type(row[key]) == list:
                row[key].append(line[i])
            else:
                row[key] = [ row[key], line[i] ]
        else:
            row[key] = line[i]
    return row

def from_file_saillines (path, spec):

    fields = {
        "sail_line": { "type": "int" },
        "source_line": { "type": "int" },
        "incr": { "type": "bool" },
        "ntba": { "type": "bool" }
    }

    # fields = spec.get("fields", sl_fields)
    delimiter = spec.get("delimiter", ",")
    firstRow = spec.get("firstRow", 0)

    records = []
    with open(path, "r", errors="ignore") as fd:
        row = 0
        reader = csv.reader(fd, delimiter=delimiter)
        while row < firstRow:
            next(reader)
            row += 1
        headers = [ h.strip() for h in next(reader) if len(h.strip()) ]

        for line in reader:
            records.append(cast_values(remap(line, headers), fields))

    return records


def from_file_p111 (path, spec):
    pass

def from_file (path, spec):
    if spec.get("type") == "x-sl+csv":
        return from_file_saillines(path, spec)
    else:
        return from_file_delimited(path, spec)