#!/usr/bin/python3

"""
Re-import Dougal-exported data created by system_dump.py

The target tables must already be populated with imported data for this to
succeed: exported metadata columns are merged into existing rows by primary
key, so rows that do not already exist will not be updated.
"""

import os

import configuration
import preplots
from datastore import Datastore, psycopg2

# Pull configuration variables (notably VARDIR) into the module namespace.
globals().update(configuration.vars())

# Tables eligible for re-import, keyed by schema. A list names the columns to
# merge into existing rows by primary key; None means the whole table is
# copied in verbatim.
exportables = {
    "public": {
        "projects": ["meta"],
        "info": None,
        "real_time_inputs": None,
    },
    "survey": {
        "final_lines": ["remarks", "meta"],
        "final_shots": ["meta"],
        "preplot_lines": ["remarks", "ntba", "meta"],
        "preplot_points": ["ntba", "meta"],
        "raw_lines": ["remarks", "meta"],
        "raw_shots": ["meta"],
        "planned_lines": None,
    },
}


def primary_key(table, cursor):
    """Return the primary key columns of a table as (name, type) tuples."""
    # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
    qry = """
        SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type
        FROM pg_index i
        JOIN pg_attribute a
          ON a.attrelid = i.indrelid
         AND a.attnum = ANY(i.indkey)
        WHERE i.indrelid = %s::regclass
          AND i.indisprimary;
    """
    cursor.execute(qry, (table,))
    return cursor.fetchall()


def import_table(fd, table, columns, cursor):
    """Merge the exported columns from fd into table, matching on primary key."""
    pk = [r[0] for r in primary_key(table, cursor)]

    # Create a temporary table to import into.
    temptable = "import_" + table
    print("Creating temporary table", temptable)
    qry = f"""
        CREATE TEMPORARY TABLE {temptable}
        ON COMMIT DROP
        AS SELECT {', '.join(pk + columns)}
        FROM {table}
        WITH NO DATA;
    """
    cursor.execute(qry)

    # Import into the temp table.
    print("Importing data into temporary table")
    cursor.copy_from(fd, temptable)

    # Update the destination table.
    print("Updating destination table")
    setcols = ", ".join(f"{c} = t.{c}" for c in columns)
    wherecols = " AND ".join(f"{table}.{c} = t.{c}" for c in pk)
    qry = f"""
        UPDATE {table}
        SET {setcols}
        FROM {temptable} t
        WHERE {wherecols};
    """
    cursor.execute(qry)


if __name__ == '__main__':
    print("Reading configuration")
    surveys = configuration.surveys()

    print("Connecting to database")
    db = Datastore()
    db.connect()

    for table in exportables["public"]:
        with db.conn.cursor() as cursor:
            columns = exportables["public"][table]
            path = os.path.join(VARDIR, "-" + table)
            try:
                with open(path, "rb") as fd:
                    print(path, " →→ ", table, columns)
                    if columns is not None:
                        import_table(fd, table, columns, cursor)
                    else:
                        try:
                            print(f"Copying from {path} into {table}")
                            # A failed COPY aborts the whole transaction, so
                            # guard it with a savepoint we can roll back to.
                            cursor.execute("SAVEPOINT before_copy")
                            cursor.copy_from(fd, table)
                            cursor.execute("RELEASE SAVEPOINT before_copy")
                        except psycopg2.errors.UniqueViolation:
                            cursor.execute("ROLLBACK TO SAVEPOINT before_copy")
                            print(f"It looks like table {table} may have already been imported. Skipping it.")
            except FileNotFoundError:
                print(f"File not found. Skipping {path}")
    db.conn.commit()

    print("Reading surveys")
    for survey in surveys:
        print(f'Survey: {survey["id"]} ({survey["schema"]})')
        db.set_survey(survey["schema"])
        with db.conn.cursor() as cursor:
            try:
                pathPrefix = survey["exports"]["machine"]["path"]
            except KeyError:
                print("Survey does not define an export path for machine data")
                continue
            for table in exportables["survey"]:
                columns = exportables["survey"][table]
                path = os.path.join(pathPrefix, "-" + table)
                print(path, " →→ ", table, columns)
                try:
                    with open(path, "rb") as fd:
                        if columns is not None:
                            import_table(fd, table, columns, cursor)
                        else:
                            try:
                                print(f"Copying from {path} into {table}")
                                # Same savepoint guard as above: a duplicate-key
                                # failure must not poison the transaction.
                                cursor.execute("SAVEPOINT before_copy")
                                cursor.copy_from(fd, table)
                                cursor.execute("RELEASE SAVEPOINT before_copy")
                            except psycopg2.errors.UniqueViolation:
                                cursor.execute("ROLLBACK TO SAVEPOINT before_copy")
                                print(f"It looks like table {table} may have already been imported. Skipping it.")
                except FileNotFoundError:
                    print(f"File not found. Skipping {path}")
        # If we don't commit, the copied data is silently discarded.
        db.conn.commit()

    print("Done")
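
# ---------------------------------------------------------------------------
# Worked example (illustrative only; the primary key column "line" is
# hypothetical, as the real key depends on the schema). For a dump of
# survey.preplot_lines with exported columns (remarks, ntba, meta),
# import_table() issues roughly:
#
#   CREATE TEMPORARY TABLE import_preplot_lines
#   ON COMMIT DROP
#   AS SELECT line, remarks, ntba, meta FROM preplot_lines WITH NO DATA;
#
#   COPY import_preplot_lines FROM STDIN;   -- driven by cursor.copy_from(fd, ...)
#
#   UPDATE preplot_lines
#   SET remarks = t.remarks, ntba = t.ntba, meta = t.meta
#   FROM import_preplot_lines t
#   WHERE preplot_lines.line = t.line;
#
# i.e. the dump is merged into rows that already exist, which is why the
# docstring requires the target tables to be populated first. The dump files
# are expected in psycopg2's copy_from() default format: tab-separated text
# with \N for NULL.
# ---------------------------------------------------------------------------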