Add functions to export/import specific columns from DB.

Unlike system_imports.py and system_exports.py, which
deal with whole tables via COPY, this allows us to
export / import *either* whole tables or specific
columns only.

The data will be exported to text files containing
the selected columns + the primary key columns for
the table.

When importing, those tables for which a selection
of columns was exported must already be populated.
The import process will overwrite the data of the
non-primary-key columns it knows about. If whole
tables are exported, on the other hand, rows will
be appended rather than updated when re-importing.
It is the user's responsibility to make sure that
this will not cause any conflicts.
This commit is contained in:
D. Berge
2020-09-27 19:29:48 +02:00
parent bf313dd8e5
commit a05ecfd41c
2 changed files with 233 additions and 0 deletions

140
bin/system_load.py Executable file
View File

@@ -0,0 +1,140 @@
#!/usr/bin/python3
"""
Re-import Dougal-exported data created by
system_dump.py
The target tables must already be populated with
imported data in order for the import to succeed.
"""
import os
from glob import glob
import configuration
import preplots
from datastore import Datastore, psycopg2
# Copy everything returned by configuration.vars() into this module's
# namespace (at module level locals() is the module's own namespace).
# VARDIR, used further down to locate the "public" export files, is
# presumably one of those names -- TODO confirm against configuration.py.
locals().update(configuration.vars())

# Tables this script knows how to load, in two groups:
#   "public": loaded once, from files under VARDIR.
#   "survey": loaded once per configured survey, from that survey's
#             machine export path, after db.set_survey() has been called.
# For each table the value is either
#   - a list of column names: only those columns are overwritten, by
#     matching rows on the table's primary key (see import_table), or
#   - None: the whole file is appended to the table with COPY.
exportables = {
    "public": {
        "projects": [ "meta" ],
        "real_time_inputs": None
    },
    "survey": {
        "final_lines": [ "remarks", "meta" ],
        "final_shots": [ "meta" ],
        "preplot_lines": [ "remarks", "ntba", "meta" ],
        "preplot_points": [ "ntba", "meta" ],
        "raw_lines": [ "remarks", "meta" ],
        "raw_shots": [ "meta" ]
    }
}
# Catalogue query taken from
# https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
_PRIMARY_KEY_QUERY = """
SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type
FROM pg_index i
JOIN pg_attribute a
ON a.attrelid = i.indrelid
AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = %s::regclass
AND i.indisprimary;
"""


def primary_key(table, cursor):
    """Look up the primary key columns of a table.

    Args:
        table: Name of the table; it is passed as a query parameter and
            cast to ``regclass`` by the database.
        cursor: Database cursor used to run the catalogue query.

    Returns:
        Whatever ``cursor.fetchall()`` yields for the query: one row per
        primary key column, holding the column name followed by its
        formatted data type.
    """
    cursor.execute(_PRIMARY_KEY_QUERY, (table,))
    key_rows = cursor.fetchall()
    return key_rows
