Compare commits

...

37 Commits

Author SHA1 Message Date
D. Berge
6eccbf215a There should be no need to await.
That is because the queue handler will, in theory, only ever
process one event at a time.
2023-09-30 21:29:15 +02:00
D. Berge
8abc05f04e Remove dead code 2023-09-30 21:29:15 +02:00
D. Berge
8f587467f9 Add comment 2023-09-30 21:29:15 +02:00
D. Berge
3d7a91c7ff Rewrite ReportLineChangeTime 2023-09-30 21:29:15 +02:00
D. Berge
3fd408074c Support passing array in opts.sequences to event.list() 2023-09-30 21:29:15 +02:00
D. Berge
f71cbd8f51 Add unique utility function 2023-09-30 21:29:15 +02:00
D. Berge
915df8ac16 Add handler for creation of line change time events 2023-09-30 21:29:15 +02:00
D. Berge
d5ecb08a2d Allow switching to event entry by time.
A ‘Timed’ button is shown when a new (not edited) event is in
the event entry dialogue and the event has sequence and/or
point values. Pressing the button deletes the sequence/point
information and sets the date and time fields to current time.

Fixes #277.
2023-09-30 21:26:32 +02:00
D. Berge
9388cd4861 Make daily_tasks work with new project configuration 2023-09-30 20:36:46 +02:00
D. Berge
180590b411 Mark events as being automatically generated 2023-09-30 01:42:27 +02:00
D. Berge
4ec37539bf Add utils to work with Postgres ranges 2023-09-30 01:41:45 +02:00
D. Berge
8755fe01b6 Refactor events.list.
The SQL has been simplified and the following changes made:

- The `sequence` argument now can only take one individual
  sequence, not a list of sequences.
- A new `sequences` argument is recognised. It takes a list
  of sequences (as a string).
- A new `label` argument is recognised. It takes a label
  name and returns events containing that label.
- A new `jpq` argument is recognised. It takes a JSONPath
  string which is applied to `meta` with jsonb_path_exists(),
  returning any events for which the JSON path expression
  matches.
2023-09-30 01:37:22 +02:00
D. Berge
0bfe54e0c2 Include the meta attribute when posting events 2023-09-30 01:36:18 +02:00
D. Berge
29bc689b84 Merge branch '276-add-soft-start-event-detection' into 'devel'
Resolve "Add soft start event detection"

Closes #276

See merge request wgp/dougal/software!44
2023-09-29 15:02:57 +00:00
D. Berge
65682febc7 Add soft start and full volume events detection 2023-09-29 17:02:03 +02:00
D. Berge
d408665d62 Write meta info to automatic events 2023-09-29 16:49:27 +02:00
D. Berge
64fceb0a01 Merge branch '127-sol-eol-events-not-being-inserted-in-the-log-automatically' into 'devel'
Resolve "SOL / EOL events not being inserted in the log automatically"

Closes #127

See merge request wgp/dougal/software!43
2023-09-29 14:17:46 +00:00
D. Berge
ab58e578c9 Use DEBUG library throughout 2023-09-29 16:16:33 +02:00
D. Berge
0e58b8fa5b Refactor code to identify candidate schemas.
As part of the refactoring, we took into account a slight payload
format change (project configuration details are under the `data`
attribute).
2023-09-29 16:13:35 +02:00
D. Berge
99ac082f00 Use common naming convention both online and offline 2023-09-29 16:11:44 +02:00
D. Berge
4d3fddc051 Merge branch '274-use-new-db-event-notifier-for-event-processing-handlers' into 'devel'
Resolve "Use new DB event notifier for event processing handlers"

Closes #275, #230, and #274

