dougal-software/bin/system_load.py

#!/usr/bin/python3
"""
Re-import Dougal-exported data created by system_dump.py.

Tables that are imported column-by-column are applied as UPDATEs,
so the target tables must already be populated with the rows to be
updated for the import to succeed.
"""
import os
from glob import glob

import configuration
import preplots
from datastore import Datastore, psycopg2

# At module level, locals() is globals(), so this injects the
# configuration values (e.g. VARDIR) as module-level names.
locals().update(configuration.vars())
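
# The tables to (re-)import, keyed by schema.  A list of column names
# means only those columns are merged into existing rows via an UPDATE
# (see import_table below); None means the whole table is loaded with a
# plain COPY, and re-importing it raises UniqueViolation, which is
# caught and skipped in the main loop.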
exportables = {
    "public": {
        "projects": [ "meta" ],
        "info": None,
        "real_time_inputs": None
    },
    "survey": {
        "final_lines": [ "remarks", "meta" ],
        "final_shots": [ "meta" ],
        "preplot_lines": [ "remarks", "ntba", "meta" ],
        "preplot_points": [ "ntba", "meta" ],
        "raw_lines": [ "remarks", "meta" ],
        "raw_shots": [ "meta" ],
        "planned_lines": None
    }
}

def primary_key(table, cursor):
    """Return the (name, type) pairs of table's primary key columns."""
    # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
    qry = """
        SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type
        FROM pg_index i
        JOIN pg_attribute a
            ON a.attrelid = i.indrelid
            AND a.attnum = ANY(i.indkey)
        WHERE i.indrelid = %s::regclass
        AND i.indisprimary;
    """
    cursor.execute(qry, (table,))
    return cursor.fetchall()
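
# As an illustration, for a hypothetical table whose primary key is a
# single integer column "id", primary_key would return something like
# [("id", "integer")]; composite keys yield one pair per column.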

def import_table(fd, table, columns, cursor):
    """
    Merge the named columns from fd into table: COPY the data into a
    temporary table, then apply it to the destination table as an
    UPDATE keyed on the primary key.
    """
    pk = [ r[0] for r in primary_key(table, cursor) ]

    # Create temporary table to import into
    temptable = "import_" + table
    print("Creating temporary table", temptable)
    qry = f"""
        CREATE TEMPORARY TABLE {temptable}
        ON COMMIT DROP
        AS SELECT {', '.join(pk + columns)} FROM {table}
        WITH NO DATA;
    """
    #print(qry)
    cursor.execute(qry)

    # Import into the temp table
    print("Import data into temporary table")
    cursor.copy_from(fd, temptable)

    # Update the destination table
    print("Updating destination table")
    setcols = ", ".join([ f"{c} = t.{c}" for c in columns ])
    wherecols = " AND ".join([ f"{table}.{c} = t.{c}" for c in pk ])
    qry = f"""
        UPDATE {table}
        SET {setcols}
        FROM {temptable} t
        WHERE {wherecols};
    """
    #print(qry)
    cursor.execute(qry)
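
# For a hypothetical table "notes" with primary key "id" and
# columns=["remarks"], import_table would issue roughly:
#
#   CREATE TEMPORARY TABLE import_notes ON COMMIT DROP
#   AS SELECT id, remarks FROM notes WITH NO DATA;
#
#   UPDATE notes SET remarks = t.remarks
#   FROM import_notes t
#   WHERE notes.id = t.id;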

if __name__ == '__main__':
    print("Reading configuration")
    surveys = configuration.surveys()

    print("Connecting to database")
    db = Datastore()
    db.connect()

    for table in exportables["public"]:
        with db.conn.cursor() as cursor:
            columns = exportables["public"][table]
            path = os.path.join(VARDIR, "-"+table)
            try:
                with open(path, "rb") as fd:
                    print(" →→ ", path, " ←← ", table, columns)
                    if columns is not None:
                        import_table(fd, table, columns, cursor)
                    else:
                        try:
                            print(f"Copying from {path} into {table}")
                            cursor.copy_from(fd, table)
                        except psycopg2.errors.UniqueViolation:
                            print(f"It looks like table {table} may have already been imported. Skipping it.")
            except FileNotFoundError:
                print(f"File not found. Skipping {path}")
        db.conn.commit()
print("Reading surveys")
for survey in surveys:
print(f'Survey: {survey["id"]} ({survey["schema"]})')
db.set_survey(survey["schema"])
with db.conn.cursor() as cursor:
try:
pathPrefix = survey["exports"]["machine"]["path"]
except KeyError:
print("Survey does not define an export path for machine data")
continue
for table in exportables["survey"]:
columns = exportables["survey"][table]
path = os.path.join(pathPrefix, "-"+table)
print(" ←← ", path, " →→ ", table, columns)
try:
with open(path, "rb") as fd:
if columns is not None:
import_table(fd, table, columns, cursor)
else:
try:
print(f"Copying from {path} into {table}")
cursor.copy_from(fd, table)
except psycopg2.errors.UniqueViolation:
print(f"It looks like table {table} may have already been imported. Skipping it.")
except FileNotFoundError:
print(f"File not found. Skipping {path}")
# If we don't commit the data does not actually get copied
db.conn.commit()
print("Done")