# dougal-software/bin/datastore.py

"""
Interface to the PostgreSQL database.
"""

import os
import json
import psycopg2
import configuration
import preplots
import p111
from hashlib import md5  # Because it's good enough


def file_hash(file):
    """
    Calculate a file hash based on its name, size, modification and creation times.
    The hash is used to uniquely identify files in the database and detect if they
    have changed.
    """
    h = md5()
    h.update(file.encode())
    name_digest = h.hexdigest()[:16]
    st = os.stat(file)
    return ":".join([str(v) for v in [st.st_size, st.st_mtime, st.st_ctime, name_digest]])

class Datastore:
    """
    The database interface.
    Usage:
        db = Datastore()
    """
    conn = None
    autocommit = True

    def __init__(self, **opts):
        self.settings = configuration.read()["db"]
        self.options = opts

    def connect(self, **opts):
        """
        Connect to the database.
        opts: Dictionary of options to pass to psycopg2.connect in addition to the
        connection string found in the system configuration (see configuration.py).
        Does not normally need to be invoked directly, as it is called by the
        set_survey() method.
        """
        if self.conn is not None:
            self.conn.close()
        self.conn = psycopg2.connect(configuration.read()["db"]["connection_string"], **opts)

    def set_autocommit(self, value = True):
        """
        Enable or disable autocommit.
        By default, each of this class's methods commits its results before returning.
        Sometimes that is not desirable, so this method allows the caller to change that
        behaviour. If autocommit is disabled, the caller should commit / rollback explicitly,
        e.g., by calling Datastore#conn.commit()
        """
        self.autocommit = value

    def maybe_commit(self):
        if self.autocommit is True:
            self.conn.commit()

    def set_survey(self, schema = None):
        """
        Set the schema corresponding to the survey to which subsequent operations will apply.
        Note that this will replace the existing connection (if any) with a new one.
        If no schema is given, defaults to 'public'.
        """
        if schema is None:
            search_path = "public"
        else:
            search_path = ",".join([schema, "public"])
        if self.conn:
            self.conn.close()
        self.connect(options=f"-c search_path={search_path}")
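
    # Example (a sketch; the schema name "survey_2023" is hypothetical and must
    # already exist in the database):
    #
    #     db = Datastore()
    #     db.set_survey("survey_2023")   # open a connection scoped to that survey
    #     db.set_autocommit(False)       # optional: manage transactions manually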

    def file_in_db(self, filepath):
        """
        Check if the file is already in the database.
        Returns True if the file is present with an unchanged hash, a falsy value otherwise.
        """
        with self.conn:
            with self.conn.cursor() as cursor:
                qry = "SELECT path, hash FROM files WHERE path = %s;"
                cursor.execute(qry, (filepath,))
                results = cursor.fetchall()
                if len(results):
                    return (filepath, file_hash(filepath)) in results

    def add_file(self, path, cursor = None):
        """
        Add a file to the `files` table of a survey.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        hash = file_hash(path)
        qry = "CALL add_file(%s, %s);"
        cur.execute(qry, (path, hash))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction
        return hash

    def del_file(self, path, cursor = None):
        """
        Remove a file from a survey's `files` table.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "DELETE FROM files WHERE path = %s;"
        cur.execute(qry, (path,))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def del_hash(self, hash, cursor = None):
        """
        Remove a hash from a survey's `files` table.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "DELETE FROM files WHERE hash = %s;"
        cur.execute(qry, (hash,))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def list_files(self, cursor = None):
        """
        List all files known to a survey.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "SELECT * FROM files WHERE hash NOT LIKE '*%';"
        cur.execute(qry)
        res = cur.fetchall()
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction
        return res
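
    # Example of the file-tracking helpers above (a sketch; the path is illustrative):
    #
    #     path = "/surveys/2023/raw/0010.p190"
    #     if not db.file_in_db(path):     # missing, or changed since last seen
    #         h = db.add_file(path)       # register it and get its hash
    #     for row in db.list_files():     # rows come straight from the `files` table
    #         print(row)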

    def set_ntbp(self, path, ntbp, cursor = None):
        """
        Set or remove a sequence's NTBP flag
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        hash = file_hash(path)
        qry = """
            UPDATE raw_lines rl
            SET ntbp = %s
            FROM raw_shots rs, files f
            WHERE rs.hash = f.hash AND rs.sequence = rl.sequence AND f.hash = %s;
        """
        cur.execute(qry, (ntbp, hash))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def save_preplots(self, lines, filepath, preplot_class, epsg = 0, filedata = None):
        """
        Save preplot data.
        Arguments:
            lines (iterable): should be a collection of lines returned from
                one of the preplot-reading functions (see preplots.py).
            filepath (string): the full path to the preplot file from where the lines
                have been read. It will be added to the survey's `files` table so that
                it can be monitored for changes.
            preplot_class (string): a valid preplot class code (`V` for saillines,
                `S` for sources).
            epsg (number): the EPSG code that identifies the preplot's CRS. Defaults
                to unknown.
            filedata (dictionary): Arbitrary data associated with this file and saved in the
                `file_data` table.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")
            hash = self.add_file(filepath, cursor)
            count = 0
            for line in lines:
                count += 1
                print(f"\u001b[2KSaving line {count} / {len(lines)} ({len(line['points'])} points)", end="\r", flush=True)
                p0 = line["points"][0]
                p1 = line["points"][-1]
                incr = p0["point_number"] <= p1["point_number"]
                geom = f'LINESTRING({p0["easting"]} {p0["northing"]}, {p1["easting"]} {p1["northing"]})'
                qry = """
                    INSERT INTO preplot_lines AS pl
                        (line, incr, class, remarks, geometry, hash)
                    VALUES
                        (%s, %s, %s, '', ST_GeomFromText(%s, %s), %s)
                    ON CONFLICT (line, class) DO UPDATE
                        SET incr = EXCLUDED.incr, geometry = EXCLUDED.geometry, hash = EXCLUDED.hash
                        WHERE pl.hash <> EXCLUDED.hash;
                    -- Note that remarks are *not* overwritten
                """
                cursor.execute(qry, (line["line_name"], incr, preplot_class, geom, epsg, hash))
                points = [ (line["line_name"], p["point_number"], preplot_class, p["easting"], p["northing"], epsg) for p in line["points"] ]
                qry = """
                    INSERT INTO preplot_points
                        (line, point, class, geometry)
                    VALUES
                        (%s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                    ON CONFLICT (line, point, class) DO UPDATE
                        SET geometry = EXCLUDED.geometry;
                """
                cursor.executemany(qry, points)
            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)
            self.maybe_commit()
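
    # Example (a sketch; preplots.read() stands in for whichever preplot-reading
    # function in preplots.py matches the file format, and the EPSG code is illustrative):
    #
    #     path = "/surveys/2023/preplots/sources.pre"
    #     lines = preplots.read(path)                    # hypothetical reader
    #     db.save_preplots(lines, path, "S", epsg=23031)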

    def save_raw_p190(self, records, fileinfo, filepath, epsg = 0, filedata = None, ntbp = False):
        """
        Save raw P1 data.
        Note that despite its name, this function does not care whether the data
        comes from a P1/90 or a P1/11 or something else altogether.
        Arguments:
            records (iterable): a collection of P1 records as produced for instance by
                the from_file() function from p190.py.
            fileinfo (dictionary): information about the source file. It must have at
                least "line" and "sequence" elements.
            filepath (string): the full path to the file from where the lines have been
                read.
            epsg (number): the EPSG code identifying this dataset's CRS. Defaults to unknown.
            filedata (dictionary): Arbitrary data associated with this file and saved in the
                `file_data` table.
            ntbp (boolean): initial value of the sequence's NTBP flag. Defaults to False.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")
            hash = self.add_file(filepath, cursor)
            incr = records[0]["point_number"] <= records[-1]["point_number"]
            # Start by deleting any online data we may have for this sequence
            self.del_hash("*online*", cursor)
            qry = """
                INSERT INTO raw_lines (sequence, line, remarks, ntbp, incr)
                VALUES (%s, %s, '', %s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], ntbp, incr))
            qry = """
                INSERT INTO raw_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], hash))
            shots = [ (fileinfo["sequence"], r["point_number"], r["record_type"], r["objref"], r["tstamp"], hash, r["easting"], r["northing"], epsg) for r in records ]
            qry = """
                INSERT INTO raw_shots (sequence, point, class, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point, class) DO UPDATE
                    SET
                        objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                        hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)
            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)
            self.maybe_commit()
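
    # Example (a sketch; p190.from_file() is taken from the docstring above, and the
    # fileinfo values are illustrative):
    #
    #     import p190
    #     path = "/surveys/2023/raw/0010.p190"
    #     records = p190.from_file(path)
    #     fileinfo = {"sequence": 10, "line": "SEQ-0010"}
    #     db.save_raw_p190(records, fileinfo, path, epsg=23031)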

    def save_final_p190(self, records, fileinfo, filepath, epsg = 0, filedata = None):
        """
        Save final P1 data.
        Note that despite its name, this function does not care whether the data
        comes from a P1/90 or a P1/11 or something else altogether.
        Arguments:
            records (iterable): a collection of P1 records as produced for instance by
                the from_file() function from p190.py.
            fileinfo (dictionary): information about the source file. It must have at
                least "line" and "sequence" elements.
            filepath (string): the full path to the file from where the lines have been
                read.
            epsg (number): the EPSG code identifying this dataset's CRS. Defaults to unknown.
            filedata (dictionary): Arbitrary data associated with this file and saved in the
                `file_data` table.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")
            hash = self.add_file(filepath, cursor)
            #print(records[0])
            #print(records[-1])
            incr = records[0]["point_number"] <= records[-1]["point_number"]
            qry = """
                INSERT INTO final_lines (sequence, line, remarks)
                VALUES (%s, %s, '')
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"]))
            qry = """
                INSERT INTO final_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], hash))
            shots = [ (fileinfo["sequence"], r["point_number"], r["record_type"], r["objref"], r["tstamp"], hash, r["easting"], r["northing"], epsg) for r in records ]
            qry = """
                INSERT INTO final_shots (sequence, point, class, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point, class) DO UPDATE
                    SET
                        objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                        hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)
            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)
            self.maybe_commit()

    def save_raw_p111 (self, records, fileinfo, filepath, epsg = 0, filedata = None, ntbp = False):
        """
        Save raw P1/11 data.
        Arguments are as for save_raw_p190(), with records being P1/11 records as
        understood by the accessor functions in p111.py. The file-level metadata in
        fileinfo["meta"] is merged into the sequence's `meta` column.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")
            hash = self.add_file(filepath, cursor)
            if not records or len(records) == 0:
                print("File has no records (or none have been detected)")
                # We add the file to the database anyway to signal that we have
                # actually seen it.
                self.maybe_commit()
                return
            incr = p111.point_number(records[0]) <= p111.point_number(records[-1])
            # Start by deleting any online data we may have for this sequence
            self.del_hash("*online*", cursor)
            qry = """
                INSERT INTO raw_lines (sequence, line, remarks, ntbp, incr, meta)
                VALUES (%s, %s, '', %s, %s, %s)
                ON CONFLICT (sequence) DO UPDATE SET
                    line = EXCLUDED.line,
                    ntbp = EXCLUDED.ntbp,
                    incr = EXCLUDED.incr,
                    meta = EXCLUDED.meta;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], ntbp, incr, json.dumps(fileinfo["meta"])))
            qry = """
                UPDATE raw_lines
                SET meta = meta || %s
                WHERE sequence = %s;
            """
            cursor.execute(qry, (json.dumps(fileinfo["meta"]), fileinfo["sequence"]))
            qry = """
                INSERT INTO raw_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], hash))
            shots = [ (fileinfo["sequence"], p111.line(r), p111.point_number(r), r["Object Ref. Number"], r["tstamp"], hash, p111.easting(r), p111.northing(r), epsg) for r in records ]
            qry = """
                INSERT INTO raw_shots (sequence, line, point, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point) DO UPDATE
                    SET
                        objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                        hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)
            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)
            self.maybe_commit()
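
    # Example (a sketch; p111.from_file() is hypothetical — use whatever p111.py
    # actually provides to parse a file — and the fileinfo values are illustrative;
    # fileinfo must carry at least "line", "sequence" and "meta"):
    #
    #     path = "/surveys/2023/raw/0011.p111"
    #     records = p111.from_file(path)
    #     fileinfo = {"sequence": 11, "line": "SEQ-0011", "meta": {"vessel": "..."}}
    #     db.save_raw_p111(records, fileinfo, path, epsg=23031)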

    def save_final_p111 (self, records, fileinfo, filepath, epsg = 0, filedata = None):
        """
        Save final P1/11 data.
        Arguments are as for save_final_p190(), with records being P1/11 records as
        understood by the accessor functions in p111.py. Once the data is saved, the
        final_line_post_import() database procedure is called for the sequence.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")
            hash = self.add_file(filepath, cursor)
            qry = """
                INSERT INTO final_lines (sequence, line, remarks, meta)
                VALUES (%s, %s, '', %s)
                ON CONFLICT (sequence) DO UPDATE SET
                    line = EXCLUDED.line,
                    meta = EXCLUDED.meta;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], json.dumps(fileinfo["meta"])))
            qry = """
                UPDATE raw_lines
                SET meta = meta || %s
                WHERE sequence = %s;
            """
            cursor.execute(qry, (json.dumps(fileinfo["meta"]), fileinfo["sequence"]))
            qry = """
                INSERT INTO final_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], hash))
            shots = [ (fileinfo["sequence"], p111.line(r), p111.point_number(r), r["Object Ref. Number"], r["tstamp"], hash, p111.easting(r), p111.northing(r), epsg) for r in records ]
            qry = """
                INSERT INTO final_shots (sequence, line, point, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point) DO UPDATE
                    SET
                        objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                        hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)
            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)
            cursor.execute("CALL final_line_post_import(%s);", (fileinfo["sequence"],))
            self.maybe_commit()

    def save_raw_smsrc (self, records, fileinfo, filepath, filedata = None):
        """
        Save raw source (SMSRC) gun data into the matching raw shots' metadata.
        The corresponding P1 data must already have been imported; if it has not,
        nothing is saved and the transaction is rolled back.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")
            hash = self.add_file(filepath, cursor)
            # Start by deleting any online data we may have for this sequence
            # NOTE: Do I need to do this?
            #self.del_hash("*online*", cursor)
            # The shots should already exist, e.g., from a P1 import
            # …but what if the SMSRC file gets read *before* the P1?
            # We need to check
            qry = "SELECT count(*) FROM raw_shots WHERE sequence = %s AND hash != '*online*';"
            values = (fileinfo["sequence"],)
            cursor.execute(qry, values)
            shotcount = cursor.fetchone()[0]
            if shotcount == 0:
                # No shots yet or not all imported, so we do *not*
                # save the gun data. It will eventually get picked
                # up in the next run.
                # Let's remove the file from the file list and bail
                # out.
                print("No raw shots for sequence", fileinfo["sequence"])
                self.conn.rollback()
                return
            values = [ (json.dumps(record), fileinfo["sequence"], record["shot"]) for record in records ]
            qry = """
                UPDATE raw_shots
                SET meta = jsonb_set(meta, '{smsrc}', %s::jsonb, true) - 'qc'
                WHERE sequence = %s AND point = %s;
            """
            cursor.executemany(qry, values)
            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)
            self.maybe_commit()
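
    # Example (a sketch; only the "shot" key is required by the code above — the rest
    # of each record is whatever the SMSRC parser produces, and is stored verbatim as
    # JSON under meta->'smsrc'; the values shown are illustrative):
    #
    #     records = [{"shot": 1001, "volume": 3090}, {"shot": 1002, "volume": 3090}]
    #     db.save_raw_smsrc(records, {"sequence": 11}, "/surveys/2023/smsrc/0011.src")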

    def save_file_data(self, path, filedata, cursor = None):
        """
        Save arbitrary data associated with a file.
        Arguments:
            path (string): the full path to the file that the data is to be associated with.
            filedata (string): the data to save, already encoded as JSON (e.g. with json.dumps()).
            cursor (DB Cursor): if a cursor is passed by the caller, this function will not
                call conn.commit() even if autocommit is True.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        hash = self.add_file(path, cursor)
        qry = """
            INSERT INTO file_data (hash, data)
            VALUES (%s, %s::json)
            ON CONFLICT (hash) DO UPDATE
                SET data = EXCLUDED.data;
        """
        cur.execute(qry, (hash, filedata))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction
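
    # Example (a sketch; the data layout is arbitrary, but a top-level "labels" object
    # is what apply_survey_configuration() below picks up):
    #
    #     db.save_file_data("/surveys/2023/config.json",
    #                       json.dumps({"labels": {"block": "A"}}))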

    def apply_survey_configuration(self, cursor = None):
        """
        Apply survey configuration found in the `file_data` table: every `labels`
        entry is upserted into the `labels` table.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = """
            INSERT INTO labels (name, data)
            SELECT l.key, l.value
            FROM file_data fd,
                 jsonb_each(fd.data->'labels') l
            WHERE fd.data::jsonb ? 'labels'
            ON CONFLICT (name) DO UPDATE SET data = excluded.data;
        """
        cur.execute(qry)
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def add_info(self, key, value, cursor = None):
        """
        Add an item of information to the project
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = """
            INSERT INTO info (key, value)
            VALUES(%s, %s)
            ON CONFLICT (key) DO UPDATE
                SET value = EXCLUDED.value;
        """
        cur.execute(qry, (key, value))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def get_info(self, key, cursor = None):
        """
        Retrieve an item of information from the project.
        Returns a one-element tuple containing the value, or None if the key is not set.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "SELECT value FROM info WHERE key = %s;"
        cur.execute(qry, (key,))
        res = cur.fetchone()
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction
        return res
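
    # Example (a sketch; the key name is arbitrary):
    #
    #     db.add_info("project_name", "Dougal demo")
    #     row = db.get_info("project_name")
    #     name = row[0] if row else None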

    def del_info(self, key, cursor = None):
        """
        Remove an item of information from the project
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "DELETE FROM info WHERE key = %s;"
        cur.execute(qry, (key,))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def del_sequence_final(self, sequence, cursor = None):
        """
        Remove final data for a sequence.
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "DELETE FROM files WHERE hash = (SELECT hash FROM final_lines_files WHERE sequence = %s);"
        cur.execute(qry, (sequence,))
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def adjust_planner(self, cursor = None):
        """
        Adjust estimated times on the planner
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "CALL adjust_planner();"
        cur.execute(qry)
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def housekeep_event_log(self, cursor = None):
        """
        Call housekeeping actions on the event log
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "CALL augment_event_data();"
        cur.execute(qry)
        qry = "CALL scan_placeholders();"
        cur.execute(qry)
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction

    def run_daily_tasks(self, cursor = None):
        """
        Run once-a-day tasks
        """
        if cursor is None:
            cur = self.conn.cursor()
        else:
            cur = cursor
        qry = "CALL log_midnight_shots();"
        cur.execute(qry)
        if cursor is None:
            self.maybe_commit()
            # We do not commit if we've been passed a cursor, instead
            # we assume that we are in the middle of a transaction
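

# A minimal usage sketch, assuming a configured database (see configuration.py) and
# an existing survey schema named "survey_2023"; when and how often the maintenance
# calls below run is up to the caller (e.g. a cron job).
if __name__ == "__main__":
    db = Datastore()
    db.set_survey("survey_2023")
    for row in db.list_files():     # rows come straight from the `files` table
        print(row)
    db.adjust_planner()             # re-estimate planner timings
    db.housekeep_event_log()        # augment event data and scan placeholders
    db.run_daily_tasks()            # once-a-day tasks (e.g. logging midnight shots)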