See merge request wgp/dougal/software!42
2023-09-29 14:03:00 +00:00
D. Berge
42456439a9 Remove ad-hoc notifier 2023-09-29 15:59:12 +02:00
D. Berge
ee0c0e7308 Replace ad-hoc notifier with pg-listen based version 2023-09-29 15:59:12 +02:00
D. Berge
998c272bf8 Add var/* to .gitignore 2023-09-29 15:59:12 +02:00
D. Berge
daddd1f0e8 Add script to rewrite packet captures IP and MAC addresses.
Closes #230.
2023-09-29 15:58:59 +02:00
D. Berge
17f20535cb Cope with fragmented UDP packets.
Fixes #275.

Use this as the systemd unit file to run as a service:

[Unit]
Description=Dougal Network Packet Capture
After=network.target remote-fs.target nss-lookup.target

[Service]
ExecStart=/srv/dougal/software/sbin/packet-capture.sh
ExecStop=/bin/kill -s QUIT $MAINPID
Restart=always
User=root
Group=users
Environment=PATH=/usr/bin:/usr/sbin:/usr/local/bin
Environment=INS_HOST=172.31.10.254
WorkingDirectory=/srv/dougal/software/var/
SyslogIdentifier=dougal.pcap

[Install]
WantedBy=multi-user.target
2023-09-29 15:28:11 +02:00
D. Berge
0829ea3ea1 Save a copy of the headers not the original.
Otherwise ExpressJS will complain about trying to modify
headers that have already been sent.
2023-09-24 12:17:16 +02:00
D. Berge
2069d9c3d7 Remove dead code 2023-09-24 12:15:06 +02:00
D. Berge
8a2d526c50 Ignore schema attribute in PATCH payload.
Fixes #273.
2023-09-24 12:14:20 +02:00
D. Berge
8ad96d6f73 Ensure that requiredFields is always defined.
Otherwise, `Object.entries(requiredFields)` may fail.
2023-09-24 11:59:26 +02:00
D. Berge
947faf8c05 Provide default glob specification for map layer imports 2023-09-24 11:34:10 +02:00
D. Berge
a948556455 Fail gracefully if map layer data does not exist.
Fixes #272.
2023-09-24 11:33:32 +02:00
D. Berge
835384b730 Apply path conversion to QC definition files 2023-09-23 22:50:09 +02:00
D. Berge
c5b93794f4 Move path conversion to general utilities 2023-09-23 13:44:53 +02:00
D. Berge
056cd32f0e Merge branch '271-qc-results-not-being-refreshed' into 'devel'
Resolve "QC results not being refreshed"

Closes #271

See merge request wgp/dougal/software!41
2023-09-18 10:08:35 +00:00
D. Berge
49bb413110 Merge branch '270-real-time-interface-stopped-working' into 'devel'
Resolve "Real-time interface stopped working"

Closes #270

See merge request wgp/dougal/software!40
2023-09-18 10:08:27 +00:00
D. Berge
aa3379e1c6 Adapt RTI save function to refactored project configuration in DB 2023-09-18 11:58:55 +02:00
27 changed files with 714 additions and 170 deletions

1
.gitignore vendored
View File

@@ -12,3 +12,4 @@ etc/surveys/*.yaml
!etc/surveys/_*.yaml
etc/ssl/*
etc/config.yaml
var/*

View File

@@ -11,11 +11,9 @@ from datastore import Datastore
if __name__ == '__main__':
print("Reading configuration")
surveys = configuration.surveys()
print("Connecting to database")
db = Datastore()
surveys = db.surveys()
print("Reading surveys")
for survey in surveys:

View File

@@ -115,7 +115,10 @@ if __name__ == '__main__':
process(layer_name, layer, realprefix)
else:
elif os.path.isdir(realprefix):
if not "globs" in layer:
layer["globs"] = [ "**/*.geojson" ]
for globspec in layer["globs"]:
for physical_filepath in pathlib.Path(realprefix).glob(globspec):

View File

@@ -44,7 +44,7 @@
<template v-slot:activator="{ on, attrs }">
<v-text-field
v-model="tsDate"
:disabled="!!(sequence || point || entrySequence || entryPoint)"
:disabled="!!(entrySequence || entryPoint)"
label="Date"
suffix="UTC"
prepend-icon="mdi-calendar"
@@ -64,7 +64,7 @@
<v-col>
<v-text-field
v-model="tsTime"
:disabled="!!(sequence || point || entrySequence || entryPoint)"
:disabled="!!(entrySequence || entryPoint)"
label="Time"
suffix="UTC"
prepend-icon="mdi-clock-outline"
@@ -256,6 +256,15 @@
>
Cancel
</v-btn>
<v-btn v-if="!id && (entrySequence || entryPoint)"
color="info"
text
title="Enter an event by time"
@click="timed"
>
<v-icon left small>mdi-clock-outline</v-icon>
Timed
</v-btn>
<v-spacer></v-spacer>
<v-btn
:disabled="!canSave"
@@ -632,6 +641,14 @@ export default {
}
},
timed () {
const tstamp = (new Date()).toISOString();
this.entrySequence = null;
this.entryPoint = null;
this.tsDate = tstamp.substr(0, 10);
this.tsTime = tstamp.substr(11, 8);
},
close () {
this.entryLabels = this.selectedLabels.map(this.labelToItem)
this.$emit("input", false);

View File

@@ -33,7 +33,7 @@ function saveResponse (res) {
const cache = getCache(res);
const req = res.req;
console.log(`Saving ETag: ${req.method} ${req.url}${etag}`);
const headers = res.getHeaders();
const headers = structuredClone(res.getHeaders());
delete headers["set-cookie"];
cache[req.url] = {etag, headers};
}

View File

@@ -69,7 +69,7 @@ class DetectFDSP {
point: prev._point,
remarks: "Last shotpoint of the day",
labels: ["LDSP", "Prod"],
meta: {auto: true, insertedBy: this.constructor.name}
meta: {auto: true, author: `*${this.constructor.name}*`}
};
const fdsp = {
@@ -77,7 +77,7 @@ class DetectFDSP {
point: cur._point,
remarks: "First shotpoint of the day",
labels: ["FDSP", "Prod"],
meta: {auto: true, insertedBy: this.constructor.name}
meta: {auto: true, author: `*${this.constructor.name}*`}
};
INFO("LDSP", ldsp);

View File

@@ -0,0 +1,128 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSoftStart {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method looks takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* two elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
try {
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
// TODO:
// Consider whether to remember if soft start / full volume events
// have already been emitted and wait until there is an online/offline
// transition before re-emitting.
// This may or may not be a good idea.
// Look for a soft start or full volume event
if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
INFO("Soft start detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Soft start",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
} else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) {
INFO("Full volume detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Full volume",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("DetectSoftStart Error")
ERROR(err);
} finally {
cur.isPending = false;
}
}
}
async run (data) {
if (!data || data.channel !== "realtime") {
return;
}
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) {
this.queue.push({
isPending: this.queue.length,
_schema: meta._schema,
tstamp: meta.tstamp ?? meta.time,
shot: meta.shot,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName,
num_guns: meta.num_guns,
num_active: meta.num_active
});
} else {
// FIXME Change to alert
ALERT("DetectSoftStart queue full at", this.queue.length);
}
this.processQueue();
}
}
module.exports = DetectSoftStart;

View File

@@ -1,23 +1,24 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSOLEOL {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
*
* The processQueue() method looks takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call this will happen when
* data is coming in faster than it can be processed.
*
*
* If the processQueue() call is the first to see the two bottommost
* two elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
@@ -26,8 +27,10 @@ class DetectSOLEOL {
queue = [];
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
@@ -38,9 +41,15 @@ class DetectSOLEOL {
const sequence = Number(cur._sequence);
try {
DEBUG("Sequence", sequence);
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
INFO("Transition to ONLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO ONLINE", prev, cur);
// Check if there are already FSP, FGSP events for this sequence
@@ -59,16 +68,22 @@ class DetectSOLEOL {
sequence,
point: cur._point,
remarks,
labels
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has been already entered in the log,
// so we have nothing to do here.
INFO("FSP already in the log. Doing nothing");
}
} else if (prev.lineStatus == "online" && cur.lineStatus != "online") {
INFO("Transition to OFFLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO OFFLINE", prev, cur);
// Check if there are already LSP, LGSP events for this sequence
@@ -87,14 +102,17 @@ class DetectSOLEOL {
sequence,
point: prev._point,
remarks,
labels
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has been already entered in the log,
// so we have nothing to do here.
INFO("LSP already in the log. Doing nothing");
}
}
// Processing of this shot has already been completed.

View File

@@ -1,5 +1,7 @@
const Handlers = [
require('./detect-soleol'),
require('./detect-soft-start'),
require('./report-line-change-time'),
require('./detect-fdsp')
];

View File

@@ -0,0 +1,301 @@
const { schema2pid } = require('../../lib/db/connection');
const { event, project } = require('../../lib/db');
const { withinValidity } = require('../../lib/utils/ranges');
const unique = require('../../lib/utils/unique');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class ReportLineChangeTime {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method looks takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* two elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 0) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
const cur = this.queue.shift();
const next = this.queue[0];
const projectId = cur.pid;
// Are we being called because of a LGSP or because of a FGSP?
const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP"));
if (!projectId) {
WARNING("No projectID found in event", cur);
return;
}
async function getConfiguration (projectId) {
return await project.configuration.get(projectId);
}
async function getLineChangeTime (data, forward = false) {
if (forward) {
const ospEvents = await event.list(projectId, {label: "FGSP"});
// DEBUG("ospEvents", ospEvents);
const osp = ospEvents.filter(i => i.tstamp > data.tstamp).pop();
DEBUG("fsp", osp);
// DEBUG("data", data);
if (osp) {
DEBUG("lineChangeTime", osp.tstamp - data.tstamp);
return { lineChangeTime: osp.tstamp - data.tstamp, osp };
}
} else {
const ospEvents = await event.list(projectId, {label: "LGSP"});
// DEBUG("ospEvents", ospEvents);
const osp = ospEvents.filter(i => i.tstamp < data.tstamp).shift();
DEBUG("lsp", osp);
// DEBUG("data", data);
if (osp) {
DEBUG("lineChangeTime", data.tstamp - osp.tstamp);
return { lineChangeTime: data.tstamp - osp.tstamp, osp };
}
}
}
function parseInterval (dt) {
const daySeconds = (dt/1000) % 86400;
const d = Math.floor((dt/1000) / 86400);
const dateObject = new Date(null);
dateObject.setSeconds(daySeconds);
const [ h, m, s ] = dateObject.toISOString().slice(11, 19).split(":").map(Number);
return {d, h, m, s};
}
function formatInterval (i) {
let str = "";
for (let [k, v] of Object.entries(i)) {
if (v) {
str += " " + v + " " + k;
}
}
return str.trim();
}
const deleteStaleEvents = async (seq) => {
if (seq) {
DEBUG("Will delete lct events related to sequence(s)", seq);
const jpq = `$."${this.author}"`;
const opts = {jpq};
if (Array.isArray(seq)) {
opts.sequences = unique(seq).filter(i => !!i);
} else {
opts.sequence = seq;
}
const staleEvents = await event.list(projectId, opts);
DEBUG(staleEvents.length ?? 0, "events to delete");
for (let staleEvent of staleEvents) {
DEBUG(`Deleting event id ${staleEvent.id} (seq = ${staleEvent.sequence}, point = ${staleEvent.point})`);
await event.del(projectId, staleEvent.id);
}
}
}
const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => {
const events = [];
const cfg = (await project.configuration.get(projectId));
const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // m → ms
DEBUG("nlcd", nlcd);
if (nlcd && lineChangeTime > nlcd) {
const excess = lineChangeTime-nlcd;
const excessString = formatInterval(parseInterval(excess));
DEBUG("excess", excess, excessString);
// ref: The later of the two events
const ref = forward ? osp : data;
const payload = {
// tstamp: new Date(ref.tstamp-1),
sequence: ref.sequence,
point: ref.point,
remarks: `_Nominal line change duration exceeded by ${excessString}_`,
labels: [ "Nav", "Prod" ],
meta: {
auto: true,
author: this.author,
[this.author]: {
parents: [
data.id,
osp.id
],
type: "excess",
value: excess
}
}
}
events.push(payload);
DEBUG("Created line change duration exceeded event", projectId, payload);
}
const lctString = formatInterval(parseInterval(lineChangeTime));
// ref: The later of the two events
const ref = forward ? osp : data;
const payload = {
// tstamp: new Date(ref.tstamp-1),
sequence: ref.sequence,
point: ref.point,
remarks: `Line change time: ${lctString}`,
labels: [ "Nav", "Prod" ],
meta: {
auto: true,
author: this.author,
[this.author]: {
parents: [
data.id,
osp.id
],
type: "lineChangeTime",
value: lineChangeTime
}
}
};
events.push(payload);
DEBUG("Created line change duration event", projectId, payload);
return events;
}
const maybePostEvent = async (projectId, payload) => {
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
}
try {
// DEBUG("Previous", prev);
DEBUG("Current", cur);
DEBUG("Forward search", forward);
// We have these scenarios to consider:
// INSERT:
// `old` will be NULL
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// UPDATE:
// `old` is not NULL
// `new` is not NULL
// Delete previous event from event_log (not event_log_full)
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// DELETE:
// `old` is not NULL
// `new` will be NULL
// Delete previous event from event_log (not event_log_full)
await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]);
if (cur.operation == "INSERT") {
// NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT
// on event_log_full, so we don't need to worry about UPDATE here.
const data = cur.new;
DEBUG("INSERT seen: will add lct events related to ", data.id);
if (withinValidity(data.validity)) {
DEBUG("Event within validity period", data.validity, new Date());
data.tstamp = new Date(data.tstamp);
const { lineChangeTime, osp } = await getLineChangeTime(data, forward);
if (lineChangeTime) {
const events = await createLineChangeTimeEvents(lineChangeTime, data, osp);
if (events?.length) {
DEBUG("Deleting other events for sequence", events[0].sequence);
await deleteStaleEvents(events[0].sequence);
}
for (let payload of events) {
await maybePostEvent(projectId, payload);
}
}
} else {
DEBUG("Event outside of validity range", data.validity, "lct events not inserted");
}
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("ReportLineChangeTime Error")
ERROR(err);
} finally {
if (next) {
next.isPending = false;
}
}
}
}
async run (data) {
if (!data || data.channel !== "event") {
return;
}
if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) {
return;
}
const n = data.payload.new;
const o = data.payload.old;
if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
!n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
return;
}
if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) {
this.queue.push({
...data.payload,
isPending: this.queue.length,
});
} else {
ALERT("ReportLineChangeTime queue full at", this.queue.length);
}
this.processQueue();
}
}
module.exports = ReportLineChangeTime;

