"""
Interface to the PostgreSQL database.
"""

import os
import json
from contextlib import contextmanager
from hashlib import md5  # Because it's good enough

import psycopg2

import configuration
import preplots
import p111


def file_hash(file):
    """
    Calculate a file hash based on its name, size and stat() timestamps.

    The hash is used to uniquely identify files in the database and detect
    if they have changed. It is *not* a content hash: the file is only
    stat()'ed, never read.

    Arguments:

    file (string): path to the file as seen by this process (callers in
    this module pass the result of configuration.translate_path()).

    Returns a string "<size>:<mtime>:<ctime>:<name digest>", where the name
    digest is the first 16 hex characters of the MD5 of the path. Note that
    st_ctime is the inode change time on POSIX systems (creation time only
    on Windows).

    Raises OSError (e.g. FileNotFoundError) if the file cannot be stat()'ed.
    """
    name_digest = md5(file.encode()).hexdigest()[:16]
    st = os.stat(file)
    return ":".join(str(v) for v in (st.st_size, st.st_mtime, st.st_ctime, name_digest))


class Datastore:
    """
    The database interface.

    Usage:

        db = Datastore()
        db.set_survey(schema)   # (re)connects with the survey's search_path
        ...

    Methods that take an optional `cursor` argument run inside the caller's
    transaction when one is given (and never commit); otherwise they use a
    private cursor, close it afterwards and commit if autocommit is enabled
    (see set_autocommit()).
    """

    conn = None
    autocommit = True

    def __init__(self, **opts):
        self.settings = configuration.read()["db"]
        self.options = opts

    def connect(self, **opts):
        """
        Connect to the database.

        opts: Dictionary of options to pass to psycopg2.connect in addition
        to the connection string found in the system configuration (see
        configuration.py).

        Any existing connection is closed first. The configuration is re-read
        on every call.

        Does not normally need to be invoked directly, as it is called by the
        set_survey() method.
        """
        if self.conn is not None:
            self.conn.close()

        self.conn = psycopg2.connect(configuration.read()["db"]["connection_string"], **opts)

    def set_autocommit(self, value=True):
        """
        Enable or disable autocommit.

        By default, each one of this class' methods commits its results
        before returning. Sometimes that is not desirable, so this method
        allows the caller to change that behaviour. If autocommit is
        disabled, the caller should commit / rollback explicitly, e.g., by
        calling Datastore#conn.commit()
        """
        self.autocommit = value

    def maybe_commit(self):
        """
        Commit the current transaction, but only if autocommit is exactly True.
        """
        if self.autocommit is True:
            self.conn.commit()

    @contextmanager
    def _cursor(self, cursor=None):
        """
        Yield a cursor to work with: `cursor` itself if given, else a new one.

        A caller-supplied cursor is neither closed nor committed: we assume
        the caller is in the middle of a transaction it will finish itself.
        A cursor created here is always closed on exit and, if the body
        completed without raising, maybe_commit() is called.
        """
        if cursor is not None:
            yield cursor
            return

        with self.conn.cursor() as cur:
            yield cur
        # Only reached when the body did not raise.
        self.maybe_commit()

    def set_survey(self, schema=None):
        """
        Set the schema corresponding to the survey to which subsequent
        operations will apply.

        Note that this will replace the existing connection (if any) with a
        new one whose search_path is "<schema>,public". If no schema is
        given, the search_path is just 'public'.
        """
        if schema is None:
            search_path = "public"
        else:
            search_path = ",".join([schema, "public"])

        # connect() already closes any existing connection.
        self.connect(options=f"-c search_path={search_path}")

    def file_in_db(self, filepath):
        """
        Check if the file is already in the database, unchanged.

        Returns True only if a `files` row exists for `filepath` whose hash
        matches the file's current file_hash(); False otherwise.

        Note: uses the connection as a context manager, so the connection's
        current transaction is committed on return (rolled back on error)
        regardless of the autocommit setting.
        """
        qry = "SELECT path, hash FROM files WHERE path = %s;"
        with self.conn:
            with self.conn.cursor() as cursor:
                cursor.execute(qry, (filepath,))
                results = cursor.fetchall()
                if not results:
                    return False
                # Only stat the file once we know the database has seen it.
                current = (filepath, file_hash(configuration.translate_path(filepath)))
                return current in results

    def add_file(self, path, cursor=None):
        """
        Add a file to the `files` table of a survey.

        Arguments:

        path (string): the file path as stored in the database; it is mapped
        through configuration.translate_path() to compute the hash.

        cursor (DB cursor, optional): see the class docstring.

        Returns the file's hash as computed by file_hash().
        """
        digest = file_hash(configuration.translate_path(path))
        with self._cursor(cursor) as cur:
            cur.execute("CALL add_file(%s, %s);", (path, digest))
        return digest

    def del_file(self, path, cursor=None):
        """
        Remove a file from a survey's `files` table, by path.
        """
        with self._cursor(cursor) as cur:
            cur.execute("DELETE FROM files WHERE path = %s;", (path,))

    def del_hash(self, hash, cursor=None):
        """
        Remove a hash from a survey's `files` table.

        The parameter is named `hash` for backward compatibility with
        keyword callers.
        """
        with self._cursor(cursor) as cur:
            cur.execute("DELETE FROM files WHERE hash = %s;", (hash,))

    def list_files(self, cursor=None):
        """
        List all files known to a survey.

        Rows whose hash starts with '*' (pseudo-hashes such as '*online*',
        used elsewhere in this module) are excluded.

        Returns a list of `files` rows.
        """
        with self._cursor(cursor) as cur:
            # No query parameters are passed, so psycopg2 does not interpret
            # the literal '%' below as a placeholder.
            cur.execute("SELECT * FROM files WHERE hash NOT LIKE '*%';")
            return cur.fetchall()

    def set_ntbp(self, path, ntbp, cursor=None):
        """
        Set or remove a sequence's NTBP flag.

        The sequence(s) are found via the raw shots imported from the file
        at `path` (matched by its current hash).
        """
        digest = file_hash(configuration.translate_path(path))
        qry = """
            UPDATE raw_lines rl
            SET ntbp = %s
            FROM raw_shots rs, files f
            WHERE rs.hash = f.hash AND rs.sequence = rl.sequence AND f.hash = %s;
        """
        with self._cursor(cursor) as cur:
            cur.execute(qry, (ntbp, digest))

    def save_preplots(self, lines, filepath, preplot_class, epsg=0, filedata=None):
        """
        Save preplot data.

        Arguments:

        lines (iterable): should be a collection of lines returned from
        one of the preplot-reading functions (see preplots.py). It must
        support len().

        filepath (string): the full path to the preplot file from where
        the lines have been read. It will be added to the survey's
        `file` table so that it can be monitored for changes.

        preplot_class (string): a valid preplot class code (`V` for
        saillines, `S` for sources).

        epsg (number): the EPSG code that identifies the preplot's CRS.
        Defaults to unknown.

        filedata (object, optional): arbitrary data to store in the
        `file_data` table for this file.
        """
        line_qry = """
            INSERT INTO preplot_lines AS pl
            (line, incr, class, remarks, geometry, hash)
            VALUES (%s, %s, %s, '', ST_GeomFromText(%s, %s), %s)
            ON CONFLICT (line, class) DO UPDATE
            SET incr = EXCLUDED.incr, geometry = EXCLUDED.geometry, hash = EXCLUDED.hash
            WHERE pl.hash <> EXCLUDED.hash;
            -- Note that remarks are *not* overwritten
        """
        point_qry = """
            INSERT INTO preplot_points
            (line, point, class, geometry)
            VALUES (%s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
            ON CONFLICT (line, point, class) DO UPDATE
            SET geometry = EXCLUDED.geometry;
        """

        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")

            digest = self.add_file(filepath, cursor)
            total = len(lines)
            for count, line in enumerate(lines, start=1):
                print(f"\u001b[2KSaving line {count} / {total} ({len(line['points'])} points)", end="\r", flush=True)

                p0 = line["points"][0]
                p1 = line["points"][-1]
                # The line is "incrementing" if point numbers grow from first to last.
                incr = p0["point_number"] <= p1["point_number"]
                geom = f'LINESTRING({p0["easting"]} {p0["northing"]}, {p1["easting"]} {p1["northing"]})'

                cursor.execute(line_qry, (line["line_name"], incr, preplot_class, geom, epsg, digest))

                points = [
                    (line["line_name"], p["point_number"], preplot_class, p["easting"], p["northing"], epsg)
                    for p in line["points"]
                ]
                cursor.executemany(point_qry, points)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_preplot_line_info(self, lines, filepath, filedata=None):
        """
        Save preplot line information.

        Arguments:

        lines (iterable): should be a collection of lines returned from
        one of the line info reading functions (see preplots.py). It must
        support len().

        filepath (string): the full path to the preplot file from where
        the lines have been read. It will be added to the survey's
        `file` table so that it can be monitored for changes.

        filedata (object, optional): arbitrary data to store in the
        `file_data` table for this file.

        Lines whose sail line is not already present in `preplot_lines`
        (class 'V') are skipped.
        """
        insert_qry = """
            INSERT INTO preplot_saillines AS ps
            (sailline, line, sailline_class, line_class, incr, ntba, remarks, meta, hash)
            VALUES (%s, %s, 'V', 'S', %s, %s, %s, %s, %s)
            ON CONFLICT (sailline, sailline_class, line, line_class, incr) DO UPDATE
            SET
                incr = EXCLUDED.incr,
                ntba = EXCLUDED.ntba,
                remarks = COALESCE(EXCLUDED.remarks, ps.remarks),
                meta = ps.meta || EXCLUDED.meta,
                hash = EXCLUDED.hash;
        """

        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")

            # Check which preplot lines we actually have already imported,
            # as the line info file may contain extra lines.
            qry = """
                SELECT line, class
                FROM preplot_lines
                ORDER BY line, class;
            """
            cursor.execute(qry)
            # A set gives O(1) membership tests in the loop below.
            preplot_lines = set(cursor.fetchall())

            digest = self.add_file(filepath, cursor)
            total = len(lines)
            for count, line in enumerate(lines, start=1):
                if (line["sail_line"], "V") not in preplot_lines:
                    print(f"\u001b[2KSkipping line {count} / {total}", end="\n", flush=True)
                    continue

                print(f"\u001b[2KSaving line {count} / {total} ", end="\n", flush=True)

                sail_line = line["sail_line"]
                incr = line.get("incr", True)
                ntba = line.get("ntba", False)
                remarks = line.get("remarks", None)
                meta = json.dumps(line.get("meta", {}))
                source_lines = line.get("source_line", [])

                for source_line in source_lines:
                    # NOTE Consider using cursor.executemany() instead. Then again,
                    # we're only expecting a few hundred lines at most.
                    cursor.execute(insert_qry, (sail_line, source_line, incr, ntba, remarks, meta, digest))

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_raw_p190(self, records, fileinfo, filepath, epsg=0, filedata=None, ntbp=False):
        """
        Save raw P1 data.

        Note that despite its name, this function does not care whether the
        data comes from a P1/90 or a P1/11 or something else altogether.

        Arguments:

        records (sequence): a collection of P1 records as produced for
        instance by the from_file() function from p190.py. Must be
        indexable and non-empty.

        fileinfo (dictionary): information about the source file. It must
        have at least "line" and "sequence" elements.

        filepath (string): the full path to the file from where the lines
        have been read.

        epsg (number): the EPSG code identifying this dataset's CRS.
        Defaults to unknown.

        filedata (dictionary): Arbitrary data associated with this file and
        saved in the `file_data` table.

        ntbp (boolean): NTBP flag for a newly created raw line.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")

            digest = self.add_file(filepath, cursor)
            incr = records[0]["point_number"] <= records[-1]["point_number"]

            # Start by deleting any online data we may have for this sequence
            self.del_hash("*online*", cursor)

            qry = """
                INSERT INTO raw_lines (sequence, line, remarks, ntbp, incr)
                VALUES (%s, %s, '', %s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], ntbp, incr))

            qry = """
                INSERT INTO raw_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], digest))

            shots = [
                (fileinfo["sequence"], r["point_number"], r["record_type"], r["objref"], r["tstamp"],
                 digest, r["easting"], r["northing"], epsg)
                for r in records
            ]

            qry = """
                INSERT INTO raw_shots (sequence, point, class, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point, class) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_final_p190(self, records, fileinfo, filepath, epsg=0, filedata=None):
        """
        Save final P1 data.

        Note that despite its name, this function does not care whether the
        data comes from a P1/90 or a P1/11 or something else altogether.

        Arguments:

        records (sequence): a collection of P1 records as produced for
        instance by the from_file() function from p190.py. Must be
        indexable and non-empty.

        fileinfo (dictionary): information about the source file. It must
        have at least "line" and "sequence" elements.

        filepath (string): the full path to the file from where the lines
        have been read.

        epsg (number): the EPSG code identifying this dataset's CRS.
        Defaults to unknown.

        filedata (dictionary): Arbitrary data associated with this file and
        saved in the `file_data` table.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")

            digest = self.add_file(filepath, cursor)
            # Computed for parity with save_raw_p190; the final_lines insert
            # below does not store it. NOTE(review): confirm whether it should.
            # Indexing also raises IndexError early on an empty `records`.
            incr = records[0]["point_number"] <= records[-1]["point_number"]

            qry = """
                INSERT INTO final_lines (sequence, line, remarks)
                VALUES (%s, %s, '')
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"]))

            qry = """
                INSERT INTO final_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], digest))

            shots = [
                (fileinfo["sequence"], r["point_number"], r["record_type"], r["objref"], r["tstamp"],
                 digest, r["easting"], r["northing"], epsg)
                for r in records
            ]

            qry = """
                INSERT INTO final_shots (sequence, point, class, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point, class) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_raw_p111(self, records, fileinfo, filepath, epsg=0, filedata=None, ntbp=False):
        """
        Save raw P1/11 data.

        Arguments are as for save_raw_p190(), except that `records` are
        P1/11 records read through the p111 module's accessors, and
        `fileinfo` must also have a "meta" element (JSON-serialisable),
        which is stored in raw_lines.meta.

        If `records` is empty, the file is still registered (so that it is
        known to have been seen) and nothing else is stored.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")

            digest = self.add_file(filepath, cursor)

            if not records:
                print("File has no records (or none have been detected)")
                # We add the file to the database anyway to signal that we have
                # actually seen it.
                self.maybe_commit()
                return

            incr = p111.point_number(records[0]) <= p111.point_number(records[-1])

            # Start by deleting any online data we may have for this sequence
            self.del_hash("*online*", cursor)

            meta = json.dumps(fileinfo["meta"])

            qry = """
                INSERT INTO raw_lines (sequence, line, remarks, ntbp, incr, meta)
                VALUES (%s, %s, '', %s, %s, %s)
                ON CONFLICT (sequence) DO UPDATE SET
                    line = EXCLUDED.line,
                    ntbp = EXCLUDED.ntbp,
                    incr = EXCLUDED.incr,
                    meta = EXCLUDED.meta;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], ntbp, incr, meta))

            # NOTE(review): the upsert above already sets meta to this value,
            # so this merge looks redundant; kept as-is pending confirmation.
            qry = """
                UPDATE raw_lines
                SET meta = meta || %s
                WHERE sequence = %s;
            """
            cursor.execute(qry, (meta, fileinfo["sequence"]))

            qry = """
                INSERT INTO raw_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], digest))

            shots = [
                (fileinfo["sequence"], p111.line(r), p111.point_number(r), r["Object Ref. Number"],
                 r["tstamp"], digest, p111.easting(r), p111.northing(r), epsg)
                for r in records
            ]

            qry = """
                INSERT INTO raw_shots (sequence, line, point, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_final_p111(self, records, fileinfo, filepath, epsg=0, filedata=None):
        """
        Save final P1/11 data.

        Arguments are as for save_final_p190(), except that `records` are
        P1/11 records read through the p111 module's accessors, and
        `fileinfo` must also have a "meta" element (JSON-serialisable).

        The metadata is stored in final_lines.meta and also merged into the
        matching raw_lines row. After the shots are saved, the database
        procedure final_line_post_import() is called for the sequence.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")

            digest = self.add_file(filepath, cursor)
            meta = json.dumps(fileinfo["meta"])

            qry = """
                INSERT INTO final_lines (sequence, line, remarks, meta)
                VALUES (%s, %s, '', %s)
                ON CONFLICT (sequence) DO UPDATE SET
                    line = EXCLUDED.line,
                    meta = EXCLUDED.meta;
            """
            cursor.execute(qry, (fileinfo["sequence"], fileinfo["line"], meta))

            qry = """
                UPDATE raw_lines
                SET meta = meta || %s
                WHERE sequence = %s;
            """
            cursor.execute(qry, (meta, fileinfo["sequence"]))

            qry = """
                INSERT INTO final_lines_files (sequence, hash)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING;
            """
            cursor.execute(qry, (fileinfo["sequence"], digest))

            shots = [
                (fileinfo["sequence"], p111.line(r), p111.point_number(r), r["Object Ref. Number"],
                 r["tstamp"], digest, p111.easting(r), p111.northing(r), epsg)
                for r in records
            ]

            qry = """
                INSERT INTO final_shots (sequence, line, point, objref, tstamp, hash, geometry)
                VALUES (%s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), %s))
                ON CONFLICT (sequence, point) DO UPDATE
                SET
                    objref = EXCLUDED.objref, tstamp = EXCLUDED.tstamp,
                    hash = EXCLUDED.hash, geometry = EXCLUDED.geometry;
            """
            cursor.executemany(qry, shots)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            cursor.execute("CALL final_line_post_import(%s);", (fileinfo["sequence"],))

            self.maybe_commit()

    def save_raw_smsrc(self, records, fileinfo, filepath, filedata=None):
        """
        Save SMSRC (source) data into existing raw shots.

        Each record is stored as JSON under the "smsrc" key of the meta of
        the raw shot with matching sequence and point (`record["shot"]`); any
        "qc" key is removed from that meta at the same time.

        If the sequence has no raw shots yet (other than '*online*' ones),
        the whole connection transaction is rolled back, which also undoes
        the file registration, so the file will be picked up again on the
        next run.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("BEGIN;")

            digest = self.add_file(filepath, cursor)

            # Start by deleting any online data we may have for this sequence
            # NOTE: Do I need to do this?
            #self.del_hash("*online*", cursor)

            # The shots should already exist, e.g., from a P1 import
            # …but what about if the SMSRC file gets read *before* the P1?
            # We need to check
            qry = "SELECT count(*) FROM raw_shots WHERE sequence = %s AND hash != '*online*';"
            cursor.execute(qry, (fileinfo["sequence"],))
            shotcount = cursor.fetchone()[0]

            if shotcount == 0:
                # No shots yet or not all imported, so we do *not* save the
                # gun data. It will eventually get picked up in the next run.
                # Rolling back also removes the file from the file list.
                print("No raw shots for sequence", fileinfo["sequence"])
                self.conn.rollback()
                return

            values = [
                (json.dumps(record), fileinfo["sequence"], record["shot"])
                for record in records
            ]

            qry = """
                UPDATE raw_shots
                SET meta = jsonb_set(meta, '{smsrc}', %s::jsonb, true) - 'qc'
                WHERE sequence = %s AND point = %s;
            """
            cursor.executemany(qry, values)

            if filedata is not None:
                self.save_file_data(filepath, json.dumps(filedata), cursor)

            self.maybe_commit()

    def save_file_data(self, path, filedata, cursor=None):
        """
        Save arbitrary data associated with a file.

        The file is (re-)registered in `files` via add_file() using the same
        cursor, so the registration and the data land in one transaction.

        Arguments:

        path (string): the full path to the file that the data is to be
        associated with.

        filedata (str or object): either a JSON-encoded string, stored as
        is, or any other JSON-serialisable object, which is encoded with
        json.dumps() first.

        cursor (DB Cursor): if a cursor is passed by the caller, this
        function will not call conn.commit() even if autocommit is True.
        """
        if not isinstance(filedata, str):
            filedata = json.dumps(filedata)

        qry = """
            INSERT INTO file_data (hash, data)
            VALUES (%s, %s::json)
            ON CONFLICT (hash) DO UPDATE
            SET data = EXCLUDED.data;
        """
        with self._cursor(cursor) as cur:
            digest = self.add_file(path, cur)
            cur.execute(qry, (digest, filedata))

    def get_file_data(self, path, cursor=None):
        """
        Retrieve arbitrary data associated with a file.

        The file is looked up by its current file_hash(), so data saved for
        an older version of the file is not returned.

        Returns the stored `data` value, or None if there is none.
        """
        digest = file_hash(configuration.translate_path(path))
        qry = """
            SELECT data
            FROM file_data
            WHERE hash = %s;
        """
        with self._cursor(cursor) as cur:
            cur.execute(qry, (digest,))
            row = cur.fetchone()
        return None if row is None else row[0]

    def surveys(self, include_archived=False):
        """
        Return list of survey definitions.

        Each definition is the project's `meta` object with a "schema" key
        added. Projects with an empty/NULL meta are omitted. Unless
        `include_archived` is true, projects whose meta has "archived" set
        to true are excluded.

        Connects (without a search_path) if not yet connected.
        """
        if self.conn is None:
            self.connect()

        if include_archived:
            qry = """
                SELECT meta, schema
                FROM public.projects;
            """
        else:
            qry = """
                SELECT meta, schema
                FROM public.projects
                WHERE NOT (meta->'archived')::boolean IS true
            """

        with self.conn:
            with self.conn.cursor() as cursor:
                cursor.execute(qry)
                results = cursor.fetchall()

        surveys = []
        for meta, schema in results:
            if meta:
                meta["schema"] = schema
                surveys.append(meta)
        return surveys

    # TODO Does this need tweaking on account of #246?
    def apply_survey_configuration(self, cursor=None):
        """
        Upsert every entry of any file_data "labels" object into `labels`.
        """
        qry = """
            INSERT INTO labels (name, data)
            SELECT l.key, l.value
            FROM file_data fd,
                jsonb_each(fd.data->'labels') l
            WHERE fd.data::jsonb ? 'labels'
            ON CONFLICT (name) DO UPDATE SET data = excluded.data;
        """
        with self._cursor(cursor) as cur:
            # No parameters: psycopg2 must not see '?' or '%' as placeholders.
            cur.execute(qry)

    def add_info(self, key, value, cursor=None):
        """
        Add (or replace) an item of information on the project.
        """
        qry = """
            INSERT INTO info (key, value)
            VALUES(%s, %s)
            ON CONFLICT (key) DO UPDATE
            SET value = EXCLUDED.value;
        """
        with self._cursor(cursor) as cur:
            cur.execute(qry, (key, value))

    def get_info(self, key, cursor=None):
        """
        Retrieve an item of information from the project.

        Returns the row as a one-element tuple `(value,)`, or None if the
        key does not exist.
        """
        with self._cursor(cursor) as cur:
            cur.execute("SELECT value FROM info WHERE key = %s;", (key,))
            return cur.fetchone()

    def del_info(self, key, cursor=None):
        """
        Remove an item of information from the project.
        """
        with self._cursor(cursor) as cur:
            cur.execute("DELETE FROM info WHERE key = %s;", (key,))

    def del_sequence_final(self, sequence, cursor=None):
        """
        Remove final data for a sequence.

        Deletes every `files` row registered for the sequence in
        `final_lines_files`. Dependent rows are presumably removed by
        cascading constraints — TODO confirm against the schema.
        """
        # IN rather than "=": a sequence may have several final files, and a
        # scalar subquery returning more than one row is an error.
        qry = "DELETE FROM files WHERE hash IN (SELECT hash FROM final_lines_files WHERE sequence = %s);"
        with self._cursor(cursor) as cur:
            cur.execute(qry, (sequence,))

    def adjust_planner(self, cursor=None):
        """
        Adjust estimated times on the planner (CALL adjust_planner()).
        """
        with self._cursor(cursor) as cur:
            cur.execute("CALL adjust_planner();")

    def housekeep_event_log(self, cursor=None):
        """
        Call housekeeping actions on the event log:
        augment_event_data() followed by scan_placeholders().
        """
        with self._cursor(cursor) as cur:
            cur.execute("CALL augment_event_data();")
            cur.execute("CALL scan_placeholders();")

    def run_daily_tasks(self, cursor=None):
        """
        Run once-a-day tasks (currently CALL log_midnight_shots()).
        """
        with self._cursor(cursor) as cur:
            cur.execute("CALL log_midnight_shots();")