def import_table(fd, table, columns, cursor):
    """Overwrite selected columns of ``table`` with the data read from ``fd``.

    The data is first bulk-loaded into a temporary table made of the
    primary key columns of ``table`` (in the order returned by
    ``primary_key``) followed by ``columns``. Rows of ``table`` whose
    primary key matches a loaded row then get ``columns`` overwritten.
    Loaded rows with no match in ``table`` are not inserted, so the
    destination table must already be populated.

    The temporary table is created with ``ON COMMIT DROP``, so it lives
    until the caller commits or rolls back the current transaction.

    Args:
        fd: File-like object handed to ``cursor.copy_from``. Its fields
            must be the primary key columns followed by ``columns``.
        table: Name of the destination table.
        columns: Names of the non primary key columns to overwrite.
        cursor: Database cursor used for every statement.

    Raises:
        ValueError: If ``columns`` is empty or ``table`` has no primary
            key. Raised before any table is created or modified.
    """
    # Validate before touching the database. Without these checks the
    # problem only shows up as an SQL syntax error ("SET" or "WHERE" with
    # nothing after it) once the temporary table has already been created
    # and filled, leaving the caller's transaction aborted.
    if not columns:
        raise ValueError(f"No columns given to import into table {table}")
    columns = list(columns)

    pk = [row[0] for row in primary_key(table, cursor)]
    if not pk:
        raise ValueError(f"Table {table} has no primary key to match rows on")

    # NOTE: table and column names are interpolated into the SQL as-is;
    # they must be trusted identifiers, never user input.

    # Create temporary table to import into
    temptable = "import_"+table
    print("Creating temporary table", temptable)
    qry = "\n".join((
        "",
        f"CREATE TEMPORARY TABLE {temptable}",
        "ON COMMIT DROP",
        f"AS SELECT {', '.join(pk + columns)} FROM {table}",
        "WITH NO DATA;",
        "",
    ))
    cursor.execute(qry)

    # Import into the temp table
    print("Import data into temporary table")
    cursor.copy_from(fd, temptable)

    # Update the destination table
    print("Updating destination table")
    setcols = ", ".join(f"{c} = t.{c}" for c in columns)
    wherecols = " AND ".join(f"{table}.{c} = t.{c}" for c in pk)
    qry = "\n".join((
        "",
        f"UPDATE {table}",
        f"SET {setcols}",
        f"FROM {temptable} t",
        f"WHERE {wherecols};",
        "",
    ))
    cursor.execute(qry)
def _load_table(fd, path, table, columns, cursor):
    """Load one exported file into ``table`` using ``cursor``.

    Args:
        fd: Open binary file holding the exported data.
        path: Path of that file; only used in progress messages.
        table: Name of the destination table.
        columns: List of columns to overwrite via ``import_table``, or
            ``None`` to append the whole file to the table.
        cursor: Database cursor to run the statements on.
    """
    if columns is not None:
        import_table(fd, table, columns, cursor)
        return

    print(f"Copying from {path} into {table}")
    # A failed COPY aborts the enclosing PostgreSQL transaction, after
    # which every later statement is rejected. Wrapping the COPY in a
    # savepoint lets us undo just this table and carry on with the rest.
    cursor.execute("SAVEPOINT system_load_copy")
    try:
        cursor.copy_from(fd, table)
    except psycopg2.errors.UniqueViolation:
        cursor.execute("ROLLBACK TO SAVEPOINT system_load_copy")
        print(f"It looks like table {table} may have already been imported. Skipping it.")
    cursor.execute("RELEASE SAVEPOINT system_load_copy")


if __name__ == '__main__':

    print("Reading configuration")
    surveys = configuration.surveys()

    print("Connecting to database")
    db = Datastore()
    db.connect()

    # NOTE(review): os.path.join(dir, "-"+table) yields a file literally
    # named "-<table>" inside the directory; presumably this matches what
    # system_dump.py writes -- confirm.
    with db.conn.cursor() as cursor:
        for table, columns in exportables["public"].items():
            path = os.path.join(VARDIR, "-"+table)
            with open(path, "rb") as fd:
                print(" →→ ", path, " ←← ", table, columns)
                _load_table(fd, path, table, columns, cursor)

    # Commit the public tables right away: the loop below only commits
    # for surveys that define an export path, so without this nothing
    # would be saved when there is no such survey.
    db.conn.commit()

    print("Reading surveys")
    for survey in surveys:
        print(f'Survey: {survey["id"]} ({survey["schema"]})')
        db.set_survey(survey["schema"])

        try:
            path_prefix = survey["exports"]["machine"]["path"]
        except KeyError:
            print("Survey does not define an export path for machine data")
            continue

        with db.conn.cursor() as cursor:
            for table, columns in exportables["survey"].items():
                path = os.path.join(path_prefix, "-"+table)
                print(" ←← ", path, " →→ ", table, columns)
                with open(path, "rb") as fd:
                    _load_table(fd, path, table, columns, cursor)

        # If we don't commit the data does not actually get copied.
        # Committing once per survey also drops the ON COMMIT DROP
        # temporary tables, whose names are reused for the next survey.
        db.conn.commit()

    print("Done")