View File

@@ -1,4 +1,4 @@
const { listen } = require('../ws/db');
const { listen } = require('../lib/db/notify');
const channels = require('../lib/db/channels');
const handlers = require('./handlers').init();
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);

View File

@@ -10,25 +10,34 @@ async function list (projectId, opts = {}) {
const offset = Math.abs((opts.page-1)*opts.itemsPerPage) || 0;
const limit = Math.abs(Number(opts.itemsPerPage)) || null;
const filter = opts.sequence
? String(opts.sequence).includes(";")
? [ "sequence = ANY ( $1 )", [ opts.sequence.split(";") ] ]
: [ "sequence = $1", [ opts.sequence ] ]
: opts.date0
? opts.date1
? [ "date(tstamp) BETWEEN SYMMETRIC $1 AND $2", [ opts.date0, opts.date1 ] ]
: [ "date(tstamp) = $1", [ opts.date0 ] ]
: [ "true = true", [] ];
const sequence = opts.sequence && Number(opts.sequence) || null;
const sequences = opts.sequences && (Array.isArray(opts.sequences)
? opts.sequences.map(Number)
: opts.sequences.split(/[^0-9]+/).map(Number)) || null;
const date0 = opts.date0 ?? null;
const date1 = opts.date1 ?? null;
const jpq = opts.jpq || null;
const label = opts.label ?? null;
const text = `
SELECT *
FROM event_log e
WHERE
${filter[0]}
ORDER BY ${sortKey} ${sortDir};
($1::numeric IS NULL OR sequence = $1) AND
($2::numeric[] IS NULL OR sequence = ANY( $2 )) AND
($3::timestamptz IS NULL OR date(tstamp) = $3) AND
($3::timestamptz IS NULL OR
(($4::timestamptz IS NULL AND date(tstamp) = $3) OR
date(tstamp) BETWEEN SYMMETRIC $3 AND $4)) AND
($5::jsonpath IS NULL OR jsonb_path_exists(meta::jsonb, $5::jsonpath)) AND
($6::text IS NULL OR $6 = ANY(labels))
ORDER BY ${sortKey} ${sortDir}
LIMIT ${limit};
`;
const res = await client.query(text, filter[1]);
const values = [ sequence, sequences, date0, date1, jpq, label ];
const res = await client.query(text, values);
client.release();
return res.rows.map(i => replaceMarkers(i));
}

