#!/usr/bin/python3
"""Delimited record importing functions."""

import csv
import builtins


def to_bool(v):
    """Coerce *v* to a bool.

    Anything ``int()`` accepts is truthy per ``bool(int(v))``; otherwise a
    string counts as True when it starts with "t"/"T" (e.g. "true", "T"),
    and every other value is False.
    """
    try:
        return bool(int(v))
    except (ValueError, TypeError):
        # TypeError included so non-numeric, non-string input (e.g. None)
        # falls through to False instead of raising.
        if isinstance(v, str):
            return v.strip().lower().startswith("t")
        return False


# Map a field "type" name from the spec to a casting callable.
transform = {
    "int": lambda v: builtins.int(float(v)),  # via float so "3.0" parses
    "float": float,
    "string": str,
    "bool": to_bool,
}


def cast_values(row, fields):
    """Cast the values of *row* in place according to the *fields* spec.

    fields: dict mapping column name -> field spec.  A spec may carry
    "type" (a key into ``transform``; unknown/missing types cast via
    ``str``) and optionally "enum" (mapping of raw value -> replacement)
    with "default" (fallback when the raw value is not in the enum).

    Returns the mutated *row*.
    """

    def enum_for(key):
        field = fields.get(key, {})

        def enum(val):
            # Without an "enum" entry the value passes through untouched.
            if "enum" not in field:
                return val
            enums = field.get("enum", [])
            if val in enums:
                return enums[val]
            return field.get("default", val)

        return enum

    # csv.DictReader collects extra, unnamed columns under the None key;
    # discard that unwanted data.
    if None in row:
        del row[None]

    for key in row:
        val = row[key]
        enum = enum_for(key)
        transformer = transform.get(fields.get(key, {}).get("type"), str)
        if isinstance(val, list):
            # Repeated column: cast every collected value.
            row[key] = [transformer(enum(v)) for v in val]
        elif isinstance(val, dict):
            # Nested objects (produced by remap) are left untouched.
            continue
        else:
            row[key] = transformer(enum(val))
    return row


def build_fieldnames(spec):
    """Build a positional fieldname list from spec["fields"] "column" indices.

    Columns with no named field are left as None (csv.DictReader then
    collects their data under the None key, which cast_values discards).

    Raises ValueError on a negative column index (previously an assert,
    which is stripped under ``python -O``).
    """
    fieldnames = []
    for key, field in spec.get("fields", {}).items():
        index = field["column"]
        if index < 0:
            raise ValueError("field %r has negative column index %d" % (key, index))
        if index >= len(fieldnames):
            # Pad with None up to and including the target column.
            fieldnames.extend([None] * (index + 1 - len(fieldnames)))
        fieldnames[index] = key
    return fieldnames


def from_file_delimited(path, spec):
    """Read delimited records from *path* according to *spec*.

    spec keys: "fields" (column specs), "delimiter" (default ","),
    "firstRow" (index of the first data row, default 0), "headerRow"
    (True when a header line precedes the data), and "type"
    ("x-sl+csv" means field names come from the file's own header row).

    Returns a list of dicts whose values were cast via cast_values.
    """
    fieldnames = build_fieldnames(spec)
    # BUG FIX: the fallback must be a dict — cast_values calls
    # fields.get(), which a list does not have.
    fields = spec.get("fields", {})
    delimiter = spec.get("delimiter", ",")
    first_row = spec.get("firstRow", 0)
    if spec.get("headerRow", False):
        first_row += 1  # the header line itself is not data

    records = []
    with open(path, "r", errors="ignore") as fd:
        if spec.get("type") == "x-sl+csv":
            # Let DictReader pick the fieldnames from the header row.
            reader = csv.DictReader(fd, delimiter=delimiter)
            first_row = 0
        else:
            reader = csv.DictReader(fd, fieldnames=fieldnames, delimiter=delimiter)
        for row, line in enumerate(reader):
            if row >= first_row:
                records.append(cast_values(dict(line), fields))
    return records


def remap(line, headers):
    """Turn a raw csv *line* (list of strings) into a dict keyed by *headers*.

    A header with an interior dot ("obj.attr") produces a nested dict;
    a repeated header collects its values into a list.
    """
    row = dict()
    for i, key in enumerate(headers):
        if "." in key[1:-1]:
            # Dotted header -> nested object.  maxsplit=1 so additional
            # dots stay in the attribute name instead of raising
            # ValueError on the unpack.
            obj, attr = key.split(".", 1)
            row.setdefault(obj, dict())[attr] = line[i]
        elif key in row:
            # Duplicate header: accumulate values into a list.
            if isinstance(row[key], list):
                row[key].append(line[i])
            else:
                row[key] = [row[key], line[i]]
        else:
            row[key] = line[i]
    return row


def from_file_saillines(path, spec):
    """Read an "x-sl+csv" sail-lines file.

    Skips spec["firstRow"] lines, treats the next line as headers, then
    remaps and casts every following row with the fixed field specs below.
    """
    fields = {
        "sail_line": {"type": "int"},
        "source_line": {"type": "int"},
        "incr": {"type": "bool"},
        "ntba": {"type": "bool"},
    }
    delimiter = spec.get("delimiter", ",")
    first_row = spec.get("firstRow", 0)

    records = []
    with open(path, "r", errors="ignore") as fd:
        reader = csv.reader(fd, delimiter=delimiter)
        for _ in range(first_row):
            next(reader)
        # NOTE(review): dropping blank headers shifts later columns left in
        # remap's enumerate — assumes blanks occur only at the end of the
        # header row; confirm against real input files.
        headers = [h.strip() for h in next(reader) if h.strip()]
        for line in reader:
            records.append(cast_values(remap(line, headers), fields))
    return records


def from_file_p111(path, spec):
    """Placeholder: P111 format import is not implemented yet."""
    pass


def from_file(path, spec):
    """Dispatch to the importer matching spec["type"]."""
    if spec.get("type") == "x-sl+csv":
        return from_file_saillines(path, spec)
    else:
        return from_file_delimited(path, spec)