Compare commits

..

48 Commits

Author SHA1 Message Date
D. Berge
ac9353c101 Add database upgrade file 31. 2023-10-17 12:27:06 +02:00
D. Berge
6b5070e634 Add event changes API endpoint description 2023-10-17 12:15:41 +02:00
D. Berge
09ff96ceee Add events change API endpoint 2023-10-17 11:15:36 +02:00
D. Berge
f231acf109 Add events change middleware 2023-10-17 11:15:06 +02:00
D. Berge
e576e1662c Add library function returning event changes after given epoch 2023-10-17 11:13:58 +02:00
D. Berge
73335f9c1e Merge branch '136-add-line-change-time-log-pseudoevent' into 'devel'
Resolve "Add line change time log pseudoevent"

Closes #136

See merge request wgp/dougal/software!45
2023-10-04 12:50:49 +00:00
D. Berge
7b6b81dbc5 Add more debugging statements 2023-10-04 14:50:12 +02:00
D. Berge
2e11c574c2 Throw rather than return.
Otherwise the finally {} block won't run.
2023-10-04 14:49:35 +02:00
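A minimal sketch of the semantics behind this message (names hypothetical): a finally block only runs if control actually enters its try, so a throw from inside the try still triggers the cleanup, whereas an early return placed before the try would skip it and stall the queue.

async function processOne (cur) {
  try {
    if (!cur.projectId) {
      throw {message: "No projectID found in event", cur}; // finally below still runs
    }
    await handle(cur); // hypothetical worker
  } finally {
    cur.isPending = false; // unblocks the next queue element
  }
}
// By contrast, `if (!cur.projectId) return;` placed before the try
// would leave isPending set and the queue stuck.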
D. Berge
d07565807c Do not retry immediately 2023-10-04 14:49:09 +02:00
D. Berge
6eccbf215a There should be no need to await.
That is because the queue handler will, in theory, only ever
process one event at a time.
2023-09-30 21:29:15 +02:00
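A sketch of the fire-and-forget pattern this message refers to (shape taken from the handlers further down): run() only enqueues and kicks the processor, which serialises work itself via the isPending flag, so there is nothing useful to await.

async run (data) {
  this.queue.push({ ...data, isPending: this.queue.length }); // falsy only for the first element
  this.processQueue(); // intentionally not awaited
}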
D. Berge
8abc05f04e Remove dead code 2023-09-30 21:29:15 +02:00
D. Berge
8f587467f9 Add comment 2023-09-30 21:29:15 +02:00
D. Berge
3d7a91c7ff Rewrite ReportLineChangeTime 2023-09-30 21:29:15 +02:00
D. Berge
3fd408074c Support passing array in opts.sequences to event.list() 2023-09-30 21:29:15 +02:00
D. Berge
f71cbd8f51 Add unique utility function 2023-09-30 21:29:15 +02:00
D. Berge
915df8ac16 Add handler for creation of line change time events 2023-09-30 21:29:15 +02:00
D. Berge
d5ecb08a2d Allow switching to event entry by time.
A ‘Timed’ button is shown when a new (not edited) event is in
the event entry dialogue and the event has sequence and/or
point values. Pressing the button deletes the sequence/point
information and sets the date and time fields to current time.

Fixes #277.
2023-09-30 21:26:32 +02:00
D. Berge
9388cd4861 Make daily_tasks work with new project configuration 2023-09-30 20:36:46 +02:00
D. Berge
180590b411 Mark events as being automatically generated 2023-09-30 01:42:27 +02:00
D. Berge
4ec37539bf Add utils to work with Postgres ranges 2023-09-30 01:41:45 +02:00
D. Berge
8755fe01b6 Refactor events.list.
The SQL has been simplified and the following changes made:

- The `sequence` argument now can only take one individual
  sequence, not a list of sequences.
- A new `sequences` argument is recognised. It takes a list
  of sequences (as a string).
- A new `label` argument is recognised. It takes a label
  name and returns events containing that label.
- A new `jpq` argument is recognised. It takes a JSONPath
  string which is applied to `meta` with jsonb_path_exists(),
  returning any events for which the JSON path expression
  matches.
2023-09-30 01:37:22 +02:00
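A hedged usage sketch of the arguments described above (projectId and the values are placeholders; call from an async context):

const { event } = require('./lib/db');

// One individual sequence:
await event.list(projectId, { sequence: 42 });

// A list of sequences, as a delimited string:
await event.list(projectId, { sequences: "40;41;42" });

// All events carrying a given label:
await event.list(projectId, { label: "FGSP" });

// Events whose `meta` matches a JSONPath expression,
// applied with jsonb_path_exists():
await event.list(projectId, { jpq: '$.auto ? (@ == true)' });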
D. Berge
0bfe54e0c2 Include the meta attribute when posting events 2023-09-30 01:36:18 +02:00
D. Berge
29bc689b84 Merge branch '276-add-soft-start-event-detection' into 'devel'
Resolve "Add soft start event detection"

Closes #276

See merge request wgp/dougal/software!44
2023-09-29 15:02:57 +00:00
D. Berge
65682febc7 Add soft start and full volume events detection 2023-09-29 17:02:03 +02:00
D. Berge
d408665d62 Write meta info to automatic events 2023-09-29 16:49:27 +02:00
D. Berge
64fceb0a01 Merge branch '127-sol-eol-events-not-being-inserted-in-the-log-automatically' into 'devel'
Resolve "SOL / EOL events not being inserted in the log automatically"

Closes #127

See merge request wgp/dougal/software!43
2023-09-29 14:17:46 +00:00
D. Berge
ab58e578c9 Use DEBUG library throughout 2023-09-29 16:16:33 +02:00
D. Berge
0e58b8fa5b Refactor code to identify candidate schemas.
As part of the refactoring, we took into account a slight payload
format change (project configuration details are under the `data`
attribute).
2023-09-29 16:13:35 +02:00
D. Berge
99ac082f00 Use common naming convention both online and offline 2023-09-29 16:11:44 +02:00
D. Berge
4d3fddc051 Merge branch '274-use-new-db-event-notifier-for-event-processing-handlers' into 'devel'
Resolve "Use new DB event notifier for event processing handlers"

Closes #275, #230, and #274

See merge request wgp/dougal/software!42
2023-09-29 14:03:00 +00:00
D. Berge
42456439a9 Remove ad-hoc notifier 2023-09-29 15:59:12 +02:00
D. Berge
ee0c0e7308 Replace ad-hoc notifier with pg-listen based version 2023-09-29 15:59:12 +02:00
D. Berge
998c272bf8 Add var/* to .gitignore 2023-09-29 15:59:12 +02:00
D. Berge
daddd1f0e8 Add script to rewrite packet captures IP and MAC addresses.
Closes #230.
2023-09-29 15:58:59 +02:00
D. Berge
17f20535cb Cope with fragmented UDP packets.
Fixes #275.

Use this as the systemd unit file to run as a service:

[Unit]
Description=Dougal Network Packet Capture
After=network.target remote-fs.target nss-lookup.target

[Service]
ExecStart=/srv/dougal/software/sbin/packet-capture.sh
ExecStop=/bin/kill -s QUIT $MAINPID
Restart=always
User=root
Group=users
Environment=PATH=/usr/bin:/usr/sbin:/usr/local/bin
Environment=INS_HOST=172.31.10.254
WorkingDirectory=/srv/dougal/software/var/
SyslogIdentifier=dougal.pcap

[Install]
WantedBy=multi-user.target
2023-09-29 15:28:11 +02:00
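A possible install sequence for the unit above (the unit file name is an assumption):

sudo cp dougal-pcap.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now dougal-pcap.service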
D. Berge
0829ea3ea1 Save a copy of the headers, not the original.
Otherwise ExpressJS will complain about trying to modify
headers that have already been sent.
2023-09-24 12:17:16 +02:00
D. Berge
2069d9c3d7 Remove dead code 2023-09-24 12:15:06 +02:00
D. Berge
8a2d526c50 Ignore schema attribute in PATCH payload.
Fixes #273.
2023-09-24 12:14:20 +02:00
D. Berge
8ad96d6f73 Ensure that requiredFields is always defined.
Otherwise, `Object.entries(requiredFields)` may fail.
2023-09-24 11:59:26 +02:00
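The failure mode being guarded against, for reference:

Object.entries(undefined); // TypeError: Cannot convert undefined or null to object
Object.entries({});        // [] — the safe default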
D. Berge
947faf8c05 Provide default glob specification for map layer imports 2023-09-24 11:34:10 +02:00
D. Berge
a948556455 Fail gracefully if map layer data does not exist.
Fixes #272.
2023-09-24 11:33:32 +02:00
D. Berge
835384b730 Apply path conversion to QC definition files 2023-09-23 22:50:09 +02:00
D. Berge
c5b93794f4 Move path conversion to general utilities 2023-09-23 13:44:53 +02:00
D. Berge
056cd32f0e Merge branch '271-qc-results-not-being-refreshed' into 'devel'
Resolve "QC results not being refreshed"

Closes #271

See merge request wgp/dougal/software!41
2023-09-18 10:08:35 +00:00
D. Berge
49bb413110 Merge branch '270-real-time-interface-stopped-working' into 'devel'
Resolve "Real-time interface stopped working"

Closes #270

See merge request wgp/dougal/software!40
2023-09-18 10:08:27 +00:00
D. Berge
ceccc42050 Don't cache response ETags for QC endpoints 2023-09-18 12:06:38 +02:00
D. Berge
aa3379e1c6 Adapt RTI save function to refactored project configuration in DB 2023-09-18 11:58:55 +02:00
D. Berge
4063af0e25 Merge branch '268-inline-crossline-errors-no-longer-being-calculated' into 'devel'
Resolve "Inline/crossline errors no longer being calculated"

Closes #268

See merge request wgp/dougal/software!39
2023-09-15 18:03:51 +00:00
34 changed files with 1012 additions and 183 deletions

.gitignore vendored
View File

@@ -12,3 +12,4 @@ etc/surveys/*.yaml
!etc/surveys/_*.yaml
etc/ssl/*
etc/config.yaml
var/*

View File

@@ -11,11 +11,9 @@ from datastore import Datastore
if __name__ == '__main__':
print("Reading configuration")
surveys = configuration.surveys()
print("Connecting to database")
db = Datastore()
surveys = db.surveys()
print("Reading surveys")
for survey in surveys:

View File

@@ -115,7 +115,10 @@ if __name__ == '__main__':
process(layer_name, layer, realprefix)
else:
elif os.path.isdir(realprefix):
if not "globs" in layer:
layer["globs"] = [ "**/*.geojson" ]
for globspec in layer["globs"]:
for physical_filepath in pathlib.Path(realprefix).glob(globspec):

View File

@@ -0,0 +1,104 @@
-- Add event_log_changes function
--
-- New schema version: 0.4.4
--
-- ATTENTION:
--
-- ENSURE YOU HAVE BACKED UP THE DATABASE BEFORE RUNNING THIS SCRIPT.
--
--
-- NOTE: This upgrade affects all schemas in the database.
-- NOTE: Each application starts a transaction, which must be committed
-- or rolled back.
--
-- This adds a function event_log_changes which returns the subset of
-- events from event_log_full which have been modified after a
-- given timestamp.
--
-- To apply, run as the dougal user:
--
-- psql <<EOF
-- \i $THIS_FILE
-- COMMIT;
-- EOF
--
-- NOTE: It can be applied multiple times without ill effect.
--
BEGIN;
CREATE OR REPLACE PROCEDURE pg_temp.show_notice (notice text) AS $$
BEGIN
RAISE NOTICE '%', notice;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE pg_temp.upgrade_survey_schema (schema_name text) AS $outer$
BEGIN
RAISE NOTICE 'Updating schema %', schema_name;
-- We need to set the search path because some of the trigger
-- functions reference other tables in survey schemas assuming
-- they are in the search path.
EXECUTE format('SET search_path TO %I,public', schema_name);
CREATE OR REPLACE FUNCTION event_log_changes(ts0 timestamptz)
RETURNS SETOF event_log_full
LANGUAGE sql
AS $$
SELECT *
FROM event_log_full
WHERE lower(validity) > ts0 OR upper(validity) IS NOT NULL AND upper(validity) > ts0
ORDER BY lower(validity);
$$;
END;
$outer$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE pg_temp.upgrade () AS $outer$
DECLARE
row RECORD;
current_db_version TEXT;
BEGIN
SELECT value->>'db_schema' INTO current_db_version FROM public.info WHERE key = 'version';
IF current_db_version >= '0.4.4' THEN
RAISE EXCEPTION
USING MESSAGE='Patch already applied';
END IF;
IF current_db_version != '0.4.3' THEN
RAISE EXCEPTION
USING MESSAGE='Invalid database version: ' || current_db_version,
HINT='Ensure all previous patches have been applied.';
END IF;
FOR row IN
SELECT schema_name FROM information_schema.schemata
WHERE schema_name LIKE 'survey_%'
ORDER BY schema_name
LOOP
CALL pg_temp.upgrade_survey_schema(row.schema_name);
END LOOP;
END;
$outer$ LANGUAGE plpgsql;
CALL pg_temp.upgrade();
CALL pg_temp.show_notice('Cleaning up');
DROP PROCEDURE pg_temp.upgrade_survey_schema (schema_name text);
DROP PROCEDURE pg_temp.upgrade ();
CALL pg_temp.show_notice('Updating db_schema version');
INSERT INTO public.info VALUES ('version', '{"db_schema": "0.4.4"}')
ON CONFLICT (key) DO UPDATE
SET value = public.info.value || '{"db_schema": "0.4.4"}' WHERE public.info.key = 'version';
CALL pg_temp.show_notice('All done. You may now run "COMMIT;" to persist the changes');
DROP PROCEDURE pg_temp.show_notice (notice text);
--
--NOTE Run `COMMIT;` now if all went well
--
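A quick sanity check after applying the upgrade (schema name hypothetical):

SET search_path TO survey_example,public;
SELECT id, lower(validity) AS modified_on
  FROM event_log_changes('2023-10-01T00:00:00+00');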

View File

@@ -44,7 +44,7 @@
<template v-slot:activator="{ on, attrs }">
<v-text-field
v-model="tsDate"
:disabled="!!(sequence || point || entrySequence || entryPoint)"
:disabled="!!(entrySequence || entryPoint)"
label="Date"
suffix="UTC"
prepend-icon="mdi-calendar"
@@ -64,7 +64,7 @@
<v-col>
<v-text-field
v-model="tsTime"
:disabled="!!(sequence || point || entrySequence || entryPoint)"
:disabled="!!(entrySequence || entryPoint)"
label="Time"
suffix="UTC"
prepend-icon="mdi-clock-outline"
@@ -256,6 +256,15 @@
>
Cancel
</v-btn>
<v-btn v-if="!id && (entrySequence || entryPoint)"
color="info"
text
title="Enter an event by time"
@click="timed"
>
<v-icon left small>mdi-clock-outline</v-icon>
Timed
</v-btn>
<v-spacer></v-spacer>
<v-btn
:disabled="!canSave"
@@ -632,6 +641,14 @@ export default {
}
},
timed () {
const tstamp = (new Date()).toISOString();
this.entrySequence = null;
this.entryPoint = null;
this.tsDate = tstamp.substr(0, 10);
this.tsTime = tstamp.substr(11, 8);
},
close () {
this.entryLabels = this.selectedLabels.map(this.labelToItem)
this.$emit("input", false);

View File

@@ -181,6 +181,9 @@ app.map({
post: [ mw.auth.access.write, mw.event.post ],
put: [ mw.auth.access.write, mw.event.put ],
delete: [ mw.auth.access.write, mw.event.delete ],
'changes/:since': {
get: [ mw.event.changes ]
},
// TODO Rename -/:sequence → sequence/:sequence
'-/:sequence/': { // NOTE: We need to avoid conflict with the next endpoint ☹
get: [ mw.event.sequence.get ],
@@ -200,25 +203,25 @@ app.map({
'/project/:project/qc': {
'/results': {
// Get all QC results for :project
get: [ mw.qc.results.get ],
get: [ mw.etag.noSave, mw.qc.results.get ],
// Delete all QC results for :project
delete: [ mw.auth.access.write, mw.qc.results.delete ],
delete: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.delete ],
'/accept': {
post: [ mw.auth.access.write, mw.qc.results.accept ]
post: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.accept ]
},
'/unaccept': {
post: [ mw.auth.access.write, mw.qc.results.unaccept ]
post: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.unaccept ]
},
'/sequence/:sequence': {
// Get QC results for :project, :sequence
get: [ mw.qc.results.get ],
get: [ mw.etag.noSave, mw.qc.results.get ],
// Delete QC results for :project, :sequence
delete: [ mw.auth.access.write, mw.qc.results.delete ]
delete: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.delete ]
}
}
},

View File

@@ -33,7 +33,7 @@ function saveResponse (res) {
const cache = getCache(res);
const req = res.req;
console.log(`Saving ETag: ${req.method} ${req.url}${etag}`);
const headers = res.getHeaders();
const headers = structuredClone(res.getHeaders());
delete headers["set-cookie"];
cache[req.url] = {etag, headers};
}

View File

@@ -0,0 +1,14 @@
const { event } = require('../../../lib/db');
const json = async function (req, res, next) {
try {
const response = await event.changes(req.params.project, req.params.since, req.query);
res.status(200).send(response);
next();
} catch (err) {
next(err);
}
};
module.exports = json;

View File

@@ -6,5 +6,6 @@ module.exports = {
post: require('./post'),
put: require('./put'),
patch: require('./patch'),
delete: require('./delete')
delete: require('./delete'),
changes: require('./changes')
}

View File

@@ -69,7 +69,7 @@ class DetectFDSP {
point: prev._point,
remarks: "Last shotpoint of the day",
labels: ["LDSP", "Prod"],
meta: {auto: true, insertedBy: this.constructor.name}
meta: {auto: true, author: `*${this.constructor.name}*`}
};
const fdsp = {
@@ -77,7 +77,7 @@ class DetectFDSP {
point: cur._point,
remarks: "First shotpoint of the day",
labels: ["FDSP", "Prod"],
meta: {auto: true, insertedBy: this.constructor.name}
meta: {auto: true, author: `*${this.constructor.name}*`}
};
INFO("LDSP", ldsp);

View File

@@ -0,0 +1,128 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSoftStart {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
try {
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
// TODO:
// Consider whether to remember if soft start / full volume events
// have already been emitted and wait until there is an online/offline
// transition before re-emitting.
// This may or may not be a good idea.
// Look for a soft start or full volume event
if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
INFO("Soft start detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Soft start",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
} else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) {
INFO("Full volume detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Full volume",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("DetectSoftStart Error")
ERROR(err);
} finally {
cur.isPending = false;
}
}
}
async run (data) {
if (!data || data.channel !== "realtime") {
return;
}
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) {
this.queue.push({
isPending: this.queue.length,
_schema: meta._schema,
tstamp: meta.tstamp ?? meta.time,
shot: meta.shot,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName,
num_guns: meta.num_guns,
num_active: meta.num_active
});
} else {
// FIXME Change to alert
ALERT("DetectSoftStart queue full at", this.queue.length);
}
this.processQueue();
}
}
module.exports = DetectSoftStart;
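For reference, the two transitions the handler above detects, restated from the conditions in processQueue():

// Soft start:  prev.num_active == 0  and  1 <= cur.num_active < cur.num_guns
// Full volume: cur.num_active == cur.num_guns  and  prev.num_active < cur.num_active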

View File

@@ -1,23 +1,24 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSOLEOL {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
@@ -26,8 +27,10 @@ class DetectSOLEOL {
queue = [];
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
@@ -38,9 +41,15 @@ class DetectSOLEOL {
const sequence = Number(cur._sequence);
try {
DEBUG("Sequence", sequence);
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
INFO("Transition to ONLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO ONLINE", prev, cur);
// Check if there are already FSP, FGSP events for this sequence
@@ -59,16 +68,22 @@ class DetectSOLEOL {
sequence,
point: cur._point,
remarks,
labels
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has been already entered in the log,
// so we have nothing to do here.
INFO("FSP already in the log. Doing nothing");
}
} else if (prev.lineStatus == "online" && cur.lineStatus != "online") {
INFO("Transition to OFFLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO OFFLINE", prev, cur);
// Check if there are already LSP, LGSP events for this sequence
@@ -87,14 +102,17 @@ class DetectSOLEOL {
sequence,
point: prev._point,
remarks,
labels
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has been already entered in the log,
// so we have nothing to do here.
INFO("LSP already in the log. Doing nothing");
}
}
// Processing of this shot has already been completed.

View File

@@ -1,5 +1,7 @@
const Handlers = [
require('./detect-soleol'),
require('./detect-soft-start'),
require('./report-line-change-time'),
require('./detect-fdsp')
];

View File

@@ -0,0 +1,304 @@
const { schema2pid } = require('../../lib/db/connection');
const { event, project } = require('../../lib/db');
const { withinValidity } = require('../../lib/utils/ranges');
const unique = require('../../lib/utils/unique');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class ReportLineChangeTime {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 0) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setTimeout(() => this.processQueue(), 1000); // We're not in a hurry
return;
}
const cur = this.queue.shift();
const next = this.queue[0];
const projectId = cur.pid;
// Are we being called because of a LGSP or because of a FGSP?
const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP"));
if (!projectId) {
throw {message: "No projectID found in event", cur};
}
async function getConfiguration (projectId) {
return await project.configuration.get(projectId);
}
async function getLineChangeTime (data, forward = false) {
if (forward) {
const ospEvents = await event.list(projectId, {label: "FGSP"});
// DEBUG("ospEvents", ospEvents);
const osp = ospEvents.filter(i => i.tstamp > data.tstamp).pop();
DEBUG("fsp", osp);
// DEBUG("data", data);
if (osp) {
DEBUG("lineChangeTime", osp.tstamp - data.tstamp);
return { lineChangeTime: osp.tstamp - data.tstamp, osp };
}
} else {
const ospEvents = await event.list(projectId, {label: "LGSP"});
// DEBUG("ospEvents", ospEvents);
const osp = ospEvents.filter(i => i.tstamp < data.tstamp).shift();
DEBUG("lsp", osp);
// DEBUG("data", data);
if (osp) {
DEBUG("lineChangeTime", data.tstamp - osp.tstamp);
return { lineChangeTime: data.tstamp - osp.tstamp, osp };
}
}
}
function parseInterval (dt) {
const daySeconds = (dt/1000) % 86400;
const d = Math.floor((dt/1000) / 86400);
const dateObject = new Date(null);
dateObject.setSeconds(daySeconds);
const [ h, m, s ] = dateObject.toISOString().slice(11, 19).split(":").map(Number);
return {d, h, m, s};
}
function formatInterval (i) {
let str = "";
for (let [k, v] of Object.entries(i)) {
if (v) {
str += " " + v + " " + k;
}
}
return str.trim();
}
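// Worked example (illustrative):
//   parseInterval(90061000)              → { d: 1, h: 1, m: 1, s: 1 }
//   formatInterval({d:1, h:1, m:1, s:1}) → "1 d 1 h 1 m 1 s"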
const deleteStaleEvents = async (seq) => {
if (seq) {
DEBUG("Will delete lct events related to sequence(s)", seq);
const jpq = `$."${this.author}"`;
const opts = {jpq};
if (Array.isArray(seq)) {
opts.sequences = unique(seq).filter(i => !!i);
} else {
opts.sequence = seq;
}
const staleEvents = await event.list(projectId, opts);
DEBUG(staleEvents.length ?? 0, "events to delete");
for (let staleEvent of staleEvents) {
DEBUG(`Deleting event id ${staleEvent.id} (seq = ${staleEvent.sequence}, point = ${staleEvent.point})`);
await event.del(projectId, staleEvent.id);
}
}
}
const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => {
const events = [];
const cfg = (await project.configuration.get(projectId));
const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // m → ms
DEBUG("nlcd", nlcd);
if (nlcd && lineChangeTime > nlcd) {
const excess = lineChangeTime-nlcd;
const excessString = formatInterval(parseInterval(excess));
DEBUG("excess", excess, excessString);
// ref: The later of the two events
const ref = forward ? osp : data;
const payload = {
// tstamp: new Date(ref.tstamp-1),
sequence: ref.sequence,
point: ref.point,
remarks: `_Nominal line change duration exceeded by ${excessString}_`,
labels: [ "Nav", "Prod" ],
meta: {
auto: true,
author: this.author,
[this.author]: {
parents: [
data.id,
osp.id
],
type: "excess",
value: excess
}
}
}
events.push(payload);
DEBUG("Created line change duration exceeded event", projectId, payload);
}
const lctString = formatInterval(parseInterval(lineChangeTime));
// ref: The later of the two events
const ref = forward ? osp : data;
const payload = {
// tstamp: new Date(ref.tstamp-1),
sequence: ref.sequence,
point: ref.point,
remarks: `Line change time: ${lctString}`,
labels: [ "Nav", "Prod" ],
meta: {
auto: true,
author: this.author,
[this.author]: {
parents: [
data.id,
osp.id
],
type: "lineChangeTime",
value: lineChangeTime
}
}
};
events.push(payload);
DEBUG("Created line change duration event", projectId, payload);
return events;
}
const maybePostEvent = async (projectId, payload) => {
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
}
try {
// DEBUG("Previous", prev);
DEBUG("Current", cur);
DEBUG("Forward search", forward);
// We have these scenarios to consider:
// INSERT:
// `old` will be NULL
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// UPDATE:
// `old` is not NULL
// `new` is not NULL
// Delete previous event from event_log (not event_log_full)
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// DELETE:
// `old` is not NULL
// `new` will be NULL
// Delete previous event from event_log (not event_log_full)
await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]);
if (cur.operation == "INSERT") {
// NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT
// on event_log_full, so we don't need to worry about UPDATE here.
const data = cur.new;
DEBUG("INSERT seen: will add lct events related to ", data.id);
if (withinValidity(data.validity)) {
DEBUG("Event within validity period", data.validity, new Date());
data.tstamp = new Date(data.tstamp);
const { lineChangeTime, osp } = await getLineChangeTime(data, forward);
if (lineChangeTime) {
const events = await createLineChangeTimeEvents(lineChangeTime, data, osp);
if (events?.length) {
DEBUG("Deleting other events for sequence", events[0].sequence);
await deleteStaleEvents(events[0].sequence);
}
for (let payload of events) {
await maybePostEvent(projectId, payload);
}
}
} else {
DEBUG("Event outside of validity range", data.validity, "lct events not inserted");
}
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("ReportLineChangeTime Error")
ERROR(err);
} finally {
if (next) {
next.isPending = false;
}
}
}
}
async run (data) {
DEBUG("Seen", data);
if (!data || data.channel !== "event") {
return;
}
if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) {
return;
}
const n = data.payload.new;
const o = data.payload.old;
if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
!n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
return;
}
if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) {
const item = {
...structuredClone(data.payload),
isPending: this.queue.length,
};
DEBUG("Queueing", item);
this.queue.push(item);
} else {
ALERT("ReportLineChangeTime queue full at", this.queue.length);
}
this.processQueue();
}
}
module.exports = ReportLineChangeTime;

View File

@@ -1,4 +1,4 @@
const { listen } = require('../ws/db');
const { listen } = require('../lib/db/notify');
const channels = require('../lib/db/channels');
const handlers = require('./handlers').init();
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);

View File

@@ -0,0 +1,61 @@
const { setSurvey } = require('../connection');
const { replaceMarkers } = require('../../utils');
function parseValidity (row) {
if (row.validity) {
const rx = /^(.)("([\d :.+-]+)")?,("([\d :.+-]+)")?([\]\)])$/;
const m = row.validity.match(rx);
row.validity = [ m[1], m[3], m[5], m[6] ];
}
return row;
}
function transform (row) {
if (row.validity[2]) {
return {
uid: row.uid,
id: row.id,
is_deleted: true
}
} else {
row.is_deleted = false;
row.has_edits = row.id != row.uid;
row.modified_on = row.validity[1];
delete row.uid;
delete row.validity;
return row;
}
}
function unique (rows) {
const o = {};
rows.forEach(row => o[row.id] = row);
return Object.values(o);
}
/**
* Get the event change history from a given epoch (ts0),
* for all events.
*/
async function changes (projectId, ts0, opts = {}) {
if (!projectId || !ts0) {
throw {status: 400, message: "Invalid request"};
}
const client = await setSurvey(projectId);
const text = `
SELECT *
FROM event_log_changes($1);
`;
const res = await client.query(text, [ts0]);
client.release();
return opts.unique
? unique(res.rows.map(i => transform(replaceMarkers(parseValidity(i)))))
: res.rows.map(i => transform(replaceMarkers(parseValidity(i))));
}
module.exports = changes;
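A hedged example call to the endpoint this function backs (host and API prefix are placeholders; the path follows the OpenAPI description further down):

curl -H "Authorization: Bearer $TOKEN" \
  "https://dougal.example.com/api/project/myproject/changes/2023-10-01T00:00:00Z?unique=t"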

View File

@@ -5,5 +5,6 @@ module.exports = {
post: require('./post'),
put: require('./put'),
patch: require('./patch'),
del: require('./delete')
del: require('./delete'),
changes: require('./changes')
}

View File

@@ -10,25 +10,34 @@ async function list (projectId, opts = {}) {
const offset = Math.abs((opts.page-1)*opts.itemsPerPage) || 0;
const limit = Math.abs(Number(opts.itemsPerPage)) || null;
const filter = opts.sequence
? String(opts.sequence).includes(";")
? [ "sequence = ANY ( $1 )", [ opts.sequence.split(";") ] ]
: [ "sequence = $1", [ opts.sequence ] ]
: opts.date0
? opts.date1
? [ "date(tstamp) BETWEEN SYMMETRIC $1 AND $2", [ opts.date0, opts.date1 ] ]
: [ "date(tstamp) = $1", [ opts.date0 ] ]
: [ "true = true", [] ];
const sequence = opts.sequence && Number(opts.sequence) || null;
const sequences = opts.sequences && (Array.isArray(opts.sequences)
? opts.sequences.map(Number)
: opts.sequences.split(/[^0-9]+/).map(Number)) || null;
const date0 = opts.date0 ?? null;
const date1 = opts.date1 ?? null;
const jpq = opts.jpq || null;
const label = opts.label ?? null;
const text = `
SELECT *
FROM event_log e
WHERE
${filter[0]}
ORDER BY ${sortKey} ${sortDir};
($1::numeric IS NULL OR sequence = $1) AND
($2::numeric[] IS NULL OR sequence = ANY( $2 )) AND
($3::timestamptz IS NULL OR date(tstamp) = $3) AND
($3::timestamptz IS NULL OR
(($4::timestamptz IS NULL AND date(tstamp) = $3) OR
date(tstamp) BETWEEN SYMMETRIC $3 AND $4)) AND
($5::jsonpath IS NULL OR jsonb_path_exists(meta::jsonb, $5::jsonpath)) AND
($6::text IS NULL OR $6 = ANY(labels))
ORDER BY ${sortKey} ${sortDir}
LIMIT ${limit};
`;
const res = await client.query(text, filter[1]);
const values = [ sequence, sequences, date0, date1, jpq, label ];
const res = await client.query(text, values);
client.release();
return res.rows.map(i => replaceMarkers(i));
}

View File

@@ -9,10 +9,10 @@ async function post (projectId, payload, opts = {}) {
const text = `
INSERT
INTO event_log (tstamp, sequence, point, remarks, labels)
VALUES ($1, $2, $3, replace_placeholders($4, $1, $2, $3), $5);
INTO event_log (tstamp, sequence, point, remarks, labels, meta)
VALUES ($1, $2, $3, replace_placeholders($4, $1, $2, $3), $5, $6);
`;
const values = [ p.tstamp, p.sequence, p.point, p.remarks, p.labels ];
const values = [ p.tstamp, p.sequence, p.point, p.remarks, p.labels, p.meta ];
DEBUG("Inserting new values: %O", values);
await client.query(text, values);

View File

@@ -1,6 +1,6 @@
// FIXME This code is in painful need of refactoring
const { DEBUG } = require("DOUGAL_ROOT/debug")(__filename);
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction, pool } = require('../connection');
let last_tstamp = 0;
@@ -8,14 +8,10 @@ let last_tstamp = 0;
async function getAllProjectConfigs () {
const client = await pool.connect();
const res0 = await client.query("SELECT schema FROM projects;");
const text = res0.rows.map(r => {
return `SELECT '${r.schema}' AS schema, data FROM ${r.schema}.file_data WHERE (data->>'archived')::boolean IS NOT true AND data->>'id' IS NOT NULL`;
}).join("\nUNION ALL ");
const res1 = await client.query(text);
const text = `SELECT schema, meta AS data FROM projects;`;
const res = await client.query(text);
client.release();
return res1.rows.map(r => Object.assign(r.data, {schema: r.schema}));
return res.rows;
}
async function getNearestPreplot (candidates) {
@@ -74,9 +70,9 @@ async function getNearestOfflinePreplot (candidates) {
if ("latitude" in candidates[0] && "longitude" in candidates[0]) {
text = `
SELECT
'${c._schema}' AS _schema,
'${c.schema}' AS schema,
ST_Distance(ST_Transform(ST_SetSRID(ST_MakePoint($1, $2), 4326), ST_SRID(geometry)), geometry) AS distance
FROM ${c._schema}.preplot_points
FROM ${c.schema}.preplot_points
ORDER BY distance ASC
LIMIT 1;
`;
@@ -84,9 +80,9 @@ async function getNearestOfflinePreplot (candidates) {
} else if ("easting" in candidates[0] && "northing" in candidates[0]) {
text = `
SELECT
'${c._schema}' AS _schema,
'${c.schema}' AS schema,
ST_Distance(ST_SetSRID(ST_MakePoint($1, $2), ST_SRID(geometry)), geometry) AS distance
FROM ${c._schema}.preplot_points
FROM ${c.schema}.preplot_points
ORDER BY distance ASC
LIMIT 1;
`;
@@ -102,13 +98,13 @@ async function getNearestOfflinePreplot (candidates) {
const results = [];
for (const qry of queries) {
const res = await client.query(qry.text, qry.values);
if (res.rows[0] && res.rows[0]._schema) {
if (res.rows[0] && res.rows[0].schema) {
results.push(res.rows[0]);
}
}
client.release();
const _schema = results.sort( (a, b) => a.distance - b.distance).shift()?._schema;
return candidates.find(c => c._schema == _schema);
const schema = results.sort( (a, b) => a.distance - b.distance).shift()?.schema;
return candidates.find(c => c.schema == schema);
}
async function saveOnline (dataset, opts = {}) {
@@ -141,14 +137,14 @@ async function saveOnline (dataset, opts = {}) {
await client.query(`
INSERT INTO raw_shots
(sequence, line, point, objref, tstamp, geometry, hash)
VALUES ($1, $2, $3, $4, $5, ST_SetSRID(ST_MakePoint($6, $7), (SELECT (data->>'epsg')::integer AS epsg FROM file_data WHERE data ? 'id')), '*online*')
VALUES ($1, $2, $3, $4, $5, ST_SetSRID(ST_MakePoint($6, $7), (select (project_configuration()->>'epsg')::integer as epsg)), '*online*')
ON CONFLICT DO NOTHING;
`, [item.sequence, item.line, item.point, 0, item.tstamp, item.easting, item.northing]);
} else if (item.latitude && item.longitude) {
await client.query(`
INSERT INTO raw_shots
(sequence, line, point, objref, tstamp, geometry, hash)
VALUES ($1, $2, $3, $4, $5, ST_Transform(ST_SetSRID(ST_MakePoint($6, $7), 4326), (SELECT (data->>'epsg')::integer AS epsg FROM file_data WHERE data ? 'id')), '*online*')
VALUES ($1, $2, $3, $4, $5, ST_Transform(ST_SetSRID(ST_MakePoint($6, $7), 4326), (select (project_configuration()->>'epsg')::integer as epsg)), '*online*')
ON CONFLICT DO NOTHING;
`, [item.sequence, item.line, item.point, 0, item.tstamp, item.longitude, item.latitude]);
} else {
@@ -158,8 +154,8 @@ async function saveOnline (dataset, opts = {}) {
}
await transaction.commit(client);
} catch (error) {
console.error("ONLINE DATA INSERT ERROR");
console.error(error);
ERROR("ONLINE DATA INSERT ERROR");
ERROR(error);
await transaction.rollback(client);
} finally {
client.release();
@@ -186,7 +182,7 @@ async function saveOffline (navData, opts = {}) {
} else if (schema && hasEastNorth) {
const text = `
INSERT INTO real_time_inputs (tstamp, geometry, meta)
VALUES ($1, ST_Transform(ST_SetSRID(ST_MakePoint($2, $3), (SELECT (data->>'epsg')::integer AS epsg FROM ${schema}.file_data)), 4326), $4);
VALUES ($1, ST_Transform(ST_SetSRID(ST_MakePoint($2, $3), (select (project_configuration()->>'epsg')::integer as epsg)), 4326), $4);
`;
const values = [navData.tstamp, navData.longitude, navData.latitude, navData.payload];
@@ -215,6 +211,37 @@ async function saveOffline (navData, opts = {}) {
client.release();
}
async function getCandidates (navData) {
const configs = await getAllProjectConfigs();
// We just get the bits of interest: pattern and schema
const candidates = configs.map(c => {
if (!c?.data?.online?.line || c?.archived === true) {
return null;
}
const p = c.data.online.line.pattern; // For short
const rx = new RegExp(p.regex, p.flags);
const matches = navData.lineName.match(rx);
if (!matches || ((matches.length+1) < p.captures.length)) {
return null;
}
matches.shift(); // Get rid of the full matched text
const obj = Object.assign({}, navData, {schema: c.schema});
p.captures.forEach( (k, i) => {
obj[k] = matches[i];
});
return obj;
}).filter(c => !!c);
DEBUG("Candidates: %j", candidates.map(c => c.schema));
return candidates;
}
async function save (navData, opts = {}) {
const hasLatLon = ("latitude" in navData && "longitude" in navData);
@@ -222,44 +249,21 @@ async function save (navData, opts = {}) {
const hasLinePoint = ("lineName" in navData && "point" in navData);
if (!(hasLinePoint || hasLatLon || hasEastNorth)) {
// This is of no interest to us
console.warning("Ignoring data without useful values", navData);
NOTICE("Ignoring data without useful values", navData);
return;
}
// DEBUG("navData", navData);
if (navData.online === true) {
// So we have a lineName, see which projects match the line pattern.
// For this we need to get all the project configs
const configs = await getAllProjectConfigs();
// We just get the bits of interest: pattern and schema
const candidates = configs.map(c => {
if (!(c && c.online && c.online.line)) {
return null;
}
const p = c.online.line.pattern; // For short
const rx = new RegExp(p.regex, p.flags);
const matches = navData.lineName.match(rx);
if (!matches || ((matches.length+1) < p.captures.length)) {
return null;
}
matches.shift(); // Get rid of the full matched text
const obj = Object.assign({}, navData, {schema: c.schema});
p.captures.forEach( (k, i) => {
obj[k] = matches[i];
});
return obj;
}).filter(c => !!c);
DEBUG("Candidates: %j", candidates);
// console.log("CANDIDATES", candidates);
const candidates = await getCandidates(navData);
if (candidates.length == 0) {
// This is probably a test line, so we treat it as offline
console.log("No match");
WARNING("No match");
} else {
if (candidates.length == 1) {
// Only one candidate, associate with it
@@ -275,7 +279,7 @@ async function save (navData, opts = {}) {
await saveOnline(candidates.filter(c => c.schema == destinationSchema), opts);
navData.payload._schema = destinationSchema;
} else {
console.log("Nowhere to save to");
WARNING("Nowhere to save to");
}
}
@@ -286,17 +290,18 @@ async function save (navData, opts = {}) {
}
} else {
// We are offline. We only assign _schema once every save_interval seconds at most
// unless there is gun data present.
if (opts.offline_survey_heuristics == "nearest_preplot") {
const now = Date.now();
const do_save = !opts.offline_survey_detect_interval ||
(now - last_tstamp) >= opts.offline_survey_detect_interval;
if (do_save) {
if (do_save || "guns" in navData?.payload) {
const configs = await getAllProjectConfigs();
const candidates = configs.map(c => Object.assign({}, navData, {_schema: c.schema}));
const candidates = await getCandidates(navData);
const bestCandidate = await getNearestOfflinePreplot(candidates);
if (bestCandidate) {
navData.payload._schema = bestCandidate._schema;
navData.payload._schema = bestCandidate.schema;
last_tstamp = now;
}
}

View File

@@ -36,6 +36,9 @@ async function patch (projectId, payload, opts = {}) {
}
}
// We do not allow users to change the schema
delete payload.schema;
const dest = removeNulls(deepMerge(source, payload));
await modify(projectId, dest);
return dest;

View File

@@ -7,10 +7,11 @@ const { INFO, DEBUG, WARNING, ERROR } = require('DOUGAL_ROOT/debug')(__filename)
function checkSyntax (value, type = "project") {
var requiredFields = {};
switch (type) {
case "project":
var requiredFields = {
requiredFields = {
id: "string",
name: "string",
epsg: "number",
@@ -18,7 +19,7 @@ function checkSyntax (value, type = "project") {
};
break;
case "binning":
var requiredFields = {
requiredFields = {
theta: "number",
I_inc: "number",
J_inc: "number",
@@ -28,23 +29,19 @@ function checkSyntax (value, type = "project") {
}
break
case "origin":
var requiredFields = {
requiredFields = {
easting: "number",
northing: "number",
I: "number",
J: "number"
}
break;
break;
default:
return typeof type == "function"
? type(value)
: typeof value == type;
}
// return Object.entries(requiredFields).every( ([field, test]) => {
// return value.hasOwnProperty(field) && checkSyntax(value[field], test);
// });
for (const [field, test] of Object.entries(requiredFields)) {
if (!value.hasOwnProperty(field)) {
return `Missing required property: ${field}`;

View File

@@ -1,6 +1,7 @@
const fs = require('fs');
const YAML = require('yaml');
const flattenQCDefinitions = require('../../../utils/flattenQCDefinitions');
const { translatePath } = require('../../../utils/logicalPath');
const project = require('../../project'); // lib/db/project
@@ -8,7 +9,7 @@ async function get (projectId, opts = {}) {
const qcConfig = (await project.configuration.get(projectId))?.qc;
if (qcConfig?.definitions) {
try {
const definitions = YAML.parse(fs.readFileSync(qcConfig.definitions).toString());
const definitions = YAML.parse(fs.readFileSync(translatePath(qcConfig.definitions)).toString());
return opts.flat ? flattenQCDefinitions(definitions) : definitions;
} catch (err) {

View File

@@ -1,7 +1,7 @@
const fs = require('fs/promises');
const Path = require('path');
const mime = require('./mime-types');
const { translatePath, logicalRoot } = require('./logical');
const { translatePath, logicalRoot } = require('../utils/logicalPath');
const systemCfg = require('../config');
const projectCfg = require('../db/configuration');

View File

@@ -8,6 +8,7 @@ const { pool, setSurvey, transaction, fetchRow } = require('../db/connection')
const { project, sequence, configuration, info } = require('../db')
const flattenQCDefinitions = require('./flatten');
const { projectHash, sequenceHash } = require('./last-modified');
const { translatePath } = require('../utils/logicalPath');
const { runShotsQC, saveShotsQC } = require('./shots');
const { runSequenceQCs, saveSequenceQCs } = require('./sequences');
@@ -46,8 +47,8 @@ async function getProjectQCConfig (projectId) {
console.log("qcConfig", qcConfig);
if (qcConfig?.definitions && qcConfig?.parameters) {
const definitions =
flattenQCDefinitions(YAML.parse(fs.readFileSync(qcConfig.definitions).toString()));
const parameters = YAML.parse(fs.readFileSync(qcConfig.parameters).toString());
flattenQCDefinitions(YAML.parse(fs.readFileSync(translatePath(qcConfig.definitions)).toString()));
const parameters = YAML.parse(fs.readFileSync(translatePath(qcConfig.parameters)).toString());
return { definitions, parameters };
}

View File

@@ -5,5 +5,8 @@ module.exports = {
replaceMarkers: require('./replaceMarkers'),
flattenQCDefinitions: require('./flattenQCDefinitions'),
deepMerge: require('./deepMerge'),
removeNulls: require('./removeNulls')
removeNulls: require('./removeNulls'),
logicalPath: require('./logicalPath'),
ranges: require('./ranges'),
unique: require('./unique')
};

View File

@@ -10,6 +10,7 @@ function translatePath (file) {
return physicalPath;
} else {
// An attempt to break out of the logical path?
console.warn("Attempting to break out of the logical path?", physicalPath, prefix);
throw {
status: 404,
message: "Not found"

View File

@@ -0,0 +1,74 @@
function parseRange (str) {
const rx = /^[\[(].*,.*[)\]]$/
if (rx.test(str)) {
const lower_inclusive = str[0] == '[';
const upper_inclusive = str[str.length-1] == ']';
const [ lower, upper ] = str.slice(1,-1).split(",");
return {
upper,
lower,
upper_inclusive,
lower_inclusive
};
}
}
function parseValidity (str) {
const range = parseRange(str);
if (range) {
const ts0 = range.lower ? new Date(range.lower) : null;
const ts1 = range.upper ? new Date(range.upper) : null;
return {
...range,
lower: ts0,
upper: ts1
};
}
}
function withinValidity (range, ts) {
if (!ts) {
ts = new Date();
}
if (typeof range === "string") {
range = parseValidity(range);
}
if (range.lower) {
if (range.lower_inclusive) {
if (!(range.lower <= ts)) {
return false;
}
} else {
if (!(range.lower < ts)) {
return false;
}
}
}
if (range.upper) {
if (range.upper_inclusive) {
if (!(range.upper >= ts)) {
return false;
}
} else {
if (!(range.upper > ts)) {
return false;
}
}
}
return true;
}
module.exports = {
parseRange,
parseValidity,
withinValidity
}
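A usage sketch for the helpers above; the input format follows Postgres range text output (bounds shown unquoted for simplicity):

const { parseRange, withinValidity } = require('./ranges');

parseRange('[2023-09-29 12:00:00+00,)');
// => { lower: '2023-09-29 12:00:00+00', upper: '',
//      lower_inclusive: true, upper_inclusive: false }

// Accepts the same string form or an already-parsed object;
// the probe timestamp defaults to "now":
withinValidity('[2023-09-29 12:00:00+00,)'); // true once the lower bound has passed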

View File

@@ -0,0 +1,6 @@
function unique(array) {
return [...new Set(array)];
}
module.exports = unique;
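Usage (a Set preserves first-insertion order):

unique([3, 1, 3, 2, 1]); // => [3, 1, 2]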

View File

@@ -180,6 +180,16 @@ components:
required: true
example: 14707
Since:
description: Starting epoch
name: since
in: path
schema:
type: string
format: date-time
required: true
example: 1970-01-01T00:00:00Z
QueryLimit:
description: Maximum number of results to return
name: limit
@@ -206,6 +216,16 @@ components:
pattern: "(([^\\s,;:]+)(\\s*[,;:\\s]\\s*)?)+"
example: "line,point,tstamp"
Unique:
description: |
Return unique results. Any value at all represents `true`.
name: unique
in: query
schema:
type: string
pattern: ".+"
example: "t"
schemas:
Duration:
@@ -602,14 +622,26 @@ components:
Flag to indicate that this event is read-only. It cannot be edited by the user or deleted. Typically this concerns system-generated events such as QC results or midnight shots.
additionalProperties: true
EventIDAbstract:
type: object
properties:
id:
type: number
description: Event ID.
EventUIDAbstract:
type: object
properties:
uid:
type: number
description: Event instance unique ID. When an event is modified, the new entry acquires a different `uid` while keeping the same `id` as the original event.
EventAbstract:
allOf:
-
type: object
properties:
id:
type: number
description: Event ID.
$ref: "#/components/schemas/EventIDAbstract"
-
$ref: "#/components/schemas/EventNew"
@@ -659,6 +691,47 @@ components:
* The third element is either an ISO-8601 timestamp or `null`. The latter indicates +∞. These are the events returned by endpoints that do not concern themselves with event history.
* The fourth element is one of `]` or `)`. As before, it indicates either an open or closed interval.
EventChangesIsDeletedAbstract:
type: object
properties:
is_deleted:
type: boolean
description: >
Flag to indicate whether this event or event instance (depending on the presence of a `uid` attribute) has been deleted.
EventChangesModified:
description: An event modification.
allOf:
-
$ref: "#/components/schemas/EventAbstract"
-
$ref: "#/components/schemas/EventChangesIsDeletedAbstract"
EventChangesDeleted:
description: |
Identification of a deleted event or event instance.
**Note:** the details of the deleted event are not included, only its `id` and `uid`.
allOf:
-
$ref: "#/components/schemas/EventIDAbstract"
-
$ref: "#/components/schemas/EventUIDAbstract"
-
$ref: "#/components/schemas/EventChangesIsDeletedAbstract"
EventChanges:
description: List of event changes since the given epoch.
type: array
items:
anyOf:
-
$ref: "#/components/schemas/EventChangesDeleted"
-
$ref: "#/components/schemas/EventChangesModified"
SeisExportEntryFSP:
type: object
properties:
@@ -1382,6 +1455,31 @@ paths:
$ref: "#/components/responses/401"
/project/{project}/changes/{since}:
get:
summary: Get event change history since epoch.
tags: [ "log" ]
security:
- BearerAuthGuest: []
- CookieAuthGuest: []
parameters:
- $ref: "#/components/parameters/Project"
- $ref: "#/components/parameters/Since"
- $ref: "#/components/parameters/Unique"
responses:
"200":
description: List of project event changes. If `unique` is given, only the latest version of each event will be returned, otherwise the entire modification history is given, potentially including the same event `id` multiple times.
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/EventChanges"
"401":
$ref: "#/components/responses/401"
/project/{project}/label:
get:
summary: Get project labels.

View File

@@ -1,71 +0,0 @@
const { pool } = require('../lib/db/connection');
var client;
const channels = {};
async function notify (data) {
if (data.channel in channels) {
data._received = new Date();
try {
const json = JSON.parse(data.payload);
data.payload = json;
} catch {
// Ignore the error
}
for (const listener of channels[data.channel]) {
await listener(JSON.parse(JSON.stringify(data)));
}
}
}
function reconnect () {
console.log("Reconnecting");
// No need to provide parameters, channels should already be populated.
listen();
}
async function listen (addChannels, callback) {
if (!client) {
try {
client = await pool.connect();
} catch (err) {
console.error("Error connecting to DB", err);
console.log("Will try again in 15 seconds");
setImmediate(() => client = null);
setTimeout(() => {
listen(addChannels, callback);
}, 15000);
return;
}
client.on('notification', notify);
console.log("Websocket client connected", Object.keys(channels));
client.on('error', (err) => console.error("Events client error: ", err));
client.on('end', () => {
console.warn("Websocket events client disconnected. Will attempt to reconnect in five seconds");
setImmediate(() => client = null);
setTimeout(reconnect, 5000);
});
}
if (addChannels) {
if (!Array.isArray(addChannels)) {
addChannels = [addChannels];
}
for (const channel of addChannels) {
if (!(channel in channels)) {
await client.query("LISTEN "+channel);
channels[channel] = [];
console.log("Listening to ", channel);
}
channels[channel].push(callback);
}
}
}
module.exports = {
listen
}

View File

@@ -1,6 +1,6 @@
const ws = require('ws');
const URL = require('url');
const db = require('./db');
const { listen } = require('../lib/db/notify');
const channels = require('../lib/db/channels');
function start (server, pingInterval=30000) {
@@ -22,7 +22,7 @@ function start (server, pingInterval=30000) {
}
});
db.listen(channels, (data) => {
listen(channels, (data) => {
wsServer.clients.forEach( (socket) => {
socket.send(JSON.stringify(data));
})

View File

@@ -16,7 +16,12 @@ OUTPATH="$OUTDIR/$OUTNAME"
# 30000/UDP: Navigation system headers
# Not all inputs will be present in all systems.
#
EXPR="udp and (port 4461 or port 4462 or port 30000)"
# NOTE: $INS_HOST must be defined and point to the
# navigation server. The reason we don't use a port
# filter for this data is because that doesn't work
# with fragmented UDP packets.
#
EXPR="udp and (port 4461 or port 4462 or src host $INS_HOST)"
if [[ ! -d "$OUTDIR" ]]; then
mkdir "$OUTDIR"

sbin/rewrite-captures.sh Executable file
View File

@@ -0,0 +1,42 @@
#!/bin/bash
#
# Rewrite packet captures in order to be able to replay them.
#
# SINET: Rewrite all packets with this source IP address
# SETHER: Rewrite all packets with this MAC
#
# DINET: Rewrite all packets with this destination IP address
# DETHER: Rewrite all packets with this destination MAC address
#
# The resulting files have the original name with "-rewritten.pcap"
# appended as a suffix. Those packets may then be replayed from a
# different computer or virtual container, for instance with:
#
# sudo bittwist -i 1 -v -m10 capture-rewritten.pcap
#
# Where -i n is the interface name (use bittwist -d to list available
# interfaces), -v is the verbose flag and -m10 replays at 10× speed.
#
SINET=${SINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
SETHER=${SETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}
DINET=${DINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
DETHER=${DETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}
for f in "$@"; do
OUTFNAME=$f-rewritten.pcap
echo "$f -> $OUTFNAME"
if [[ -n "$SINET" && -n "$SETHER" ]]; then
tcprewrite -S 0.0.0.0/0:$SINET --enet-smac=$SETHER \
-D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
--infile "$f" \
--outfile "$OUTFNAME"
else
tcprewrite -D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
--infile "$f" \
--outfile "$OUTFNAME"
fi
done