Merge branch '278-rewrite-events-queue' into 'devel'

Resolve "Rewrite events queue"

Closes #278

See merge request wgp/dougal/software!46
This commit is contained in:
D. Berge
2023-10-17 10:28:21 +00:00
18 changed files with 782 additions and 529 deletions

View File

@@ -0,0 +1,164 @@
-- Support notification payloads larger than Postgres' NOTIFY limit.
--
-- New schema version: 0.4.3
--
-- ATTENTION:
--
-- ENSURE YOU HAVE BACKED UP THE DATABASE BEFORE RUNNING THIS SCRIPT.
--
--
-- NOTE: This upgrade affects the public schema only.
-- NOTE: Each application of this script starts a transaction, which must
--       be committed or rolled back.
--
-- This creates a new table where large notification payloads are stored
-- temporarily and from which they can be recalled by the notification
-- listeners. It also creates a purge_notifications() procedure used to
-- clean up old notifications from the notifications log and, finally,
-- modifies notify() to support these changes. When a large payload is
-- encountered, it is stored in the notify_payloads table and a
-- trimmed-down version containing a payload_id is sent to listeners
-- instead. Listeners can then query notify_payloads to retrieve the full
-- payloads. It is the application layer's responsibility to delete old
-- notifications.
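--
-- For illustration (all field values below are hypothetical), an oversize
-- change notification might reach listeners as a stub such as:
--
--   {"tstamp": "2023-10-17T10:28:21+00:00", "operation": "UPDATE",
--    "schema": "survey_001", "table": "event_log", "pid": "p1",
--    "payload_id": 42}
--
-- from which the full payload can be recovered with:
--
--   SELECT payload FROM notify_payloads WHERE id = 42;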
--
-- To apply, run as the dougal user:
--
-- psql <<EOF
-- \i $THIS_FILE
-- COMMIT;
-- EOF
--
-- NOTE: It can be applied multiple times without ill effect.
--
BEGIN;
CREATE OR REPLACE PROCEDURE pg_temp.show_notice (notice text) AS $$
BEGIN
RAISE NOTICE '%', notice;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE pg_temp.upgrade_schema () AS $outer$
BEGIN
RAISE NOTICE 'Updating public schema';
-- We need to set the search path because some of the trigger
-- functions reference other tables in survey schemas assuming
-- they are in the search path.
SET search_path TO public;
CREATE TABLE IF NOT EXISTS public.notify_payloads (
id SERIAL,
tstamp timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
payload text NOT NULL DEFAULT '',
PRIMARY KEY (id)
);
CREATE INDEX IF NOT EXISTS notify_payload_tstamp ON notify_payloads (tstamp);
CREATE OR REPLACE FUNCTION public.notify() RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
channel text := TG_ARGV[0];
pid text;
payload text;
notification text;
payload_id integer;
BEGIN
SELECT projects.pid INTO pid FROM projects WHERE schema = TG_TABLE_SCHEMA;
payload := json_build_object(
'tstamp', CURRENT_TIMESTAMP,
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'old', row_to_json(OLD),
'new', row_to_json(NEW),
'pid', pid
)::text;
IF octet_length(payload) < 1000 THEN
PERFORM pg_notify(channel, payload);
ELSE
-- Payload exceeds the NOTIFY size limit: store it in notify_payloads
-- and send a trimmed-down notification carrying its ID instead.
-- Listeners fetch the full payload on demand; expiry of old payloads
-- is handled by purge_notifications(), driven from the application layer.
INSERT INTO notify_payloads (payload) VALUES (payload) RETURNING id INTO payload_id;
notification := json_build_object(
'tstamp', CURRENT_TIMESTAMP,
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'pid', pid,
'payload_id', payload_id
)::text;
PERFORM pg_notify(channel, notification);
RAISE INFO 'Payload over limit (% bytes), stored with id %', octet_length(payload), payload_id;
END IF;
RETURN NULL;
END;
$$;
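-- The notification channel comes from the trigger definition (TG_ARGV[0]).
-- A hypothetical attachment (table and channel names are examples only):
--
--   CREATE TRIGGER notify_changes
--     AFTER INSERT OR UPDATE OR DELETE ON event_log_full
--     FOR EACH ROW EXECUTE FUNCTION public.notify('event');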
CREATE OR REPLACE PROCEDURE public.purge_notifications (age_seconds numeric DEFAULT 120) AS $$
DELETE FROM notify_payloads WHERE EXTRACT(epoch FROM CURRENT_TIMESTAMP - tstamp) > age_seconds;
$$ LANGUAGE sql;
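-- Retention is the caller's choice; e.g. a hypothetical five-minute window:
--
--   CALL purge_notifications(300);
--
-- The default call keeps payloads for 120 seconds.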
END;
$outer$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE pg_temp.upgrade () AS $outer$
DECLARE
row RECORD;
current_db_version TEXT;
BEGIN
SELECT value->>'db_schema' INTO current_db_version FROM public.info WHERE key = 'version';
IF current_db_version >= '0.4.3' THEN
RAISE EXCEPTION
USING MESSAGE='Patch already applied';
END IF;
IF current_db_version != '0.4.2' THEN
RAISE EXCEPTION
USING MESSAGE='Invalid database version: ' || current_db_version,
HINT='Ensure all previous patches have been applied.';
END IF;
-- This upgrade modified the `public` schema only, not individual
-- project schemas.
CALL pg_temp.upgrade_schema();
END;
$outer$ LANGUAGE plpgsql;
CALL pg_temp.upgrade();
CALL pg_temp.show_notice('Cleaning up');
DROP PROCEDURE pg_temp.upgrade_schema ();
DROP PROCEDURE pg_temp.upgrade ();
CALL pg_temp.show_notice('Updating db_schema version');
INSERT INTO public.info VALUES ('version', '{"db_schema": "0.4.3"}')
ON CONFLICT (key) DO UPDATE
SET value = public.info.value || '{"db_schema": "0.4.3"}' WHERE public.info.key = 'version';
CALL pg_temp.show_notice('All done. You may now run "COMMIT;" to persist the changes');
DROP PROCEDURE pg_temp.show_notice (notice text);
--
--NOTE Run `COMMIT;` now if all went well
--

View File

@@ -9,105 +9,16 @@ const { ALERT, ERROR, WARNING, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
* the last shot and first shot of the previous and current dates, respectively.
*/
class DetectFDSP {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method looks at the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
prev = null;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
setImmediate(() => this.processQueue());
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
const sequence = Number(cur._sequence);
try {
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) {
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) {
// Possibly a date change, but could also be a missing timestamp
// or something else.
const ts0 = new Date(prev.time);
const ts1 = new Date(cur.time);
if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) {
INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName);
const ldsp = {
sequence: prev._sequence,
point: prev._point,
remarks: "Last shotpoint of the day",
labels: ["LDSP", "Prod"],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
const fdsp = {
sequence: cur._sequence,
point: cur._point,
remarks: "First shotpoint of the day",
labels: ["FDSP", "Prod"],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
INFO("LDSP", ldsp);
INFO("FDSP", fdsp);
const projectId = await schema2pid(prev._schema);
if (projectId) {
await event.post(projectId, ldsp);
await event.post(projectId, fdsp);
} else {
ERROR("projectId not found for", prev._schema);
}
} else {
WARNING("False positive on these timestamps", prev.time, cur.time);
WARNING("No events were created");
}
}
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR(err);
} finally {
cur.isPending = false;
}
}
constructor () {
DEBUG(`${this.author} instantiated`);
}
async run (data) {
async run (data, ctx) {
if (!data || data.channel !== "realtime") {
return;
}
@@ -116,27 +27,70 @@ class DetectFDSP {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectFDSP.MAX_QUEUE_SIZE) {
const event = {
isPending: this.queue.length,
_schema: meta._schema,
time: meta.time,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName
};
this.queue.push(event);
// DEBUG("EVENT", event);
} else {
ALERT("Queue full at", this.queue.length);
if (!this.prev) {
DEBUG("Initialising `prev`");
this.prev = data;
return;
}
this.processQueue();
try {
DEBUG("Running");
// Compare the previous and current samples via their navigation meta.
const cur = data?.payload?.new?.meta;
const prev = this.prev?.payload?.new?.meta;
const sequence = Number(cur._sequence);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) {
if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) {
// Possibly a date change, but could also be a missing timestamp
// or something else.
const ts0 = new Date(prev.time);
const ts1 = new Date(cur.time);
if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) {
INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName);
const ldsp = {
sequence: prev._sequence,
point: prev._point,
remarks: "Last shotpoint of the day",
labels: ["LDSP", "Prod"],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
const fdsp = {
sequence: cur._sequence,
point: cur._point,
remarks: "First shotpoint of the day",
labels: ["FDSP", "Prod"],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
INFO("LDSP", ldsp);
INFO("FDSP", fdsp);
const projectId = await schema2pid(prev._schema ?? cur._schema);
if (projectId) {
await event.post(projectId, ldsp);
await event.post(projectId, fdsp);
} else {
ERROR("projectId not found for", prev._schema);
}
} else {
WARNING("False positive on these timestamps", prev.time, cur.time);
WARNING("No events were created");
}
}
}
} catch (err) {
DEBUG(`${this.author} error`, err);
throw err;
} finally {
this.prev = data;
}
}
}

View File

@@ -0,0 +1,60 @@
const project = require('../../lib/db/project');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectProjectConfigurationChange {
author = `*${this.constructor.name}*`;
constructor (ctx) {
DEBUG(`${this.author} instantiated`);
// Grab project configurations.
// NOTE that this will run asynchronously
this.run({channel: "project"}, ctx);
}
async run (data, ctx) {
if (!data || data.channel !== "project") {
return;
}
// Project notifications, as of this writing, most likely
// do not carry payloads, as those would exceed the notification
// size limit.
// For our purposes this does not matter: we simply re-read the
// configurations of all non-archived projects.
try {
DEBUG("Project configuration change detected")
const projects = await project.get();
const _ctx_data = {};
for (let pid of projects.map(i => i.pid)) {
DEBUG("Retrieving configuration for", pid);
const cfg = await project.configuration.get(pid);
if (cfg?.archived === true) {
DEBUG(pid, "is archived. Ignoring");
continue;
}
DEBUG("Saving configuration for", pid);
_ctx_data[pid] = cfg;
}
if (! ("projects" in ctx)) {
ctx.projects = {};
}
ctx.projects.configuration = _ctx_data;
DEBUG("Committed project configuration to ctx.projects.configuration");
} catch (err) {
DEBUG(`${this.author} error`, err);
throw err;
}
}
}
module.exports = DetectProjectConfigurationChange;
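For reference, a minimal sketch of how another handler might consume the shared context populated above (the pid argument is illustrative; the option name mirrors the lookup done later in ReportLineChangeTime):

// Hypothetical consumer of ctx.projects.configuration.
function nominalLineChangeDuration (ctx, pid) {
  const cfg = ctx?.projects?.configuration?.[pid] ?? {};
  // Returns minutes, or undefined if unconfigured.
  return cfg?.production?.nominalLineChangeDuration;
}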

View File

@@ -3,94 +3,16 @@ const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSoftStart {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
prev = null;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
try {
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
// TODO:
// Consider whether to remember if soft start / full volume events
// have already been emitted and wait until there is an online/offline
// transition before re-emitting.
// This may or may not be a good idea.
// Look for a soft start or full volume event
if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
INFO("Soft start detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Soft start",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
} else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) {
INFO("Full volume detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Full volume",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("DetectSoftStart Error")
ERROR(err);
} finally {
cur.isPending = false;
}
}
constructor () {
DEBUG(`${this.author} instantiated`);
}
async run (data) {
async run (data, ctx) {
if (!data || data.channel !== "realtime") {
return;
}
@@ -99,29 +21,59 @@ class DetectSoftStart {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) {
this.queue.push({
isPending: this.queue.length,
_schema: meta._schema,
tstamp: meta.tstamp ?? meta.time,
shot: meta.shot,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName,
num_guns: meta.num_guns,
num_active: meta.num_active
});
} else {
ALERT("DetectSoftStart queue full at", this.queue.length);
if (!this.prev) {
DEBUG("Initialising `prev`");
this.prev = data;
return;
}
this.processQueue();
try {
DEBUG("Running");
const cur = data?.payload?.new?.meta;
const prev = this.prev?.payload?.new?.meta;
// DEBUG("%j", prev);
// DEBUG("%j", cur);
DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ntest passed: %j", cur.num_guns, cur.num_active, prev.num_active, cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns);
if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
INFO("Soft start detected @", cur.tstamp);
// FIXME Shouldn't need to use schema2pid as pid already present in payload.
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Soft start",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
} else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) {
INFO("Full volume detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
tstamp: cur.tstamp,
remarks: "Full volume",
labels: [ "Daily", "Guns", "Prod" ],
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
}
} catch (err) {
DEBUG(`${this.author} error`, err);
throw err;
} finally {
this.prev = data;
}
}
}

View File

@@ -3,130 +3,15 @@ const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSOLEOL {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
prev = null;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
const sequence = Number(cur._sequence);
try {
DEBUG("Sequence", sequence);
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
INFO("Transition to ONLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO ONLINE", prev, cur);
// Check if there are already FSP, FGSP events for this sequence
const projectId = await schema2pid(cur._schema);
const sequenceEvents = await event.list(projectId, {sequence});
const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
if (labels.includes("FSP")) {
// At this point labels contains either FSP only or FSP + FGSP,
// depending on whether a FGSP event has already been entered.
const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: cur._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has already been entered in the log,
// so we have nothing to do here.
INFO("FSP already in the log. Doing nothing");
}
} else if (prev.lineStatus == "online" && cur.lineStatus != "online") {
INFO("Transition to OFFLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO OFFLINE", prev, cur);
// Check if there are already LSP, LGSP events for this sequence
const projectId = await schema2pid(prev._schema);
const sequenceEvents = await event.list(projectId, {sequence});
const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
if (labels.includes("LSP")) {
// At this point labels contains either LSP only or LSP + LGSP,
// depending on whether a LGSP event has already been entered.
const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: prev._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A last shot point has already been entered in the log,
// so we have nothing to do here.
INFO("LSP already in the log. Doing nothing");
}
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
console.error("DetectSOLEOL Error")
console.log(err);
} finally {
cur.isPending = false;
}
}
constructor () {
DEBUG(`${this.author} instantiated`);
}
async run (data) {
async run (data, ctx) {
if (!data || data.channel !== "realtime") {
return;
}
@@ -135,30 +20,69 @@ class DetectSOLEOL {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) {
this.queue.push({
isPending: this.queue.length,
_schema: meta._schema,
time: meta.time,
shot: meta.shot,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName,
speed: meta.speed,
waterDepth: meta.waterDepth
});
} else {
ALERT("DetectSOLEOL queue full at", this.queue.length);
if (!this.prev) {
DEBUG("Initialising `prev`");
this.prev = data;
return;
}
this.processQueue();
try {
DEBUG("Running");
// DEBUG("%j", data);
const cur = data?.payload?.new?.meta;
const prev = this.prev?.payload?.new?.meta;
const sequence = Number(cur._sequence);
// DEBUG("%j", prev);
// DEBUG("%j", cur);
DEBUG("prv.lineName: %s\ncur.lineName: %s\nprv._sequence: %s\ncur._sequence: %s\nprv.lineStatus: %s\ncur.lineStatus: %s", prev.lineName, cur.lineName, prev._sequence, cur._sequence, prev.lineStatus, cur.lineStatus);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
INFO("Transition to ONLINE detected");
// We must use schema2pid because the pid may not have been
// populated for this event.
const projectId = await schema2pid(cur._schema ?? prev._schema);
const labels = ["FSP", "FGSP"];
const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: cur._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) {
INFO("Transition to OFFLINE detected");
const projectId = await schema2pid(prev._schema ?? cur._schema);
const labels = ["LSP", "LGSP"];
const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: cur._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
}
} catch (err) {
DEBUG(`${this.author} error`, err);
throw err;
} finally {
this.prev = data;
}
}
}
module.exports = DetectSOLEOL;

View File

@@ -1,15 +1,44 @@
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const Handlers = [
require('./detect-project-configuration-change'),
require('./detect-soleol'),
require('./detect-soft-start'),
require('./report-line-change-time'),
require('./detect-fdsp')
];
function init () {
return Handlers.map(Handler => new Handler());
function init (ctx) {
const instances = Handlers.map(Handler => new Handler(ctx));
function prepare (data, ctx) {
const promises = [];
for (let instance of instances) {
// An async IIFE yields one promise per handler without the
// promise-constructor anti-pattern; behaviour is unchanged.
const promise = (async () => {
try {
DEBUG("Run", instance.author);
const result = await instance.run(data, ctx);
DEBUG("%s result: %O", instance.author, result);
return result;
} catch (err) {
ERROR("%s error:\n%O", instance.author, err);
throw err;
}
})();
promises.push(promise);
}
return promises;
}
function despatch (data, ctx) {
return Promise.allSettled(prepare(data, ctx));
}
return { instances, prepare, despatch };
}
module.exports = {
Handlers,
init
}
};
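A minimal usage sketch (the notification object is hypothetical): despatch() never rejects, since Promise.allSettled() always resolves, so callers inspect the settled results themselves:

const handlers = require('./handlers');
const ctx = {};
const { despatch } = handlers.init(ctx);
// Hypothetical notification; real ones come from lib/db/notify.
const data = { channel: "realtime", payload: { new: { meta: {} } } };
despatch(data, ctx).then(results => {
  for (const r of results) {
    if (r.status === "rejected") console.error("handler failed:", r.reason);
  }
});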

View File

@@ -1,60 +1,47 @@
const { schema2pid } = require('../../lib/db/connection');
const { event, project } = require('../../lib/db');
const { withinValidity } = require('../../lib/utils/ranges');
const unique = require('../../lib/utils/unique');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class ReportLineChangeTime {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 0) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setTimeout(() => this.processQueue(), 1000); // We're not in a hurry
return;
}
constructor () {
DEBUG(`${this.author} instantiated`);
}
const cur = this.queue.shift();
const next = this.queue[0];
const projectId = cur.pid;
// Are we being called because of an LGSP or because of an FGSP?
const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP"));
async run (data, ctx) {
if (!data || data.channel !== "event") {
return;
}
const n = data.payload.new;
const o = data.payload.old;
if (!(n?.labels) && !(o?.labels)) {
return;
}
if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
!n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
return;
}
try {
DEBUG("Running");
const cur = data;
const projectId = cur?.payload?.pid;
const forward = (cur?.payload?.old?.labels?.includes("LGSP") || cur?.payload?.new?.labels?.includes("LGSP"));
DEBUG("%j", cur);
if (!projectId) {
throw {message: "No projectID found in event", cur};
return;
}
async function getConfiguration (projectId) {
return await project.configuration.get(projectId);
}
async function getLineChangeTime (data, forward = false) {
if (forward) {
const ospEvents = await event.list(projectId, {label: "FGSP"});
@@ -126,7 +113,7 @@ class ReportLineChangeTime {
const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => {
const events = [];
const cfg = (await project.configuration.get(projectId));
const cfg = ctx?.projects?.configuration?.[projectId] ?? {};
const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // minutes → ms
DEBUG("nlcd", nlcd);
if (nlcd && lineChangeTime > nlcd) {
@@ -196,108 +183,48 @@ class ReportLineChangeTime {
await event.post(projectId, payload);
}
try {
// DEBUG("Previous", prev);
DEBUG("Current", cur);
DEBUG("Forward search", forward);
// We have these scenarios to consider:
// INSERT:
// `old` will be NULL
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// UPDATE:
// `old` is not NULL
// `new` is not NULL
// Delete previous event from event_log (not event_log_full)
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// DELETE:
// `old` is not NULL
// `new` will be NULL
// Delete previous event from event_log (not event_log_full)
await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]);
await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]);
if (cur?.payload?.operation == "INSERT") {
// NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT
// on event_log_full, so we don't need to worry about UPDATE here.
const data = n;
DEBUG("INSERT seen: will add lct events related to ", data.id);
if (cur.operation == "INSERT") {
// NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT
// on event_log_full, so we don't need to worry about UPDATE here.
const data = cur.new;
DEBUG("INSERT seen: will add lct events related to ", data.id);
if (withinValidity(data.validity)) {
DEBUG("Event within validity period", data.validity, new Date());
if (withinValidity(data.validity)) {
DEBUG("Event within validity period", data.validity, new Date());
data.tstamp = new Date(data.tstamp);
const { lineChangeTime, osp } = await getLineChangeTime(data, forward);
data.tstamp = new Date(data.tstamp);
const { lineChangeTime, osp } = await getLineChangeTime(data, forward);
if (lineChangeTime) {
if (lineChangeTime) {
const events = await createLineChangeTimeEvents(lineChangeTime, data, osp);
const events = await createLineChangeTimeEvents(lineChangeTime, data, osp);
if (events?.length) {
DEBUG("Deleting other events for sequence", events[0].sequence);
await deleteStaleEvents(events[0].sequence);
}
for (let payload of events) {
await maybePostEvent(projectId, payload);
}
if (events?.length) {
DEBUG("Deleting other events for sequence", events[0].sequence);
await deleteStaleEvents(events[0].sequence);
}
for (let payload of events) {
await maybePostEvent(projectId, payload);
}
} else {
DEBUG("Event outside of validity range", data.validity, "lct events not inserted");
}
} else {
DEBUG("Event outside of validity range", data.validity, "lct events not inserted");
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("ReportLineChangeTime Error")
ERROR(err);
} finally {
if (next) {
next.isPending = false;
}
}
}
}
async run (data) {
DEBUG("Seen", data);
if (!data || data.channel !== "event") {
return;
} catch (err) {
ERROR(`${this.author} error`, err);
throw err;
}
if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) {
return;
}
const n = data.payload.new;
const o = data.payload.old;
if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
!n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
return;
}
if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) {
const item = {
...structuredClone(data.payload),
isPending: this.queue.length,
};
DEBUG("Queueing", item);
this.queue.push(item);
} else {
ALERT("ReportLineChangeTime queue full at", this.queue.length);
}
this.processQueue();
}
}

View File

@@ -1,23 +1,25 @@
const { listen } = require('../lib/db/notify');
const channels = require('../lib/db/channels');
const handlers = require('./handlers').init();
const handlers = require('./handlers');
const { ActionsQueue } = require('../lib/queue');
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
function start () {
listen(channels, async function (data) {
const queue = new ActionsQueue();
const ctx = {}; // Context object
const { prepare, despatch } = handlers.init(ctx);
listen(channels, function (data) {
DEBUG("Incoming data", data);
for (const handler of handlers) {
// NOTE: We are intentionally passing the same instance
// of the data to every handler. This means that earlier
// handlers could, in principle, modify the data to be
// consumed by later ones, provided that they are
// synchronous (as otherwise, the completion order is
// undefined).
await handler.run(data);
}
// We don't bother awaiting
queue.enqueue(() => despatch(data, ctx));
DEBUG("Queue size", queue.length());
});
INFO("Events manager started.", handlers.length, "active handlers");
INFO("Events manager started");
}
module.exports = { start }

View File

@@ -8,23 +8,55 @@ async function main () {
INFO("Running version", await version.describe());
version.compatible()
.then( (versions) => {
const api = require('./api');
const ws = require('./ws');
try {
const api = require('./api');
const ws = require('./ws');
const periodicTasks = require('./periodic-tasks').init();
const { fork } = require('child_process');
const { fork } = require('child_process');
const port = process.env.HTTP_PORT || 3000;
const host = process.env.HTTP_HOST || "127.0.0.1";
const path = process.env.HTTP_PATH ?? "/api";
const server = api.start(port, host, path);
ws.start(server);
const port = process.env.HTTP_PORT || 3000;
const host = process.env.HTTP_HOST || "127.0.0.1";
const path = process.env.HTTP_PATH ?? "/api";
const server = api.start(port, host, path);
ws.start(server);
const eventManagerPath = [__dirname, "events"].join("/");
const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/);
INFO("Versions:", versions);
INFO("Versions:", versions);
periodicTasks.start();
process.on('exit', () => eventManager.kill());
const eventManagerPath = [__dirname, "events"].join("/");
const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/);
process.on("SIGINT", async () => {
DEBUG("Interrupted (SIGINT)");
eventManager.kill()
await periodicTasks.cleanup();
process.exit(0);
})
process.on("SIGHUP", async () => {
DEBUG("Stopping (SIGHUP)");
eventManager.kill()
await periodicTasks.cleanup();
process.exit(0);
})
process.on('beforeExit', async () => {
DEBUG("Preparing to exit");
eventManager.kill();
await periodicTasks.cleanup();
});
process.on('exit', async () => {
DEBUG("Exiting");
// eventManager.kill()
// periodicTasks.cleanup();
});
} catch (err) {
ERROR(err);
process.exit(2);
}
})
.catch( ({current, wanted, component}) => {
console.error(`Fatal error: incompatible ${component} version ${current} (wanted: ${wanted})`);

View File

@@ -1,17 +1,43 @@
// FIXME This code is in painful need of refactoring
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction, pool } = require('../connection');
const { listen } = require('../notify');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
let last_tstamp = 0;
async function getAllProjectConfigs () {
const client = await pool.connect();
let project_configs, listener;
const text = `SELECT schema, meta AS data FROM projects;`;
const res = await client.query(text);
client.release();
return res.rows;
async function getAllProjectConfigs () {
async function getFromDatabase () {
DEBUG("Getting project configurations");
const client = await pool.connect();
try {
const text = `
SELECT schema, meta AS data
FROM projects
WHERE (meta->>'archived')::boolean IS NOT true;
`;
const res = await client.query(text);
project_configs = res.rows;
DEBUG("Have configurations for projects", project_configs.map(i => i.data.id));
} catch (err) {
ERROR(err);
} finally {
client.release();
}
return project_configs;
}
if (project_configs) {
return project_configs;
} else {
listener = await listen(["project"], getFromDatabase);
DEBUG("Added project configuration change listener");
return await getFromDatabase();
}
}
async function getNearestPreplot (candidates) {
@@ -237,7 +263,7 @@ async function getCandidates (navData) {
});
return obj;
}).filter(c => !!c);
DEBUG("Candidates: %j", candidates.map(c => c.schema));
// DEBUG("Candidates: %j", candidates.map(c => c.schema));
return candidates;
}
@@ -269,7 +295,7 @@ async function save (navData, opts = {}) {
// Only one candidate, associate with it
// console.log("Save into schema", candidates[0].match.schema);
await saveOnline(candidates);
navData.payload._schema = candidates[0].match.schema;
navData.payload._schema = candidates[0].schema;
} else {
// More than one candidate, go for the closest. If more than one active
// project with the same preplots, highest numbered schema.
@@ -309,6 +335,7 @@ async function save (navData, opts = {}) {
}
await saveOffline(navData, opts);
DEBUG("Saved");
}
module.exports = save;

View File

@@ -1,5 +1,43 @@
const { makeSubscriber } = require('./connection');
const { makeSubscriber, pool } = require('./connection');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
async function purge () {
DEBUG("Purging old notifications");
const client = await pool.connect();
try {
await client.query("CALL purge_notifications();");
} catch (err) {
ERROR(err);
} finally {
client.release();
}
}
async function fullPayload (payload) {
if (!payload.payload_id) {
return payload;
} else {
let client, res;
try {
client = await pool.connect();
const text = `SELECT payload FROM notify_payloads WHERE id = $1;`;
const values = [ payload.payload_id ];
res = await client.query(text, values);
res = res?.rows[0]?.payload;
if (res === undefined) {
throw new Error(`No stored payload found for id ${payload.payload_id}`);
}
DEBUG(`Oversize notification payload retrieved with id ${payload.payload_id} and size ${res.length}`);
// DEBUG(res);
res = JSON.parse(res);
} catch (err) {
ERROR(err);
} finally {
if (client) {
client.release();
}
}
return res;
}
}
async function listen (addChannels, callback) {
@@ -18,11 +56,11 @@ async function listen (addChannels, callback) {
for (const channel of addChannels) {
await client.listenTo(channel);
client.notifications.on(channel, (payload) => {
client.notifications.on(channel, async (payload) => {
const data = {
channel,
_received: new Date(),
payload
payload: await fullPayload(payload)
};
callback(data);
});
@@ -32,5 +70,6 @@ async function listen (addChannels, callback) {
}
module.exports = {
listen
listen,
purge
};
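A minimal usage sketch (channel name taken from the handlers above): the callback always receives the full payload, whether or not it arrived as an oversize stub:

const { listen } = require('./notify');
listen(["event"], (data) => {
  // data.payload has already been re-inflated from notify_payloads
  // when the raw notification only carried a payload_id stub.
  console.log(data.channel, data._received, data.payload);
});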

View File

@@ -0,0 +1,52 @@
const Queue = require('./queue');
// Inspired by:
// https://stackoverflow.com/questions/53540348/js-async-await-tasks-queue#53540586
class ActionsQueue extends Queue {
constructor (items = []) {
super(items);
this.pending = false;
}
enqueue (action) {
return new Promise ((resolve, reject) => {
super.enqueue({ action, resolve, reject });
this.dequeue();
});
}
async dequeue () {
if (this.pending) {
return false;
}
const item = super.dequeue();
if (!item) {
return false;
}
try {
this.pending = true;
const result = await item.action(this);
this.pending = false;
item.resolve(result);
} catch (err) {
this.pending = false;
item.reject(err);
} finally {
this.dequeue();
}
}
}
module.exports = ActionsQueue;
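A minimal usage sketch (the action bodies are placeholders): actions run strictly one at a time in FIFO order, and each enqueue() resolves with its own action's result:

const ActionsQueue = require('./actions-queue');
const q = new ActionsQueue();
// Both enqueue() calls return promises immediately;
// the actions themselves execute serially, never overlapping.
q.enqueue(async () => "first").then(r => console.log(r));  // "first"
q.enqueue(async () => "second").then(r => console.log(r)); // "second"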

View File

@@ -0,0 +1,6 @@
module.exports = {
Queue: require('./queue'),
ActionsQueue: require('./actions-queue')
};

View File

@@ -0,0 +1,22 @@
class Queue {
constructor (items = []) {
this.items = items;
}
enqueue (item) {
this.items.push(item);
}
dequeue () {
return this.items.shift();
}
length () {
return this.items.length;
}
}
module.exports = Queue;

View File

@@ -0,0 +1,38 @@
const tasks = require('./tasks');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
function init () {
const iids = [];
function start () {
INFO("Initialising %d periodic tasks", tasks.length);
for (let t of tasks) {
const iid = setInterval(t.task, t.timeout);
iids.push(iid);
}
return iids;
};
function stop () {
INFO("Stopping %d periodic tasks", iids.length);
for (let iid of iids) {
clearInterval(iid);
}
}
async function cleanup () {
stop();
DEBUG("Cleaning up %d periodic tasks", tasks.length);
for (let t of tasks) {
if (t.cleanup) {
await t.cleanup();
}
}
}
return { start, stop, cleanup, iids };
}
module.exports = {
init
};

View File

@@ -0,0 +1,4 @@
module.exports = [
require('./purge-notifications')
];

View File

@@ -0,0 +1,20 @@
const { purge } = require('../../lib/db/notify');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const timeout = 120*1000; // 2 minutes
function task () {
DEBUG("Running task");
purge();
}
async function cleanup () {
DEBUG("Running cleanup");
await purge();
}
module.exports = {
task,
timeout,
cleanup
};

View File

@@ -95,7 +95,8 @@ for (const header of (cfg._("global.navigation.headers") || []).filter(h => h.ty
const server = dgram.createSocket('udp4');
server.on('error', (err) => {
console.error(`server error:\n${err.stack}`);
ERROR(err);
// console.error(`server error:\n${err.stack}`);
maybeSendError(err, {title: "UDP listener error on port "+header.port});
// server.close();
});