View File

@@ -9,10 +9,10 @@ async function post (projectId, payload, opts = {}) {
const text = `
INSERT
INTO event_log (tstamp, sequence, point, remarks, labels)
VALUES ($1, $2, $3, replace_placeholders($4, $1, $2, $3), $5);
INTO event_log (tstamp, sequence, point, remarks, labels, meta)
VALUES ($1, $2, $3, replace_placeholders($4, $1, $2, $3), $5, $6);
`;
const values = [ p.tstamp, p.sequence, p.point, p.remarks, p.labels ];
const values = [ p.tstamp, p.sequence, p.point, p.remarks, p.labels, p.meta ];
DEBUG("Inserting new values: %O", values);
await client.query(text, values);

View File

@@ -1,6 +1,6 @@
// FIXME This code is in painful need of refactoring
const { DEBUG } = require("DOUGAL_ROOT/debug")(__filename);
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction, pool } = require('../connection');
let last_tstamp = 0;
@@ -8,14 +8,10 @@ let last_tstamp = 0;
async function getAllProjectConfigs () {
const client = await pool.connect();
const res0 = await client.query("SELECT schema FROM projects;");
const text = res0.rows.map(r => {
return `SELECT '${r.schema}' AS schema, data FROM ${r.schema}.file_data WHERE (data->>'archived')::boolean IS NOT true AND data->>'id' IS NOT NULL`;
}).join("\nUNION ALL ");
const res1 = await client.query(text);
const text = `SELECT schema, meta AS data FROM projects;`;
const res = await client.query(text);
client.release();
return res1.rows.map(r => Object.assign(r.data, {schema: r.schema}));
return res.rows;
}
async function getNearestPreplot (candidates) {
@@ -74,9 +70,9 @@ async function getNearestOfflinePreplot (candidates) {
if ("latitude" in candidates[0] && "longitude" in candidates[0]) {
text = `
SELECT
'${c._schema}' AS _schema,
'${c.schema}' AS schema,
ST_Distance(ST_Transform(ST_SetSRID(ST_MakePoint($1, $2), 4326), ST_SRID(geometry)), geometry) AS distance
FROM ${c._schema}.preplot_points
FROM ${c.schema}.preplot_points
ORDER BY distance ASC
LIMIT 1;
`;
@@ -84,9 +80,9 @@ async function getNearestOfflinePreplot (candidates) {
} else if ("easting" in candidates[0] && "northing" in candidates[0]) {
text = `
SELECT
'${c._schema}' AS _schema,
'${c.schema}' AS schema,
ST_Distance(ST_SetSRID(ST_MakePoint($1, $2), ST_SRID(geometry)), geometry) AS distance
FROM ${c._schema}.preplot_points
FROM ${c.schema}.preplot_points
ORDER BY distance ASC
LIMIT 1;
`;
@@ -102,13 +98,13 @@ async function getNearestOfflinePreplot (candidates) {
const results = [];
for (const qry of queries) {
const res = await client.query(qry.text, qry.values);
if (res.rows[0] && res.rows[0]._schema) {
if (res.rows[0] && res.rows[0].schema) {
results.push(res.rows[0]);
}
}
client.release();
const _schema = results.sort( (a, b) => a.distance - b.distance).shift()?._schema;
return candidates.find(c => c._schema == _schema);
const schema = results.sort( (a, b) => a.distance - b.distance).shift()?.schema;
return candidates.find(c => c.schema == schema);
}
async function saveOnline (dataset, opts = {}) {
@@ -141,14 +137,14 @@ async function saveOnline (dataset, opts = {}) {
await client.query(`
INSERT INTO raw_shots
(sequence, line, point, objref, tstamp, geometry, hash)
VALUES ($1, $2, $3, $4, $5, ST_SetSRID(ST_MakePoint($6, $7), (SELECT (data->>'epsg')::integer AS epsg FROM file_data WHERE data ? 'id')), '*online*')
VALUES ($1, $2, $3, $4, $5, ST_SetSRID(ST_MakePoint($6, $7), (select (project_configuration()->>'epsg')::integer as epsg)), '*online*')
ON CONFLICT DO NOTHING;
`, [item.sequence, item.line, item.point, 0, item.tstamp, item.easting, item.northing]);
} else if (item.latitude && item.longitude) {
await client.query(`
INSERT INTO raw_shots
(sequence, line, point, objref, tstamp, geometry, hash)
VALUES ($1, $2, $3, $4, $5, ST_Transform(ST_SetSRID(ST_MakePoint($6, $7), 4326), (SELECT (data->>'epsg')::integer AS epsg FROM file_data WHERE data ? 'id')), '*online*')
VALUES ($1, $2, $3, $4, $5, ST_Transform(ST_SetSRID(ST_MakePoint($6, $7), 4326), (select (project_configuration()->>'epsg')::integer as epsg)), '*online*')
ON CONFLICT DO NOTHING;
`, [item.sequence, item.line, item.point, 0, item.tstamp, item.longitude, item.latitude]);
} else {
@@ -158,8 +154,8 @@ async function saveOnline (dataset, opts = {}) {
}
await transaction.commit(client);
} catch (error) {
console.error("ONLINE DATA INSERT ERROR");
console.error(error);
ERROR("ONLINE DATA INSERT ERROR");
ERROR(error);
await transaction.rollback(client);
} finally {
client.release();
@@ -186,7 +182,7 @@ async function saveOffline (navData, opts = {}) {
} else if (schema && hasEastNorth) {
const text = `
INSERT INTO real_time_inputs (tstamp, geometry, meta)
VALUES ($1, ST_Transform(ST_SetSRID(ST_MakePoint($2, $3), (SELECT (data->>'epsg')::integer AS epsg FROM ${schema}.file_data)), 4326), $4);
VALUES ($1, ST_Transform(ST_SetSRID(ST_MakePoint($2, $3), (select (project_configuration()->>'epsg')::integer as epsg), 4326), $4);
`;
const values = [navData.tstamp, navData.longitude, navData.latitude, navData.payload];
@@ -215,6 +211,37 @@ async function saveOffline (navData, opts = {}) {
client.release();
}
async function getCandidates (navData) {
const configs = await getAllProjectConfigs();
// We just get the bits of interest: pattern and schema
const candidates = configs.map(c => {
if (!c?.data?.online?.line || c?.archived === true) {
return null;
}
const p = c.data.online.line.pattern; // For short
const rx = new RegExp(p.regex, p.flags);
const matches = navData.lineName.match(rx);
if (!matches || ((matches.length+1) < p.captures.length)) {
return null;
}
matches.shift(); // Get rid of the full matched text
const obj = Object.assign({}, navData, {schema: c.schema});
p.captures.forEach( (k, i) => {
obj[k] = matches[i];
});
return obj;
}).filter(c => !!c);
DEBUG("Candidates: %j", candidates.map(c => c.schema));
return candidates;
}
async function save (navData, opts = {}) {
const hasLatLon = ("latitude" in navData && "longitude" in navData);
@@ -222,44 +249,21 @@ async function save (navData, opts = {}) {
const hasLinePoint = ("lineName" in navData && "point" in navData);
if (!(hasLinePoint || hasLatLon || hasEastNorth)) {
// This is of no interest to us
console.warning("Ignoring data without useful values", navData);
NOTICE("Ignoring data without useful values", navData);
return;
}
// DEBUG("navData", navData);
if (navData.online === true) {
// So we have a lineName, see which projects match the line pattern.
// For this we need to get all the project configs
const configs = await getAllProjectConfigs();
// We just get the bits of interest: pattern and schema
const candidates = configs.map(c => {
if (!(c && c.online && c.online.line)) {
return null;
}
const p = c.online.line.pattern; // For short
const rx = new RegExp(p.regex, p.flags);
const matches = navData.lineName.match(rx);
if (!matches || ((matches.length+1) < p.captures.length)) {
return null;
}
matches.shift(); // Get rid of the full matched text
const obj = Object.assign({}, navData, {schema: c.schema});
p.captures.forEach( (k, i) => {
obj[k] = matches[i];
});
return obj;
}).filter(c => !!c);
DEBUG("Candidates: %j", candidates);
// console.log("CANDIDATES", candidates);
const candidates = await getCandidates(navData);
if (candidates.length == 0) {
// This is probably a test line, so we treat it as offline
console.log("No match");
WARNING("No match");
} else {
if (candidates.length == 1) {
// Only one candidate, associate with it
@@ -275,7 +279,7 @@ async function save (navData, opts = {}) {
await saveOnline(candidates.filter(c => c.schema == destinationSchema), opts);
navData.payload._schema = destinationSchema;
} else {
console.log("Nowhere to save to");
WARNING("Nowhere to save to");
}
}
@@ -286,17 +290,18 @@ async function save (navData, opts = {}) {
}
} else {
// We are offline. We only assign _schema once every save_interval seconds at most
// unless there is gun data present.
if (opts.offline_survey_heuristics == "nearest_preplot") {
const now = Date.now();
const do_save = !opts.offline_survey_detect_interval ||
(now - last_tstamp) >= opts.offline_survey_detect_interval;
if (do_save) {
if (do_save || "guns" in navData?.payload) {
const configs = await getAllProjectConfigs();
const candidates = configs.map(c => Object.assign({}, navData, {_schema: c.schema}));
const candidates = await getCandidates(navData);
const bestCandidate = await getNearestOfflinePreplot(candidates);
if (bestCandidate) {
navData.payload._schema = bestCandidate._schema;
navData.payload._schema = bestCandidate.schema;
last_tstamp = now;
}
}

View File

@@ -36,6 +36,9 @@ async function patch (projectId, payload, opts = {}) {
}
}
// We do not allow users to change the schema
delete payload.schema;
const dest = removeNulls(deepMerge(source, payload));
await modify(projectId, dest);
return dest;

View File

@@ -7,10 +7,11 @@ const { INFO, DEBUG, WARNING, ERROR } = require('DOUGAL_ROOT/debug')(__filename)
function checkSyntax (value, type = "project") {
var requiredFields = {};
switch (type) {
case "project":
var requiredFields = {
requiredFields = {
id: "string",
name: "string",
epsg: "number",
@@ -18,7 +19,7 @@ function checkSyntax (value, type = "project") {
};
break;
case "binning":
var requiredFields = {
requiredFields = {
theta: "number",
I_inc: "number",
J_inc: "number",
@@ -28,23 +29,19 @@ function checkSyntax (value, type = "project") {
}
break
case "origin":
var requiredFields = {
requiredFields = {
easting: "number",
northing: "number",
I: "number",
J: "number"
}
break;
break;
default:
return typeof type == "function"
? type(value)
: typeof value == type;
}
// return Object.entries(requiredFields).every( ([field, test]) => {
// return value.hasOwnProperty(field) && checkSyntax(value[field], test);
// });
for (const [field, test] of Object.entries(requiredFields)) {
if (!value.hasOwnProperty(field)) {
return `Missing required property: ${field}`;

View File

@@ -1,6 +1,7 @@
const fs = require('fs');
const YAML = require('yaml');
const flattenQCDefinitions = require('../../../utils/flattenQCDefinitions');
const { translatePath } = require('../../../utils/logicalPath');
const project = require('../../project'); // lib/db/project
@@ -8,7 +9,7 @@ async function get (projectId, opts = {}) {
const qcConfig = (await project.configuration.get(projectId))?.qc;
if (qcConfig?.definitions) {
try {
const definitions = YAML.parse(fs.readFileSync(qcConfig.definitions).toString());
const definitions = YAML.parse(fs.readFileSync(translatePath(qcConfig.definitions)).toString());
return opts.flat ? flattenQCDefinitions(definitions) : definitions;
} catch (err) {

View File

@@ -1,7 +1,7 @@
const fs = require('fs/promises');
const Path = require('path');
const mime = require('./mime-types');
const { translatePath, logicalRoot } = require('./logical');
const { translatePath, logicalRoot } = require('../utils/logicalPath');
const systemCfg = require('../config');
const projectCfg = require('../db/configuration');

View File

@@ -8,6 +8,7 @@ const { pool, setSurvey, transaction, fetchRow } = require('../db/connection')
const { project, sequence, configuration, info } = require('../db')
const flattenQCDefinitions = require('./flatten');
const { projectHash, sequenceHash } = require('./last-modified');
const { translatePath } = require('../utils/logicalPath');
const { runShotsQC, saveShotsQC } = require('./shots');
const { runSequenceQCs, saveSequenceQCs } = require('./sequences');
@@ -46,8 +47,8 @@ async function getProjectQCConfig (projectId) {
console.log("qcConfig", qcConfig);
if (qcConfig?.definitions && qcConfig?.parameters) {
const definitions =
flattenQCDefinitions(YAML.parse(fs.readFileSync(qcConfig.definitions).toString()));
const parameters = YAML.parse(fs.readFileSync(qcConfig.parameters).toString());
flattenQCDefinitions(YAML.parse(fs.readFileSync(translatePath(qcConfig.definitions)).toString()));
const parameters = YAML.parse(fs.readFileSync(translatePath(qcConfig.parameters)).toString());
return { definitions, parameters };
}

View File

@@ -5,5 +5,8 @@ module.exports = {
replaceMarkers: require('./replaceMarkers'),
flattenQCDefinitions: require('./flattenQCDefinitions'),
deepMerge: require('./deepMerge'),
removeNulls: require('./removeNulls')
removeNulls: require('./removeNulls'),
logicalPath: require('./logicalPath'),
ranges: require('./ranges'),
unique: require('./unique')
};

View File

@@ -10,6 +10,7 @@ function translatePath (file) {
return physicalPath;
} else {
// An attempt to break out of the logical path?
console.warn("Attempting to break out of the logical path?", physicalPath, prefix);
throw {
status: 404,
message: "Not found"

View File

@@ -0,0 +1,74 @@
function parseRange (str) {
const rx = /^[\[(].*,.*[)\]]$/
if (rx.test(str)) {
const lower_inclusive = str[0] == '[';
const upper_inclusive = str[str.length-1] == ']';
const [ lower, upper ] = str.slice(1,-1).split(",");
return {
upper,
lower,
upper_inclusive,
lower_inclusive
};
}
}
function parseValidity (str) {
const range = parseRange(str);
if (range) {
ts0 = range.lower ? new Date(range.lower) : null;
ts1 = range.upper ? new Date(range.upper) : null;
return {
...range,
lower: ts0,
upper: ts1
};
}
}
function withinValidity (range, ts) {
if (!ts) {
ts = new Date();
}
if (typeof range === "string") {
range = parseValidity(range);
}
if (range.lower) {
if (range.lower_inclusive) {
if (!(range.lower <= ts)) {
return false;
}
} else {
if (!(range.lower < ts)) {
return false;
}
}
}
if (range.upper) {
if (range.upper_inclusive) {
if (!(range.upper >= ts)) {
return false;
}
} else {
if (!(range.upper > ts)) {
return false;
}
}
}
return true;
}
module.exports = {
parseRange,
parseValidity,
withinValidity
}

View File

@@ -0,0 +1,6 @@
function unique(array) {
return [...new Set(array)];
}
module.exports = unique;

View File

@@ -1,71 +0,0 @@
const { pool } = require('../lib/db/connection');
var client;
const channels = {};
async function notify (data) {
if (data.channel in channels) {
data._received = new Date();
try {
const json = JSON.parse(data.payload);
data.payload = json;
} catch {
// Ignore the error
}
for (const listener of channels[data.channel]) {
await listener(JSON.parse(JSON.stringify(data)));
}
}
}
function reconnect () {
console.log("Reconnecting");
// No need to provide parameters, channels should already be populated.
listen();
}
async function listen (addChannels, callback) {
if (!client) {
try {
client = await pool.connect();
} catch (err) {
console.error("Error connecting to DB", err);
console.log("Will try again in 15 seconds");
setImmediate(() => client = null);
setTimeout(() => {
listen(addChannels, callback);
}, 15000);
return;
}
client.on('notification', notify);
console.log("Websocket client connected", Object.keys(channels));
client.on('error', (err) => console.error("Events client error: ", err));
client.on('end', () => {
console.warn("Websocket events client disconnected. Will attempt to reconnect in five seconds");
setImmediate(() => client = null);
setTimeout(reconnect, 5000);
});
}
if (addChannels) {
if (!Array.isArray(addChannels)) {
addChannels = [addChannels];
}
for (const channel of addChannels) {
if (!(channel in channels)) {
await client.query("LISTEN "+channel);
channels[channel] = [];
console.log("Listening to ", channel);
}
channels[channel].push(callback);
}
}
}
module.exports = {
listen
}

View File

@@ -1,6 +1,6 @@
const ws = require('ws');
const URL = require('url');
const db = require('./db');
const { listen } = require('../lib/db/notify');
const channels = require('../lib/db/channels');
function start (server, pingInterval=30000) {
@@ -22,7 +22,7 @@ function start (server, pingInterval=30000) {
}
});
db.listen(channels, (data) => {
listen(channels, (data) => {
wsServer.clients.forEach( (socket) => {
socket.send(JSON.stringify(data));
})

View File

@@ -16,7 +16,12 @@ OUTPATH="$OUTDIR/$OUTNAME"
# 30000/UDP: Navigation system headers
# Not all inputs will be present in all systems.
#
EXPR="udp and (port 4461 or port 4462 or port 30000)"
# NOTE: $INS_HOST must be defined and point to the
# navigation server. The reason we don't use a port
# filter for this data is because that doesn't work
# with fragmented UDP packets.
#
EXPR="udp and (port 4461 or port 4462 or src host $INS_HOST)"
if [[ ! -d "$OUTDIR" ]]; then
mkdir "$OUTDIR"

42
sbin/rewrite-captures.sh Executable file
View File

@@ -0,0 +1,42 @@
#!/bin/bash
#
# Rewrite packet captures in order to be able to replay them.
#
# SINET: Rewrite all packets with this source IP address
# SETHER: Rewrite all packets with this MAC
#
# DINET: Rewrite all packets with this destination IP address
# DETHER: Rewrite all packets with this destination MAC address
#
# The resulting files have the original name with "-rewritten.pcap"
# appended as a suffix. Those packets may then be replayed from a
# different computer or virtual container, for instance with:
#
# sudo bittwist -i 1 -v -m10 capture-rewritten.pcap
#
# Where -i n is the interface name (use bittwist -d to list available
# interfaces), -v is the verbose flag and -m10 replays at 10× speed.
#
SINET=${SINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
SETHER=${SETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}
DINET=${DINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
DETHER=${DETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}
for f in $*; do
OUTFNAME=$f-rewritten.pcap
echo $f$OUTFNAME
if [[ -n "$SINET" && -n "$SETHER" ]]; then
tcprewrite -S 0.0.0.0/0:$SINET --enet-smac=$SETHER \
-D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
--infile "$f" \
--outfile "$OUTFNAME"
else
tcprewrite -D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
--infile "$f" \
--outfile "$OUTFNAME"
fi
done