Mirror of https://gitlab.com/wgp/dougal/software.git (synced 2025-12-06 10:17:07 +00:00)

Compare commits: 270-real-t ... v2023.39.1 (37 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 6eccbf215a |  |
|  | 8abc05f04e |  |
|  | 8f587467f9 |  |
|  | 3d7a91c7ff |  |
|  | 3fd408074c |  |
|  | f71cbd8f51 |  |
|  | 915df8ac16 |  |
|  | d5ecb08a2d |  |
|  | 9388cd4861 |  |
|  | 180590b411 |  |
|  | 4ec37539bf |  |
|  | 8755fe01b6 |  |
|  | 0bfe54e0c2 |  |
|  | 29bc689b84 |  |
|  | 65682febc7 |  |
|  | d408665d62 |  |
|  | 64fceb0a01 |  |
|  | ab58e578c9 |  |
|  | 0e58b8fa5b |  |
|  | 99ac082f00 |  |
|  | 4d3fddc051 |  |
|  | 42456439a9 |  |
|  | ee0c0e7308 |  |
|  | 998c272bf8 |  |
|  | daddd1f0e8 |  |
|  | 17f20535cb |  |
|  | 0829ea3ea1 |  |
|  | 2069d9c3d7 |  |
|  | 8a2d526c50 |  |
|  | 8ad96d6f73 |  |
|  | 947faf8c05 |  |
|  | a948556455 |  |
|  | 835384b730 |  |
|  | c5b93794f4 |  |
|  | 056cd32f0e |  |
|  | 49bb413110 |  |
|  | ceccc42050 |  |
.gitignore (vendored): 1 line changed
@@ -12,3 +12,4 @@ etc/surveys/*.yaml
 !etc/surveys/_*.yaml
 etc/ssl/*
 etc/config.yaml
+var/*
@@ -11,11 +11,9 @@ from datastore import Datastore
 
 if __name__ == '__main__':
 
-    print("Reading configuration")
-    surveys = configuration.surveys()
-
     print("Connecting to database")
     db = Datastore()
+    surveys = db.surveys()
 
     print("Reading surveys")
     for survey in surveys:
@@ -115,7 +115,10 @@ if __name__ == '__main__':
 
             process(layer_name, layer, realprefix)
 
-        else:
+        elif os.path.isdir(realprefix):
+
+            if not "globs" in layer:
+                layer["globs"] = [ "**/*.geojson" ]
 
             for globspec in layer["globs"]:
                 for physical_filepath in pathlib.Path(realprefix).glob(globspec):
@@ -44,7 +44,7 @@
           <template v-slot:activator="{ on, attrs }">
             <v-text-field
               v-model="tsDate"
-              :disabled="!!(sequence || point || entrySequence || entryPoint)"
+              :disabled="!!(entrySequence || entryPoint)"
               label="Date"
               suffix="UTC"
               prepend-icon="mdi-calendar"
@@ -64,7 +64,7 @@
         <v-col>
           <v-text-field
             v-model="tsTime"
-            :disabled="!!(sequence || point || entrySequence || entryPoint)"
+            :disabled="!!(entrySequence || entryPoint)"
             label="Time"
             suffix="UTC"
             prepend-icon="mdi-clock-outline"
@@ -256,6 +256,15 @@
         >
           Cancel
         </v-btn>
+        <v-btn v-if="!id && (entrySequence || entryPoint)"
+               color="info"
+               text
+               title="Enter an event by time"
+               @click="timed"
+        >
+          <v-icon left small>mdi-clock-outline</v-icon>
+          Timed
+        </v-btn>
         <v-spacer></v-spacer>
         <v-btn
           :disabled="!canSave"
@@ -632,6 +641,14 @@ export default {
       }
     },
 
+    timed () {
+      const tstamp = (new Date()).toISOString();
+      this.entrySequence = null;
+      this.entryPoint = null;
+      this.tsDate = tstamp.substr(0, 10);
+      this.tsTime = tstamp.substr(11, 8);
+    },
+
     close () {
      this.entryLabels = this.selectedLabels.map(this.labelToItem)
      this.$emit("input", false);
@@ -200,25 +200,25 @@ app.map({
   '/project/:project/qc': {
     '/results': {
       // Get all QC results for :project
-      get: [ mw.qc.results.get ],
+      get: [ mw.etag.noSave, mw.qc.results.get ],
 
       // Delete all QC results for :project
-      delete: [ mw.auth.access.write, mw.qc.results.delete ],
+      delete: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.delete ],
 
       '/accept': {
-        post: [ mw.auth.access.write, mw.qc.results.accept ]
+        post: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.accept ]
       },
 
       '/unaccept': {
-        post: [ mw.auth.access.write, mw.qc.results.unaccept ]
+        post: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.unaccept ]
       },
 
       '/sequence/:sequence': {
         // Get QC results for :project, :sequence
-        get: [ mw.qc.results.get ],
+        get: [ mw.etag.noSave, mw.qc.results.get ],
 
         // Delete QC results for :project, :sequence
-        delete: [ mw.auth.access.write, mw.qc.results.delete ]
+        delete: [ mw.etag.noSave, mw.auth.access.write, mw.qc.results.delete ]
       }
     }
  },
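The routes above register arrays of middleware per HTTP verb under nested path fragments. app.map is the repo's own helper and its implementation is not part of this diff; the following is only an illustrative sketch of how such a nested route map is typically walked onto an Express-style app (all names assumed):

// Illustrative sketch (assumed Express-style app; not the repo's app.map).
const METHODS = new Set(['get', 'post', 'put', 'patch', 'delete']);

function map (app, routes, prefix = '') {
  for (const [key, value] of Object.entries(routes)) {
    if (METHODS.has(key)) {
      // value is a middleware array, e.g. [ mw.etag.noSave, mw.qc.results.get ]
      app[key](prefix, ...value);
    } else {
      // key is a path fragment such as '/results' or '/sequence/:sequence'
      map(app, value, prefix + key);
    }
  }
}

Prepending mw.etag.noSave to a chain, as this commit does, then simply means it runs before the auth and QC middleware for that verb.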
@@ -33,7 +33,7 @@ function saveResponse (res) {
   const cache = getCache(res);
   const req = res.req;
   console.log(`Saving ETag: ${req.method} ${req.url} → ${etag}`);
-  const headers = res.getHeaders();
+  const headers = structuredClone(res.getHeaders());
   delete headers["set-cookie"];
   cache[req.url] = {etag, headers};
 }
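A note on the getHeaders() change above: Node documents res.getHeaders() as returning only a shallow copy, so array-valued headers such as set-cookie still alias arrays held by the live response; structuredClone presumably makes the cached copy fully independent. An illustrative sketch of the hazard (not repository code):

// Illustrative only: why caching the shallow copy from getHeaders() is risky.
const shallow = res.getHeaders();            // shallow copy: arrays are shared
shallow["set-cookie"]?.push("x=1");          // would mutate the live response too

const deep = structuredClone(res.getHeaders()); // independent deep copy
delete deep["set-cookie"];                      // safe to stash in the ETag cache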
@@ -69,7 +69,7 @@ class DetectFDSP {
         point: prev._point,
         remarks: "Last shotpoint of the day",
         labels: ["LDSP", "Prod"],
-        meta: {auto: true, insertedBy: this.constructor.name}
+        meta: {auto: true, author: `*${this.constructor.name}*`}
       };
 
       const fdsp = {
@@ -77,7 +77,7 @@ class DetectFDSP {
         point: cur._point,
         remarks: "First shotpoint of the day",
         labels: ["FDSP", "Prod"],
-        meta: {auto: true, insertedBy: this.constructor.name}
+        meta: {auto: true, author: `*${this.constructor.name}*`}
       };
 
       INFO("LDSP", ldsp);
lib/www/server/events/handlers/detect-soft-start.js (new file, 128 lines)
@@ -0,0 +1,128 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);

class DetectSoftStart {
  /* Data may come much faster than we can process it, so we put it
   * in a queue and process it at our own pace.
   *
   * The run() method fills the queue with the necessary data and then
   * calls processQueue().
   *
   * The processQueue() method takes the first two elements in
   * the queue and processes them if they are not already being taken
   * care of by a previous processQueue() call – this will happen when
   * data is coming in faster than it can be processed.
   *
   * If a processQueue() call is the first to see the two bottommost
   * elements, it will process them and, when finished, it will set
   * the `isPending` flag of the bottommost element to `false`, thus
   * letting the next call know that it has work to do.
   *
   * If the queue was empty, run() will set the `isPending` flag of its
   * first element to a falsy value, thus bootstrapping the process.
   */
  static MAX_QUEUE_SIZE = 125000;

  queue = [];

  async processQueue () {
    DEBUG("Queue length", this.queue.length)
    while (this.queue.length > 1) {
      if (this.queue[0].isPending) {
        DEBUG("Queue busy");
        setImmediate(() => this.processQueue());
        return;
      }

      const prev = this.queue.shift();
      const cur = this.queue[0];

      try {
        // DEBUG("Previous", prev);
        // DEBUG("Current", cur);

        // TODO:
        // Consider whether to remember if soft start / full volume events
        // have already been emitted and wait until there is an online/offline
        // transition before re-emitting.
        // This may or may not be a good idea.

        // Look for a soft start or full volume event
        if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
          INFO("Soft start detected @", cur.tstamp);

          const projectId = await schema2pid(cur._schema ?? prev._schema);

          // TODO: Try and grab the corresponding comment from the configuration?
          const payload = {
            tstamp: cur.tstamp,
            remarks: "Soft start",
            labels: [ "Daily", "Guns", "Prod" ],
            meta: {auto: true, author: `*${this.constructor.name}*`}
          };
          DEBUG("Posting event", projectId, payload);
          await event.post(projectId, payload);

        } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) {
          INFO("Full volume detected @", cur.tstamp);

          const projectId = await schema2pid(cur._schema ?? prev._schema);

          // TODO: Try and grab the corresponding comment from the configuration?
          const payload = {
            tstamp: cur.tstamp,
            remarks: "Full volume",
            labels: [ "Daily", "Guns", "Prod" ],
            meta: {auto: true, author: `*${this.constructor.name}*`}
          };
          DEBUG("Posting event", projectId, payload);
          await event.post(projectId, payload);
        }
        // Processing of this shot has already been completed.
        // The queue can now move forward.
      } catch (err) {
        ERROR("DetectSoftStart Error")
        ERROR(err);
      } finally {
        cur.isPending = false;
      }
    }
  }

  async run (data) {
    if (!data || data.channel !== "realtime") {
      return;
    }

    if (!(data.payload && data.payload.new && data.payload.new.meta)) {
      return;
    }

    const meta = data.payload.new.meta;

    if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) {

      this.queue.push({
        isPending: this.queue.length,
        _schema: meta._schema,
        tstamp: meta.tstamp ?? meta.time,
        shot: meta.shot,
        lineStatus: meta.lineStatus,
        _sequence: meta._sequence,
        _point: meta._point,
        lineName: meta.lineName,
        num_guns: meta.num_guns,
        num_active: meta.num_active
      });

    } else {
      // FIXME Change to alert
      ALERT("DetectSoftStart queue full at", this.queue.length);
    }

    this.processQueue();
  }
}

module.exports = DetectSoftStart;
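The isPending bookkeeping described in the class comment above is shared by the other handlers in this changeset. A distilled sketch of the pattern on its own (illustrative names, not repository code):

// Minimal sketch of the isPending queue pattern used by the handlers above.
class PairwiseQueue {
  queue = [];

  push (item) {
    // On a non-empty queue, isPending starts truthy, parking the element until
    // the one ahead of it is done; on an empty queue it is 0 (falsy), which
    // bootstraps processing.
    this.queue.push({ ...item, isPending: this.queue.length });
    this.process();
  }

  async process () {
    while (this.queue.length > 1) {
      if (this.queue[0].isPending) {
        setImmediate(() => this.process()); // busy: retry on the next tick
        return;
      }
      const prev = this.queue.shift();
      const cur = this.queue[0];
      try {
        await this.handlePair(prev, cur);   // handler-specific work
      } finally {
        cur.isPending = false;              // unblock the next process() call
      }
    }
  }

  async handlePair (prev, cur) {}           // override in a subclass
}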
@@ -1,23 +1,24 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);

class DetectSOLEOL {
  /* Data may come much faster than we can process it, so we put it
   * in a queue and process it at our own pace.
   *
   * The run() method fills the queue with the necessary data and then
   * calls processQueue().
   *
   * The processQueue() method takes the first two elements in
   * the queue and processes them if they are not already being taken
   * care of by a previous processQueue() call – this will happen when
   * data is coming in faster than it can be processed.
   *
   * If a processQueue() call is the first to see the two bottommost
   * elements, it will process them and, when finished, it will set
   * the `isPending` flag of the bottommost element to `false`, thus
   * letting the next call know that it has work to do.
   *
   * If the queue was empty, run() will set the `isPending` flag of its
   * first element to a falsy value, thus bootstrapping the process.
   */
@@ -26,8 +27,10 @@ class DetectSOLEOL {
   queue = [];
 
   async processQueue () {
     DEBUG("Queue length", this.queue.length)
     while (this.queue.length > 1) {
       if (this.queue[0].isPending) {
+        DEBUG("Queue busy");
+        setImmediate(() => this.processQueue());
         return;
       }
@@ -38,9 +41,15 @@ class DetectSOLEOL {
      const sequence = Number(cur._sequence);

      try {
        DEBUG("Sequence", sequence);
        // DEBUG("Previous", prev);
        // DEBUG("Current", cur);

        if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
            prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
          INFO("Transition to ONLINE detected");
          // DEBUG(cur);
          // DEBUG(prev);
          // console.log("TRANSITION TO ONLINE", prev, cur);

          // Check if there are already FSP, FGSP events for this sequence
@@ -59,16 +68,22 @@ class DetectSOLEOL {
            sequence,
            point: cur._point,
            remarks,
-           labels
+           labels,
+           meta: {auto: true, author: `*${this.constructor.name}*`}
          }

          // console.log(projectId, payload);
          INFO("Posting event", projectId, payload);
          await event.post(projectId, payload);
        } else {
          // A first shot point has already been entered in the log,
          // so we have nothing to do here.
          INFO("FSP already in the log. Doing nothing");
        }
      } else if (prev.lineStatus == "online" && cur.lineStatus != "online") {
        INFO("Transition to OFFLINE detected");
        // DEBUG(cur);
        // DEBUG(prev);
        // console.log("TRANSITION TO OFFLINE", prev, cur);

        // Check if there are already LSP, LGSP events for this sequence
@@ -87,14 +102,17 @@ class DetectSOLEOL {
            sequence,
            point: prev._point,
            remarks,
-           labels
+           labels,
+           meta: {auto: true, author: `*${this.constructor.name}*`}
          }

          // console.log(projectId, payload);
          INFO("Posting event", projectId, payload);
          await event.post(projectId, payload);
        } else {
          // A last shot point has already been entered in the log,
          // so we have nothing to do here.
          INFO("LSP already in the log. Doing nothing");
        }
      }
      // Processing of this shot has already been completed.
@@ -1,5 +1,7 @@
 const Handlers = [
   require('./detect-soleol'),
+  require('./detect-soft-start'),
+  require('./report-line-change-time'),
   require('./detect-fdsp')
 ];
 
lib/www/server/events/handlers/report-line-change-time.js (new file, 301 lines)
@@ -0,0 +1,301 @@
const { schema2pid } = require('../../lib/db/connection');
const { event, project } = require('../../lib/db');
const { withinValidity } = require('../../lib/utils/ranges');
const unique = require('../../lib/utils/unique');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);

class ReportLineChangeTime {
  /* Data may come much faster than we can process it, so we put it
   * in a queue and process it at our own pace.
   *
   * The run() method fills the queue with the necessary data and then
   * calls processQueue().
   *
   * The processQueue() method takes the first two elements in
   * the queue and processes them if they are not already being taken
   * care of by a previous processQueue() call – this will happen when
   * data is coming in faster than it can be processed.
   *
   * If a processQueue() call is the first to see the two bottommost
   * elements, it will process them and, when finished, it will set
   * the `isPending` flag of the bottommost element to `false`, thus
   * letting the next call know that it has work to do.
   *
   * If the queue was empty, run() will set the `isPending` flag of its
   * first element to a falsy value, thus bootstrapping the process.
   */
  static MAX_QUEUE_SIZE = 125000;

  queue = [];

  author = `*${this.constructor.name}*`;

  async processQueue () {
    DEBUG("Queue length", this.queue.length)
    while (this.queue.length > 0) {
      if (this.queue[0].isPending) {
        DEBUG("Queue busy");
        setImmediate(() => this.processQueue());
        return;
      }

      const cur = this.queue.shift();
      const next = this.queue[0];
      const projectId = cur.pid;
      // Are we being called because of a LGSP or because of a FGSP?
      const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP"));

      if (!projectId) {
        WARNING("No projectID found in event", cur);
        return;
      }

      async function getConfiguration (projectId) {
        return await project.configuration.get(projectId);
      }


      async function getLineChangeTime (data, forward = false) {
        if (forward) {
          const ospEvents = await event.list(projectId, {label: "FGSP"});
          // DEBUG("ospEvents", ospEvents);
          const osp = ospEvents.filter(i => i.tstamp > data.tstamp).pop();
          DEBUG("fsp", osp);
          // DEBUG("data", data);

          if (osp) {
            DEBUG("lineChangeTime", osp.tstamp - data.tstamp);
            return { lineChangeTime: osp.tstamp - data.tstamp, osp };
          }
        } else {
          const ospEvents = await event.list(projectId, {label: "LGSP"});
          // DEBUG("ospEvents", ospEvents);
          const osp = ospEvents.filter(i => i.tstamp < data.tstamp).shift();
          DEBUG("lsp", osp);
          // DEBUG("data", data);

          if (osp) {
            DEBUG("lineChangeTime", data.tstamp - osp.tstamp);
            return { lineChangeTime: data.tstamp - osp.tstamp, osp };
          }
        }
      }

      function parseInterval (dt) {
        const daySeconds = (dt/1000) % 86400;
        const d = Math.floor((dt/1000) / 86400);
        const dateObject = new Date(null);
        dateObject.setSeconds(daySeconds);
        const [ h, m, s ] = dateObject.toISOString().slice(11, 19).split(":").map(Number);
        return {d, h, m, s};
      }

      function formatInterval (i) {
        let str = "";
        for (let [k, v] of Object.entries(i)) {
          if (v) {
            str += " " + v + " " + k;
          }
        }
        return str.trim();
      }

      const deleteStaleEvents = async (seq) => {
        if (seq) {
          DEBUG("Will delete lct events related to sequence(s)", seq);

          const jpq = `$."${this.author}"`;

          const opts = {jpq};

          if (Array.isArray(seq)) {
            opts.sequences = unique(seq).filter(i => !!i);
          } else {
            opts.sequence = seq;
          }

          const staleEvents = await event.list(projectId, opts);
          DEBUG(staleEvents.length ?? 0, "events to delete");
          for (let staleEvent of staleEvents) {
            DEBUG(`Deleting event id ${staleEvent.id} (seq = ${staleEvent.sequence}, point = ${staleEvent.point})`);
            await event.del(projectId, staleEvent.id);
          }
        }
      }

      const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => {

        const events = [];
        const cfg = (await project.configuration.get(projectId));
        const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // m → ms
        DEBUG("nlcd", nlcd);
        if (nlcd && lineChangeTime > nlcd) {
          const excess = lineChangeTime-nlcd;
          const excessString = formatInterval(parseInterval(excess));
          DEBUG("excess", excess, excessString);

          // ref: The later of the two events
          const ref = forward ? osp : data;
          const payload = {
            // tstamp: new Date(ref.tstamp-1),
            sequence: ref.sequence,
            point: ref.point,
            remarks: `_Nominal line change duration exceeded by ${excessString}_`,
            labels: [ "Nav", "Prod" ],
            meta: {
              auto: true,
              author: this.author,
              [this.author]: {
                parents: [
                  data.id,
                  osp.id
                ],
                type: "excess",
                value: excess
              }
            }
          }

          events.push(payload);
          DEBUG("Created line change duration exceeded event", projectId, payload);
        }


        const lctString = formatInterval(parseInterval(lineChangeTime));

        // ref: The later of the two events
        const ref = forward ? osp : data;
        const payload = {
          // tstamp: new Date(ref.tstamp-1),
          sequence: ref.sequence,
          point: ref.point,
          remarks: `Line change time: ${lctString}`,
          labels: [ "Nav", "Prod" ],
          meta: {
            auto: true,
            author: this.author,
            [this.author]: {
              parents: [
                data.id,
                osp.id
              ],
              type: "lineChangeTime",
              value: lineChangeTime
            }
          }
        };

        events.push(payload);
        DEBUG("Created line change duration event", projectId, payload);

        return events;
      }

      const maybePostEvent = async (projectId, payload) => {
        DEBUG("Posting event", projectId, payload);
        await event.post(projectId, payload);
      }

      try {
        // DEBUG("Previous", prev);
        DEBUG("Current", cur);
        DEBUG("Forward search", forward);

        // We have these scenarios to consider:
        // INSERT:
        //   `old` will be NULL
        //   Add event with line change time:
        //   - match validity with `new`
        //   - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
        // UPDATE:
        //   `old` is not NULL
        //   `new` is not NULL
        //   Delete previous event from event_log (not event_log_full)
        //   Add event with line change time:
        //   - match validity with `new`
        //   - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
        // DELETE:
        //   `old` is not NULL
        //   `new` will be NULL
        //   Delete previous event from event_log (not event_log_full)

        await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]);

        if (cur.operation == "INSERT") {
          // NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT
          // on event_log_full, so we don't need to worry about UPDATE here.
          const data = cur.new;
          DEBUG("INSERT seen: will add lct events related to ", data.id);

          if (withinValidity(data.validity)) {
            DEBUG("Event within validity period", data.validity, new Date());

            data.tstamp = new Date(data.tstamp);
            const { lineChangeTime, osp } = await getLineChangeTime(data, forward);

            if (lineChangeTime) {

              const events = await createLineChangeTimeEvents(lineChangeTime, data, osp);

              if (events?.length) {
                DEBUG("Deleting other events for sequence", events[0].sequence);
                await deleteStaleEvents(events[0].sequence);
              }

              for (let payload of events) {
                await maybePostEvent(projectId, payload);
              }
            }
          } else {
            DEBUG("Event outside of validity range", data.validity, "lct events not inserted");
          }

        }


        // Processing of this shot has already been completed.
        // The queue can now move forward.
      } catch (err) {
        ERROR("ReportLineChangeTime Error")
        ERROR(err);
      } finally {
        if (next) {
          next.isPending = false;
        }
      }
    }
  }

  async run (data) {
    if (!data || data.channel !== "event") {
      return;
    }

    if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) {
      return;
    }

    const n = data.payload.new;
    const o = data.payload.old;

    if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
        !n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
      return;
    }

    if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) {

      this.queue.push({
        ...data.payload,
        isPending: this.queue.length,
      });

    } else {
      ALERT("ReportLineChangeTime queue full at", this.queue.length);
    }

    this.processQueue();
  }
}

module.exports = ReportLineChangeTime;
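A worked example for the interval helpers defined above: new Date(null) is the Unix epoch, so after setSeconds() the ISO clock time equals the intra-day remainder of the duration. Illustrative values:

// Worked example (illustrative) for parseInterval/formatInterval above.
const dt = (1*86400 + 2*3600 + 3*60 + 4) * 1000;   // 1 d 2 h 3 m 4 s, in ms

parseInterval(dt);                  // → { d: 1, h: 2, m: 3, s: 4 }
formatInterval(parseInterval(dt));  // → "1 d 2 h 3 m 4 s"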
@@ -1,4 +1,4 @@
-const { listen } = require('../ws/db');
+const { listen } = require('../lib/db/notify');
 const channels = require('../lib/db/channels');
 const handlers = require('./handlers').init();
 const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
@@ -10,25 +10,34 @@ async function list (projectId, opts = {}) {
   const offset = Math.abs((opts.page-1)*opts.itemsPerPage) || 0;
   const limit = Math.abs(Number(opts.itemsPerPage)) || null;
 
-  const filter = opts.sequence
-    ? String(opts.sequence).includes(";")
-      ? [ "sequence = ANY ( $1 )", [ opts.sequence.split(";") ] ]
-      : [ "sequence = $1", [ opts.sequence ] ]
-    : opts.date0
-      ? opts.date1
-        ? [ "date(tstamp) BETWEEN SYMMETRIC $1 AND $2", [ opts.date0, opts.date1 ] ]
-        : [ "date(tstamp) = $1", [ opts.date0 ] ]
-      : [ "true = true", [] ];
+  const sequence = opts.sequence && Number(opts.sequence) || null;
+  const sequences = opts.sequences && (Array.isArray(opts.sequences)
+    ? opts.sequences.map(Number)
+    : opts.sequences.split(/[^0-9]+/).map(Number)) || null;
+  const date0 = opts.date0 ?? null;
+  const date1 = opts.date1 ?? null;
+  const jpq = opts.jpq || null;
+  const label = opts.label ?? null;
 
   const text = `
     SELECT *
     FROM event_log e
     WHERE
-      ${filter[0]}
-    ORDER BY ${sortKey} ${sortDir};
+      ($1::numeric IS NULL OR sequence = $1) AND
+      ($2::numeric[] IS NULL OR sequence = ANY( $2 )) AND
+      ($3::timestamptz IS NULL OR date(tstamp) = $3) AND
+      ($3::timestamptz IS NULL OR
+        (($4::timestamptz IS NULL AND date(tstamp) = $3) OR
+         date(tstamp) BETWEEN SYMMETRIC $3 AND $4)) AND
+      ($5::jsonpath IS NULL OR jsonb_path_exists(meta::jsonb, $5::jsonpath)) AND
+      ($6::text IS NULL OR $6 = ANY(labels))
+    ORDER BY ${sortKey} ${sortDir}
+    LIMIT ${limit};
   `;
 
-  const res = await client.query(text, filter[1]);
+  const values = [ sequence, sequences, date0, date1, jpq, label ];
+
+  const res = await client.query(text, values);
   client.release();
   return res.rows.map(i => replaceMarkers(i));
 }
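The rewritten WHERE clause uses the common "optional parameter" idiom: each `($n IS NULL OR <condition>)` conjunct disables itself when its bound value is NULL, so one statement covers every combination of filters. A standalone illustration (made-up table access, not repository code):

// Illustrative only: the NULL-disabled filter idiom used by list() above.
async function findEvents (client, sequence = null, label = null) {
  const text = `
    SELECT *
    FROM event_log
    WHERE
      ($1::numeric IS NULL OR sequence = $1) AND
      ($2::text    IS NULL OR $2 = ANY(labels));
  `;
  // Binding NULL for a parameter switches its conjunct off:
  //   findEvents(client, 42, null)     → filter by sequence only
  //   findEvents(client, null, 'Prod') → filter by label only
  //   findEvents(client, null, null)   → no filters: all rows
  return (await client.query(text, [sequence, label])).rows;
}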
@@ -9,10 +9,10 @@ async function post (projectId, payload, opts = {}) {
 
   const text = `
     INSERT
-    INTO event_log (tstamp, sequence, point, remarks, labels)
-    VALUES ($1, $2, $3, replace_placeholders($4, $1, $2, $3), $5);
+    INTO event_log (tstamp, sequence, point, remarks, labels, meta)
+    VALUES ($1, $2, $3, replace_placeholders($4, $1, $2, $3), $5, $6);
   `;
-  const values = [ p.tstamp, p.sequence, p.point, p.remarks, p.labels ];
+  const values = [ p.tstamp, p.sequence, p.point, p.remarks, p.labels, p.meta ];
 
   DEBUG("Inserting new values: %O", values);
   await client.query(text, values);
@@ -1,6 +1,6 @@
 // FIXME This code is in painful need of refactoring
 
-const { DEBUG } = require("DOUGAL_ROOT/debug")(__filename);
+const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
 const { setSurvey, transaction, pool } = require('../connection');
 
 let last_tstamp = 0;
@@ -70,9 +70,9 @@ async function getNearestOfflinePreplot (candidates) {
   if ("latitude" in candidates[0] && "longitude" in candidates[0]) {
     text = `
       SELECT
-        '${c._schema}' AS _schema,
+        '${c.schema}' AS schema,
         ST_Distance(ST_Transform(ST_SetSRID(ST_MakePoint($1, $2), 4326), ST_SRID(geometry)), geometry) AS distance
-      FROM ${c._schema}.preplot_points
+      FROM ${c.schema}.preplot_points
       ORDER BY distance ASC
       LIMIT 1;
     `;
@@ -80,9 +80,9 @@ async function getNearestOfflinePreplot (candidates) {
   } else if ("easting" in candidates[0] && "northing" in candidates[0]) {
     text = `
       SELECT
-        '${c._schema}' AS _schema,
+        '${c.schema}' AS schema,
         ST_Distance(ST_SetSRID(ST_MakePoint($1, $2), ST_SRID(geometry)), geometry) AS distance
-      FROM ${c._schema}.preplot_points
+      FROM ${c.schema}.preplot_points
       ORDER BY distance ASC
       LIMIT 1;
     `;
@@ -98,13 +98,13 @@ async function getNearestOfflinePreplot (candidates) {
   const results = [];
   for (const qry of queries) {
     const res = await client.query(qry.text, qry.values);
-    if (res.rows[0] && res.rows[0]._schema) {
+    if (res.rows[0] && res.rows[0].schema) {
       results.push(res.rows[0]);
     }
   }
   client.release();
-  const _schema = results.sort( (a, b) => a.distance - b.distance).shift()?._schema;
-  return candidates.find(c => c._schema == _schema);
+  const schema = results.sort( (a, b) => a.distance - b.distance).shift()?.schema;
+  return candidates.find(c => c.schema == schema);
 }
 
 async function saveOnline (dataset, opts = {}) {
@@ -154,8 +154,8 @@ async function saveOnline (dataset, opts = {}) {
     }
     await transaction.commit(client);
   } catch (error) {
-    console.error("ONLINE DATA INSERT ERROR");
-    console.error(error);
+    ERROR("ONLINE DATA INSERT ERROR");
+    ERROR(error);
     await transaction.rollback(client);
   } finally {
     client.release();
@@ -211,6 +211,37 @@ async function saveOffline (navData, opts = {}) {
   client.release();
 }
 
+async function getCandidates (navData) {
+
+  const configs = await getAllProjectConfigs();
+
+  // We just get the bits of interest: pattern and schema
+  const candidates = configs.map(c => {
+    if (!c?.data?.online?.line || c?.archived === true) {
+      return null;
+    }
+
+    const p = c.data.online.line.pattern; // For short
+
+    const rx = new RegExp(p.regex, p.flags);
+    const matches = navData.lineName.match(rx);
+
+    if (!matches || ((matches.length+1) < p.captures.length)) {
+      return null;
+    }
+
+    matches.shift(); // Get rid of the full matched text
+    const obj = Object.assign({}, navData, {schema: c.schema});
+    p.captures.forEach( (k, i) => {
+      obj[k] = matches[i];
+    });
+    return obj;
+  }).filter(c => !!c);
+  DEBUG("Candidates: %j", candidates.map(c => c.schema));
+
+  return candidates;
+}
+
 async function save (navData, opts = {}) {
 
   const hasLatLon = ("latitude" in navData && "longitude" in navData);
@@ -218,44 +249,21 @@ async function save (navData, opts = {}) {
   const hasLinePoint = ("lineName" in navData && "point" in navData);
   if (!(hasLinePoint || hasLatLon || hasEastNorth)) {
     // This is of no interest to us
-    console.warning("Ignoring data without useful values", navData);
+    NOTICE("Ignoring data without useful values", navData);
     return;
   }
 
   // DEBUG("navData", navData);
 
   if (navData.online === true) {
 
-    // So we have a lineName, see which projects match the line pattern.
-    // For this we need to get all the project configs
-    const configs = await getAllProjectConfigs();
-
-    // We just get the bits of interest: pattern and schema
-    const candidates = configs.map(c => {
-      if (!(c && c.online && c.online.line)) {
-        return null;
-      }
-
-      const p = c.online.line.pattern; // For short
-
-      const rx = new RegExp(p.regex, p.flags);
-      const matches = navData.lineName.match(rx);
-
-      if (!matches || ((matches.length+1) < p.captures.length)) {
-        return null;
-      }
-
-      matches.shift(); // Get rid of the full matched text
-      const obj = Object.assign({}, navData, {schema: c.schema});
-      p.captures.forEach( (k, i) => {
-        obj[k] = matches[i];
-      });
-      return obj;
-    }).filter(c => !!c);
-    DEBUG("Candidates: %j", candidates);
-    // console.log("CANDIDATES", candidates);
+    const candidates = await getCandidates(navData);
 
     if (candidates.length == 0) {
       // This is probably a test line, so we treat it as offline
-      console.log("No match");
+      WARNING("No match");
     } else {
       if (candidates.length == 1) {
         // Only one candidate, associate with it
@@ -271,7 +279,7 @@ async function save (navData, opts = {}) {
       await saveOnline(candidates.filter(c => c.schema == destinationSchema), opts);
       navData.payload._schema = destinationSchema;
     } else {
-      console.log("Nowhere to save to");
+      WARNING("Nowhere to save to");
     }
   }
@@ -282,17 +290,18 @@ async function save (navData, opts = {}) {
     }
   } else {
     // We are offline. We only assign _schema once every save_interval seconds at most
+    // unless there is gun data present.
     if (opts.offline_survey_heuristics == "nearest_preplot") {
       const now = Date.now();
       const do_save = !opts.offline_survey_detect_interval ||
         (now - last_tstamp) >= opts.offline_survey_detect_interval;
 
-      if (do_save) {
-        const configs = await getAllProjectConfigs();
-        const candidates = configs.map(c => Object.assign({}, navData, {_schema: c.schema}));
+      if (do_save || "guns" in navData?.payload) {
+        const candidates = await getCandidates(navData);
         const bestCandidate = await getNearestOfflinePreplot(candidates);
         if (bestCandidate) {
-          navData.payload._schema = bestCandidate._schema;
+          navData.payload._schema = bestCandidate.schema;
           last_tstamp = now;
         }
       }
@@ -36,6 +36,9 @@ async function patch (projectId, payload, opts = {}) {
     }
   }
 
+  // We do not allow users to change the schema
+  delete payload.schema;
+
   const dest = removeNulls(deepMerge(source, payload));
   await modify(projectId, dest);
   return dest;
@@ -7,10 +7,11 @@ const { INFO, DEBUG, WARNING, ERROR } = require('DOUGAL_ROOT/debug')(__filename)
 
 
 function checkSyntax (value, type = "project") {
+  var requiredFields = {};
 
   switch (type) {
     case "project":
-      var requiredFields = {
+      requiredFields = {
         id: "string",
         name: "string",
         epsg: "number",
@@ -18,7 +19,7 @@ function checkSyntax (value, type = "project") {
       };
       break;
     case "binning":
-      var requiredFields = {
+      requiredFields = {
        theta: "number",
        I_inc: "number",
        J_inc: "number",
@@ -28,23 +29,19 @@ function checkSyntax (value, type = "project") {
       }
       break
     case "origin":
-      var requiredFields = {
+      requiredFields = {
        easting: "number",
        northing: "number",
        I: "number",
        J: "number"
      }
-      break;
      break;
    default:
      return typeof type == "function"
        ? type(value)
        : typeof value == type;
  }
 
-  // return Object.entries(requiredFields).every( ([field, test]) => {
-  //   return value.hasOwnProperty(field) && checkSyntax(value[field], test);
-  // });
 
  for (const [field, test] of Object.entries(requiredFields)) {
    if (!value.hasOwnProperty(field)) {
      return `Missing required property: ${field}`;
@@ -1,6 +1,7 @@
 const fs = require('fs');
 const YAML = require('yaml');
 const flattenQCDefinitions = require('../../../utils/flattenQCDefinitions');
+const { translatePath } = require('../../../utils/logicalPath');
 const project = require('../../project'); // lib/db/project
 
 
@@ -8,7 +9,7 @@ async function get (projectId, opts = {}) {
   const qcConfig = (await project.configuration.get(projectId))?.qc;
   if (qcConfig?.definitions) {
     try {
-      const definitions = YAML.parse(fs.readFileSync(qcConfig.definitions).toString());
+      const definitions = YAML.parse(fs.readFileSync(translatePath(qcConfig.definitions)).toString());
 
       return opts.flat ? flattenQCDefinitions(definitions) : definitions;
     } catch (err) {
@@ -1,7 +1,7 @@
 const fs = require('fs/promises');
 const Path = require('path');
 const mime = require('./mime-types');
-const { translatePath, logicalRoot } = require('./logical');
+const { translatePath, logicalRoot } = require('../utils/logicalPath');
 const systemCfg = require('../config');
 const projectCfg = require('../db/configuration');
@@ -8,6 +8,7 @@ const { pool, setSurvey, transaction, fetchRow } = require('../db/connection')
 const { project, sequence, configuration, info } = require('../db')
 const flattenQCDefinitions = require('./flatten');
 const { projectHash, sequenceHash } = require('./last-modified');
+const { translatePath } = require('../utils/logicalPath');
 
 const { runShotsQC, saveShotsQC } = require('./shots');
 const { runSequenceQCs, saveSequenceQCs } = require('./sequences');
@@ -46,8 +47,8 @@ async function getProjectQCConfig (projectId) {
   console.log("qcConfig", qcConfig);
   if (qcConfig?.definitions && qcConfig?.parameters) {
     const definitions =
-      flattenQCDefinitions(YAML.parse(fs.readFileSync(qcConfig.definitions).toString()));
-    const parameters = YAML.parse(fs.readFileSync(qcConfig.parameters).toString());
+      flattenQCDefinitions(YAML.parse(fs.readFileSync(translatePath(qcConfig.definitions)).toString()));
+    const parameters = YAML.parse(fs.readFileSync(translatePath(qcConfig.parameters)).toString());
 
     return { definitions, parameters };
   }
@@ -5,5 +5,8 @@ module.exports = {
   replaceMarkers: require('./replaceMarkers'),
   flattenQCDefinitions: require('./flattenQCDefinitions'),
   deepMerge: require('./deepMerge'),
-  removeNulls: require('./removeNulls')
+  removeNulls: require('./removeNulls'),
+  logicalPath: require('./logicalPath'),
+  ranges: require('./ranges'),
+  unique: require('./unique')
 };
@@ -10,6 +10,7 @@ function translatePath (file) {
     return physicalPath;
   } else {
     // An attempt to break out of the logical path?
+    console.warn("Attempting to break out of the logical path?", physicalPath, prefix);
     throw {
       status: 404,
       message: "Not found"
lib/www/server/lib/utils/ranges.js (new file, 74 lines)
@@ -0,0 +1,74 @@

function parseRange (str) {
  const rx = /^[\[(].*,.*[)\]]$/

  if (rx.test(str)) {
    const lower_inclusive = str[0] == '[';
    const upper_inclusive = str[str.length-1] == ']';
    const [ lower, upper ] = str.slice(1,-1).split(",");
    return {
      upper,
      lower,
      upper_inclusive,
      lower_inclusive
    };
  }
}

function parseValidity (str) {
  const range = parseRange(str);

  if (range) {
    const ts0 = range.lower ? new Date(range.lower) : null;
    const ts1 = range.upper ? new Date(range.upper) : null;

    return {
      ...range,
      lower: ts0,
      upper: ts1
    };
  }
}

function withinValidity (range, ts) {
  if (!ts) {
    ts = new Date();
  }

  if (typeof range === "string") {
    range = parseValidity(range);
  }

  if (range.lower) {
    if (range.lower_inclusive) {
      if (!(range.lower <= ts)) {
        return false;
      }
    } else {
      if (!(range.lower < ts)) {
        return false;
      }
    }
  }

  if (range.upper) {
    if (range.upper_inclusive) {
      if (!(range.upper >= ts)) {
        return false;
      }
    } else {
      if (!(range.upper > ts)) {
        return false;
      }
    }
  }

  return true;
}

module.exports = {
  parseRange,
  parseValidity,
  withinValidity
}
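A quick illustrative use of these helpers with a PostgreSQL-style range literal (values made up):

// Illustrative usage of parseValidity/withinValidity (made-up values).
const validity = "[2023-01-01,2023-07-01)"; // inclusive lower, exclusive upper

parseValidity(validity);
// → { lower: 2023-01-01T00:00:00.000Z, upper: 2023-07-01T00:00:00.000Z,
//     lower_inclusive: true, upper_inclusive: false }

withinValidity(validity, new Date("2023-03-15")); // → true
withinValidity(validity, new Date("2023-07-01")); // → false (upper is exclusive)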
lib/www/server/lib/utils/unique.js (new file, 6 lines)
@@ -0,0 +1,6 @@

function unique(array) {
  return [...new Set(array)];
}

module.exports = unique;
@@ -1,71 +0,0 @@
const { pool } = require('../lib/db/connection');

var client;

const channels = {};

async function notify (data) {

  if (data.channel in channels) {
    data._received = new Date();
    try {
      const json = JSON.parse(data.payload);
      data.payload = json;
    } catch {
      // Ignore the error
    }
    for (const listener of channels[data.channel]) {
      await listener(JSON.parse(JSON.stringify(data)));
    }
  }
}

function reconnect () {
  console.log("Reconnecting");
  // No need to provide parameters, channels should already be populated.
  listen();
}

async function listen (addChannels, callback) {
  if (!client) {
    try {
      client = await pool.connect();
    } catch (err) {
      console.error("Error connecting to DB", err);
      console.log("Will try again in 15 seconds");
      setImmediate(() => client = null);
      setTimeout(() => {
        listen(addChannels, callback);
      }, 15000);
      return;
    }
    client.on('notification', notify);
    console.log("Websocket client connected", Object.keys(channels));
    client.on('error', (err) => console.error("Events client error: ", err));
    client.on('end', () => {
      console.warn("Websocket events client disconnected. Will attempt to reconnect in five seconds");
      setImmediate(() => client = null);
      setTimeout(reconnect, 5000);
    });
  }

  if (addChannels) {
    if (!Array.isArray(addChannels)) {
      addChannels = [addChannels];
    }

    for (const channel of addChannels) {
      if (!(channel in channels)) {
        await client.query("LISTEN "+channel);
        channels[channel] = [];
        console.log("Listening to ", channel);
      }

      channels[channel].push(callback);
    }
  }
}

module.exports = {
  listen
}
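The module deleted above moved to lib/www/server/lib/db/notify, as the require changes earlier in this compare show. For reference, a minimal sketch of the underlying PostgreSQL LISTEN/NOTIFY pattern with the `pg` driver (illustrative; not the repo's notify module, which this diff does not show):

// Illustrative sketch of LISTEN/NOTIFY with the `pg` driver.
const { Pool } = require('pg');
const pool = new Pool();

async function subscribe (channel, onMessage) {
  // LISTEN requires a dedicated client held for the life of the subscription;
  // it must not be released back to the pool (as in the deleted module above).
  const client = await pool.connect();

  client.on('notification', (msg) => {
    let payload = msg.payload;
    try { payload = JSON.parse(msg.payload); } catch { /* not JSON: keep as text */ }
    onMessage({ channel: msg.channel, payload, _received: new Date() });
  });

  // Channel names are identifiers, not values, so they cannot be bound as $1.
  await client.query(`LISTEN ${client.escapeIdentifier(channel)}`);
  return client;
}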
@@ -1,6 +1,6 @@
 const ws = require('ws');
 const URL = require('url');
-const db = require('./db');
+const { listen } = require('../lib/db/notify');
 const channels = require('../lib/db/channels');
 
 function start (server, pingInterval=30000) {
@@ -22,7 +22,7 @@ function start (server, pingInterval=30000) {
     }
   });
 
-  db.listen(channels, (data) => {
+  listen(channels, (data) => {
     wsServer.clients.forEach( (socket) => {
       socket.send(JSON.stringify(data));
     })
@@ -16,7 +16,12 @@ OUTPATH="$OUTDIR/$OUTNAME"
 # 30000/UDP: Navigation system headers
 # Not all inputs will be present in all systems.
 #
-EXPR="udp and (port 4461 or port 4462 or port 30000)"
+# NOTE: $INS_HOST must be defined and point to the
+# navigation server. The reason we don't use a port
+# filter for this data is because that doesn't work
+# with fragmented UDP packets.
+#
+EXPR="udp and (port 4461 or port 4462 or src host $INS_HOST)"
 
 if [[ ! -d "$OUTDIR" ]]; then
     mkdir "$OUTDIR"
sbin/rewrite-captures.sh (new executable file, 42 lines)
@@ -0,0 +1,42 @@
#!/bin/bash
#
# Rewrite packet captures in order to be able to replay them.
#
# SINET: Rewrite all packets with this source IP address
# SETHER: Rewrite all packets with this source MAC address
#
# DINET: Rewrite all packets with this destination IP address
# DETHER: Rewrite all packets with this destination MAC address
#
# The resulting files have the original name with "-rewritten.pcap"
# appended as a suffix. Those packets may then be replayed from a
# different computer or virtual container, for instance with:
#
#   sudo bittwist -i 1 -v -m10 capture-rewritten.pcap
#
# Where -i n is the interface name (use bittwist -d to list available
# interfaces), -v is the verbose flag and -m10 replays at 10× speed.
#

SINET=${SINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
SETHER=${SETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}

DINET=${DINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
DETHER=${DETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}

for f in $*; do

    OUTFNAME=$f-rewritten.pcap
    echo $f → $OUTFNAME
    if [[ -n "$SINET" && -n "$SETHER" ]]; then
        tcprewrite -S 0.0.0.0/0:$SINET --enet-smac=$SETHER \
                   -D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
                   --infile "$f" \
                   --outfile "$OUTFNAME"
    else
        tcprewrite -D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
                   --infile "$f" \
                   --outfile "$OUTFNAME"
    fi

done