mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 08:57:08 +00:00
The deferred import routine will delete any online data for any raw sequences that it imports.
502 lines
14 KiB
Python
502 lines
14 KiB
Python
import os
|
||
import json
|
||
import psycopg2
|
||
import configuration
|
||
import preplots
|
||
import p111
|
||
|
||
"""
|
||
Interface to the PostgreSQL database.
|
||
"""
|
||
|
||
def file_hash(file):
    """
    Calculate a file hash based on its size, inode, modification and creation times.

    The hash is used to uniquely identify files in the database and detect if they
    have changed.
    """
    info = os.stat(file)
    # Field order matters: size, mtime, ctime, inode.
    fields = (info.st_size, info.st_mtime, info.st_ctime, info.st_ino)
    return f"{fields[0]}:{fields[1]}:{fields[2]}:{fields[3]}"
|
||
|
||
class Datastore:
    """
    The database interface.

    Usage:
        db = Datastore()
    """

    # Open psycopg2 connection; None until connect() / set_survey() is called.
    conn = None
    # When True (the default), mutating methods commit before returning.
    autocommit = True

    def __init__(self, **opts):
        """
        Read the "db" section of the system configuration and keep any extra
        options for later use.  No connection is opened here; see set_survey().
        """
        self.settings = configuration.read()["db"]
        self.options = opts

    def connect(self, **opts):
        """
        Connect to the database.

        opts: Dictionary of options to pass to psycopg2.connect in addition to the
        connection string found in the system configuration (see configuration.py).

        Does not normally need to be invoked directly, as it is called by the
        set_survey() method.
        """
        if self.conn is not None:
            self.conn.close()

        self.conn = psycopg2.connect(configuration.read()["db"]["connection_string"], **opts)

    def set_autocommit(self, value = True):
        """
        Enable or disable autocommit.

        By default, each one of this class' methods commits its results before
        returning.  Sometimes that is not desirable so this method allows the
        caller to change that behaviour.  If autocommit is disabled, the caller
        should commit / rollback explicitly, e.g., by calling Datastore#conn.commit()
        """
        # Bug fix: `self` was missing from the signature, which made the method
        # uncallable (db.set_autocommit(x) raised TypeError) and left
        # `self` unbound for the assignment below.
        self.autocommit = value

    def maybe_commit(self):
        """
        Commit the current transaction, but only if autocommit is enabled.
        """
        if self.autocommit is True:
            self.conn.commit()

    def set_survey(self, schema = None):
        """
        Set the schema corresponding to the survey to which subsequent operations will apply.
        Note that this will replace the existing connection (if any) with a new one.
        If no schema is given, defaults to 'public'.
        """
        if schema is None:
            search_path = "public"
        else:
            search_path = ",".join([schema, "public"])

        if self.conn:
            self.conn.close()

        # The search_path makes unqualified table names resolve against the
        # survey's schema first, falling back to "public".
        self.connect(options=f"-c search_path={search_path}")

    def file_in_db(self, filepath):
        """
        Check if the file is already in the database.

        Returns True if the path is recorded with a matching hash, False if it
        is recorded with a different hash, and None if the path is unknown.
        """
        with self.conn:
            with self.conn.cursor() as cursor:
                qry = "SELECT path, hash FROM files WHERE path = %s;"
                cursor.execute(qry, (filepath,))
                results = cursor.fetchall()
                if len(results):
                    return (filepath, file_hash(filepath)) in results

    def add_file(self, path, cursor = None):
        """
        Add a file to the `files` table of a survey.

        Returns the computed file hash (see file_hash()).

        cursor (DB Cursor): if given, no commit is performed -- we assume we
        are in the middle of the caller's transaction.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor

        fhash = file_hash(path)
        qry = "CALL add_file(%s, %s);"
        cur.execute(qry, (path, fhash))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction
        return fhash

    def del_file(self, path, cursor = None):
        """
        Remove a file from a survey's `files` table.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor

        qry = "DELETE FROM files WHERE path = %s;"
        cur.execute(qry, (path,))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def list_files(self, cursor = None):
        """
        List all files known to a survey.

        Rows with special marker hashes (e.g. '*online*'), which start with a
        literal '*', are excluded.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor

        qry = "SELECT * FROM files WHERE hash NOT LIKE '*%';"
        cur.execute(qry)
        res = cur.fetchall()

        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction
        return res

    def save_preplots(self, lines, path, preplot_class, epsg = 0):
        """
        Save preplot data.

        Arguments:

        lines (iterable): should be a collection of lines returned from
        one of the preplot-reading functions (see preplots.py).

        path (string): the full path to the preplot file from where the lines
        have been read. It will be added to the survey's `files` table so that
        it can be monitored for changes.

        preplot_class (string): a valid preplot class code (`V` for saillines,
        `S` for sources).

        epsg (number): the EPSG code that identifies the preplot's CRS. Defaults
        to unknown.
        """
        with self.conn.cursor() as cursor:
            fhash = self.add_file(path, cursor)
            count = 0
            for line in lines:
                count += 1
                # Progress indicator; \u001b[2K clears the current terminal line.
                print(f"\u001b[2KSaving line {count} / {len(lines)} ({len(line['points'])} points)", end="\r", flush=True)

                p0 = line["points"][0]
                p1 = line["points"][-1]
                # A line is "incrementing" when point numbers grow from first to last.
                incr = p0["point_number"] <= p1["point_number"]
                geom = f'LINESTRING({p0["easting"]} {p0["northing"]}, {p1["easting"]} {p1["northing"]})'

                qry = """
                    INSERT INTO preplot_lines AS pl
                        (line, incr, class, remarks, geometry, hash)
                    VALUES
                        (%s, %s, %s, '', ST_GeomFromText(%s, %s), %s)
                    ON CONFLICT (line, class) DO UPDATE
                        SET incr = EXCLUDED.incr, geometry = EXCLUDED.geometry, hash = EXCLUDED.hash
                        WHERE pl.hash <> EXCLUDED.hash;
                    -- Note that remarks are *not* overwritten
                """

                cursor.execute(qry, (line["line_name"], incr, preplot_class, geom, epsg, fhash))

                points = [ (line["line_name"], p["point_number"], preplot_class, p["easting"], p["northing"], epsg) for p in line["points"] ]

                qry = """
                    INSERT INTO preplot_points
                        (line, point, class, geometry)
                    VALUES
                        (%s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                    ON CONFLICT (line, point, class) DO UPDATE
                        SET geometry = EXCLUDED.geometry;
                """

                cursor.executemany(qry, points)

            self.maybe_commit()

    def save_raw_p190(self, records, fileinfo, filepath, epsg = 0, filedata = None, ntbp = False):
        """
        Save raw P1 data.

        Note that despite its name, this function does not care whether the data
        comes from a P1/90 or a P1/11 or something else altogether.

        Any online data previously recorded for this sequence is deleted first.

        Arguments:

        records (iterable): a collection of P1 records as produced for instance by
        the from_file() function from p190.py.

        fileinfo (dictionary): information about the source file. It must have at
        least "line" and "sequence" elements.

        filepath (string): the full path to the file from where the lines have been
        read.

        epsg (number): the EPSG code identifying this dataset's CRS. Defaults to unknown.

        filedata (dictionary): Arbitrary data associated with this file and saved in the
        `file_data` table.

        ntbp (boolean): stored in raw_lines.ntbp -- presumably "not to be
        processed"; confirm against the schema.
        """
        with self.conn.cursor() as cursor:
            fhash = self.add_file(filepath, cursor)
            incr = records[0]["point_number"] <= records[-1]["point_number"]

            # Start by deleting any online data we may have for this sequence
            # FIXME Factor this out into its own function
            qry = """
                DELETE
                FROM raw_lines rl
                USING raw_lines_files rlf
                WHERE
                    rl.sequence = rlf.sequence
                    AND rlf.hash = '*online*'
                    AND rl.sequence = %s;
            """

            # Bug fix: this DELETE was previously built but never executed (the
            # query string was immediately overwritten by the INSERT below).
            # Execute it, mirroring save_raw_p111().
            cursor.execute(qry, (fileinfo["sequence"],))

            qry = """
                INSERT INTO raw_lines (sequence, line, remarks, ntbp, incr)
                VALUES (%s, %s, '', %s, %s)
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], ntbp, incr))

            qry = """
                INSERT INTO raw_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fhash))

            shots = [ (fileinfo["sequence"], r["point_number"], r["record_type"], r["objref"], r["tstamp"], fhash, r["easting"], r["northing"], epsg) for r in records ]

            qry = """
                INSERT INTO raw_shots (sequence, point, class, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point, class) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """

            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_final_p190(self, records, fileinfo, filepath, epsg = 0, filedata = None):
        """
        Save final P1 data.

        Note that despite its name, this function does not care whether the data
        comes from a P1/90 or a P1/11 or something else altogether.

        Arguments:

        records (iterable): a collection of P1 records as produced for instance by
        the from_file() function from p190.py.

        fileinfo (dictionary): information about the source file. It must have at
        least "line" and "sequence" elements.

        filepath (string): the full path to the file from where the lines have been
        read.

        epsg (number): the EPSG code identifying this dataset's CRS. Defaults to unknown.

        filedata (dictionary): Arbitrary data associated with this file and saved in the
        `file_data` table.
        """
        with self.conn.cursor() as cursor:
            fhash = self.add_file(filepath, cursor)
            # (The line direction is not recorded for final lines; unlike
            # raw_lines, final_lines has no ntbp/incr columns in the INSERT.)

            qry = """
                INSERT INTO final_lines (sequence, line, remarks)
                VALUES (%s, %s, '')
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"]))

            qry = """
                INSERT INTO final_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fhash))

            shots = [ (fileinfo["sequence"], r["point_number"], r["record_type"], r["objref"], r["tstamp"], fhash, r["easting"], r["northing"], epsg) for r in records ]

            qry = """
                INSERT INTO final_shots (sequence, point, class, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point, class) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """

            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_raw_p111 (self, records, fileinfo, filepath, epsg = 0, filedata = None, ntbp = False):
        """
        Save raw P1/11 data.

        Arguments mirror save_raw_p190(), except that `records` are P1/11
        records accessed through the helpers in p111.py.  Any online data
        previously recorded for this sequence is deleted first.
        """
        with self.conn.cursor() as cursor:
            fhash = self.add_file(filepath, cursor)
            incr = p111.point_number(records[0]) <= p111.point_number(records[-1])

            # Start by deleting any online data we may have for this sequence
            # FIXME Factor this out into its own function
            qry = """
                DELETE
                FROM raw_lines rl
                USING raw_lines_files rlf
                WHERE
                    rl.sequence = rlf.sequence
                    AND rlf.hash = '*online*'
                    AND rl.sequence = %s;
            """

            cursor.execute(qry, (fileinfo["sequence"],))

            qry = """
                INSERT INTO raw_lines (sequence, line, remarks, ntbp, incr)
                VALUES (%s, %s, '', %s, %s)
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], ntbp, incr))

            qry = """
                INSERT INTO raw_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fhash))

            shots = [ (fileinfo["sequence"], p111.line(r), p111.point_number(r), r["Object Ref. Number"], r["tstamp"], fhash, p111.easting(r), p111.northing(r), epsg) for r in records ]

            # NOTE(review): unlike save_raw_p190, this INSERT carries a `line`
            # column, omits `class`, and its conflict target is (sequence, point)
            # only -- confirm both raw_shots definitions agree with the schema.
            qry = """
                INSERT INTO raw_shots (sequence, line, point, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """

            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_final_p111 (self, records, fileinfo, filepath, epsg = 0, filedata = None):
        """
        Save final P1/11 data.

        Arguments mirror save_final_p190(), except that `records` are P1/11
        records accessed through the helpers in p111.py.
        """
        with self.conn.cursor() as cursor:
            fhash = self.add_file(filepath, cursor)

            qry = """
                INSERT INTO final_lines (sequence, line, remarks)
                VALUES (%s, %s, '')
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"]))

            qry = """
                INSERT INTO final_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """

            cursor.execute(qry, (fileinfo["sequence"], fhash))

            shots = [ (fileinfo["sequence"], p111.line(r), p111.point_number(r), r["Object Ref. Number"], r["tstamp"], fhash, p111.easting(r), p111.northing(r), epsg) for r in records ]

            qry = """
                INSERT INTO final_shots (sequence, line, point, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """

            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_file_data(self, path, filedata, cursor = None):
        """
        Save arbitrary data associated with a file.

        Arguments:

        path (string): the full path to the file that the data is to be associated with.

        filedata (object): arbitrary data – will be converted to JSON.

        cursor (DB Cursor): if a cursor is passed by the caller, this function will not
        call conn.commit() even if autocommit is True.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor

        # add_file() is given the caller's cursor (which may be None); when it
        # is None, add_file manages its own cursor and may commit on its own.
        fhash = self.add_file(path, cursor)

        qry = """
            INSERT INTO file_data (hash, data)
            VALUES (%s, %s::json)
            ON CONFLICT (hash) DO UPDATE
                SET data = EXCLUDED.data;
        """

        cur.execute(qry, (fhash, filedata))

        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def apply_survey_configuration(self, cursor = None):
        """
        Apply survey configuration stored in the `file_data` table.

        Upserts every key/value of the JSON "labels" object of each file_data
        row into the `labels` table.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor

        qry = """
            INSERT INTO labels (name, data)
            SELECT l.key, l.value
            FROM file_data fd,
                json_each(fd.data->'labels') l
            WHERE fd.data::jsonb ? 'labels'
            ON CONFLICT (name) DO UPDATE SET data = excluded.data;
        """

        cur.execute(qry)

        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction