Compare commits


17 Commits

Author SHA1 Message Date
D. Berge
6a21ddd1cd Rewrite events listener and handlers.
The events listener now uses a proper self-consuming queue and
the event handlers have been rewritten accordingly.

Running init() on the handlers library instantiates the handlers
and returns two higher-order functions, prepare() and despatch().
A call to despatch() is appended to the queue for each new
incoming event.

The handlers have access to a context object (ctx) which may be
used to persist data between calls and/or exchange data between
handlers. This is used notably to give the handlers access to
project configurations, which are themselves refreshed by a
project configuration change handler (DetectProjectConfigurationChange).
2023-10-14 20:53:42 +02:00
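A minimal sketch of the flow described above, condensed from the events listener and handlers code further down in this changeset (error handling omitted):

const { listen } = require('../lib/db/notify');
const channels = require('../lib/db/channels');
const handlers = require('./handlers');
const { ActionsQueue } = require('../lib/queue');

const ctx = {};                                   // shared handler context
const queue = new ActionsQueue();                 // self-consuming queue
const { despatch } = handlers.init(ctx);

// Each incoming notification appends a despatch() call to the queue;
// the queue executes the calls one at a time, in arrival order.
listen(channels, data => queue.enqueue(() => despatch(data, ctx)));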
D. Berge
c1e35b2459 Cache project configuration details.
This avoids requesting the project configurations on every single
incoming message. A listener refreshes the data on configuration
changes.
2023-10-14 20:11:18 +02:00
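A minimal sketch of the caching pattern this commit introduces (fetchFromDb() is a stand-in name; the real implementation is getAllProjectConfigs() in the save module below):

const { listen } = require('../lib/db/notify');

let cache = null;
async function getConfigs () {
  // fetchFromDb() is hypothetical shorthand for the real projects query.
  const refresh = async () => { cache = await fetchFromDb(); return cache; };
  if (!cache) {
    // Re-read the cache on any "project" channel notification.
    await listen(["project"], refresh);
    await refresh();
  }
  return cache;
}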
D. Berge
eee2a96029 Modify logging statements 2023-10-14 20:10:46 +02:00
D. Berge
6f5e5a4d20 Fix bug for shortcut when there is only one candidate project 2023-10-14 20:09:07 +02:00
D. Berge
9e73cb7e00 Clean up on SIGINT, SIGHUP signals 2023-10-14 20:07:19 +02:00
D. Berge
d7ab4eec7c Run some tasks periodically from the main process.
This reduces reliance on crontab jobs.
2023-10-14 20:06:38 +02:00
D. Berge
cdd96a4bc7 Don't bother trying to kill the child process on exit.
The exit signal handler does not allow asynchronous tasks and,
besides, killing the parent should kill its children too.
2023-10-14 20:02:54 +02:00
D. Berge
39a21766b6 Exit on start up errors 2023-10-14 20:02:04 +02:00
D. Berge
0e33c18b5c Replace console.log() with debug library calls 2023-10-14 19:57:57 +02:00
D. Berge
7f411ac7dd Add queue libraries.
A basic queue implementation and one that consumes its items
automatically until empty.
2023-10-14 19:56:56 +02:00
D. Berge
ed1da11c9d Add helper function to purge notifications 2023-10-14 19:54:34 +02:00
D. Berge
66ec28dd83 Refactor DB notifications listener to support large payloads.
The listener will automatically retrieve the full payload
before passing it on to event handlers.
2023-10-14 18:33:41 +02:00
D. Berge
b928d96774 Add database upgrade file 30. 2023-10-14 18:29:28 +02:00
D. Berge
73335f9c1e Merge branch '136-add-line-change-time-log-pseudoevent' into 'devel'
Resolve "Add line change time log pseudoevent"

Closes #136

See merge request wgp/dougal/software!45
2023-10-04 12:50:49 +00:00
D. Berge
7b6b81dbc5 Add more debugging statements 2023-10-04 14:50:12 +02:00
D. Berge
2e11c574c2 Throw rather than return.
Otherwise the finally {} block won't run.
2023-10-04 14:49:35 +02:00
D. Berge
d07565807c Do not retry immediately 2023-10-04 14:49:09 +02:00
18 changed files with 783 additions and 527 deletions

View File

@@ -0,0 +1,164 @@
-- Support notification payloads larger than Postgres' NOTIFY limit.
--
-- New schema version: 0.4.3
--
-- ATTENTION:
--
-- ENSURE YOU HAVE BACKED UP THE DATABASE BEFORE RUNNING THIS SCRIPT.
--
--
-- NOTE: This upgrade affects the public schema only.
-- NOTE: Each application of this script starts a transaction, which must be
-- committed or rolled back.
--
-- This creates a new table where large notification payloads are stored
-- temporarily and from which they can be retrieved by the notification
-- listeners. It also creates a purge_notifications() procedure used to
-- clean up old notifications from the notifications log and, finally,
-- modifies notify() to support these changes. When a large payload is
-- encountered, the payload is stored in the notify_payloads table and
-- a trimmed-down version containing a payload_id is sent to listeners
-- instead. Listeners can then query notify_payloads to retrieve the full
-- payloads. It is the application layer's responsibility to delete old
-- notifications.
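--
-- Example (illustrative only): a listener that receives a trimmed-down
-- notification such as '{"payload_id": 42, ...}' can fetch the full
-- payload with:
--
--   SELECT payload FROM notify_payloads WHERE id = 42;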
--
-- To apply, run as the dougal user:
--
-- psql <<EOF
-- \i $THIS_FILE
-- COMMIT;
-- EOF
--
-- NOTE: It can be applied multiple times without ill effect.
--
BEGIN;
CREATE OR REPLACE PROCEDURE pg_temp.show_notice (notice text) AS $$
BEGIN
RAISE NOTICE '%', notice;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE pg_temp.upgrade_schema () AS $outer$
BEGIN
RAISE NOTICE 'Updating public schema';
-- We need to set the search path because some of the trigger
-- functions reference other tables in survey schemas assuming
-- they are in the search path.
EXECUTE format('SET search_path TO public');
CREATE TABLE IF NOT EXISTS public.notify_payloads (
id SERIAL,
tstamp timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
payload text NOT NULL DEFAULT '',
PRIMARY KEY (id)
);
CREATE INDEX IF NOT EXISTS notify_payload_tstamp ON notify_payloads (tstamp);
CREATE OR REPLACE FUNCTION public.notify() RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
channel text := TG_ARGV[0];
pid text;
payload text;
notification text;
payload_id integer;
BEGIN
SELECT projects.pid INTO pid FROM projects WHERE schema = TG_TABLE_SCHEMA;
payload := json_build_object(
'tstamp', CURRENT_TIMESTAMP,
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'old', row_to_json(OLD),
'new', row_to_json(NEW),
'pid', pid
)::text;
IF octet_length(payload) < 1000 THEN
PERFORM pg_notify(channel, payload);
ELSE
-- The payload is too large to send directly: store it in the
-- notify_payloads table and send a trimmed-down notification
-- carrying its id instead. Listeners fetch the full payload if
-- interested. Old payloads must be expired (see purge_notifications)
-- to conserve space.
INSERT INTO notify_payloads (payload) VALUES (payload) RETURNING id INTO payload_id;
notification := json_build_object(
'tstamp', CURRENT_TIMESTAMP,
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'pid', pid,
'payload_id', payload_id
)::text;
PERFORM pg_notify(channel, notification);
RAISE INFO 'Payload over limit';
END IF;
RETURN NULL;
END;
$$;
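-- Illustrative only: notify() is attached as a row-level trigger
-- elsewhere, along the lines of (table name assumed):
--
--   CREATE TRIGGER notify AFTER INSERT OR UPDATE OR DELETE ON some_table
--     FOR EACH ROW EXECUTE FUNCTION public.notify('realtime');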
CREATE PROCEDURE public.purge_notifications (age_seconds numeric DEFAULT 120) AS $$
DELETE FROM notify_payloads WHERE EXTRACT(epoch FROM CURRENT_TIMESTAMP - tstamp) > age_seconds;
$$ LANGUAGE sql;
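-- Usage (illustrative): CALL public.purge_notifications(300);
-- removes payloads older than five minutes.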
END;
$outer$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE pg_temp.upgrade () AS $outer$
DECLARE
row RECORD;
current_db_version TEXT;
BEGIN
SELECT value->>'db_schema' INTO current_db_version FROM public.info WHERE key = 'version';
IF current_db_version >= '0.4.3' THEN
RAISE EXCEPTION
USING MESSAGE='Patch already applied';
END IF;
IF current_db_version != '0.4.2' THEN
RAISE EXCEPTION
USING MESSAGE='Invalid database version: ' || current_db_version,
HINT='Ensure all previous patches have been applied.';
END IF;
-- This upgrade modifies the `public` schema only, not individual
-- project schemas.
CALL pg_temp.upgrade_schema();
END;
$outer$ LANGUAGE plpgsql;
CALL pg_temp.upgrade();
CALL pg_temp.show_notice('Cleaning up');
DROP PROCEDURE pg_temp.upgrade_schema ();
DROP PROCEDURE pg_temp.upgrade ();
CALL pg_temp.show_notice('Updating db_schema version');
INSERT INTO public.info VALUES ('version', '{"db_schema": "0.4.3"}')
ON CONFLICT (key) DO UPDATE
SET value = public.info.value || '{"db_schema": "0.4.3"}' WHERE public.info.key = 'version';
CALL pg_temp.show_notice('All done. You may now run "COMMIT;" to persist the changes');
DROP PROCEDURE pg_temp.show_notice (notice text);
--
-- NOTE: Run `COMMIT;` now if all went well
--

View File

@@ -9,64 +9,51 @@ const { ALERT, ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
* the last shot and first shot of the previous and current dates, respectively.
*/
class DetectFDSP {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method looks at the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
prev = null;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
constructor () {
DEBUG(`${this.author} instantiated`);
}
if (this.queue[0].isPending) {
setImmediate(() => this.processQueue());
async run (data, ctx) {
if (!data || data.channel !== "realtime") {
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
const sequence = Number(cur._sequence);
if (!this.prev) {
DEBUG("Initialising `prev`");
this.prev = data;
return;
}
try {
DEBUG("Running");
const cur = data;
const sequence = Number(cur._sequence);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) {
if (this.prev.lineName == cur.lineName && this.prev._sequence == cur._sequence &&
this.prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) {
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) {
// Possible a date change, but could also be a missing timestamp
if (this.prev.time.substr(0, 10) != cur.time.substr(0, 10)) {
// Possibly a date change, but could also be a missing timestamp
// or something else.
const ts0 = new Date(prev.time)
const ts0 = new Date(this.prev.time)
const ts1 = new Date(cur.time);
if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) {
INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName);
const ldsp = {
sequence: prev._sequence,
point: prev._point,
sequence: this.prev._sequence,
point: this.prev._point,
remarks: "Last shotpoint of the day",
labels: ["LDSP", "Prod"],
meta: {auto: true, author: `*${this.constructor.name}*`}
@@ -83,61 +70,28 @@ class DetectFDSP {
INFO("LDSP", ldsp);
INFO("FDSP", fdsp);
const projectId = await schema2pid(prev._schema);
const projectId = await schema2pid(this.prev._schema);
if (projectId) {
await event.post(projectId, ldsp);
await event.post(projectId, fdsp);
} else {
ERROR("projectId not found for", prev._schema);
ERROR("projectId not found for", this.prev._schema);
}
} else {
WARNING("False positive on these timestamps", prev.time, cur.time);
WARNING("False positive on these timestamps", this.prev.time, cur.time);
WARNING("No events were created");
}
}
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR(err);
DEBUG(`${this.author} error`, err);
throw err;
} finally {
cur.isPending = false;
this.prev = data;
}
}
}
async run (data) {
if (!data || data.channel !== "realtime") {
return;
}
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectFDSP.MAX_QUEUE_SIZE) {
const event = {
isPending: this.queue.length,
_schema: meta._schema,
time: meta.time,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName
};
this.queue.push(event);
// DEBUG("EVENT", event);
} else {
ALERT("Queue full at", this.queue.length);
}
this.processQueue();
}
}
module.exports = DetectFDSP;
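The rewritten handlers share the pattern visible above: keep the previous event in this.prev, compare it with the incoming one, and always advance prev in a finally block. A distilled sketch (hypothetical class name):

class PairwiseHandler {
  prev = null;
  async run (data) {
    if (!this.prev) { this.prev = data; return; }  // bootstrap
    try {
      // compare this.prev with data and emit events as needed
    } finally {
      this.prev = data;                            // always advance
    }
  }
}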

View File

@@ -0,0 +1,60 @@
const project = require('../../lib/db/project');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectProjectConfigurationChange {
author = `*${this.constructor.name}*`;
constructor (ctx) {
DEBUG(`${this.author} instantiated`);
// Grab project configurations.
// NOTE that this will run asynchronously
this.run({channel: "project"}, ctx);
}
async run (data, ctx) {
if (!data || data.channel !== "project") {
return;
}
// Project notifications, as of this writing, most likely
// do not carry payloads as those exceed the notification
// size limit.
// For our purposes, we do not care as we just re-read all
// the configurations for all non-archived projects.
try {
DEBUG("Project configuration change detected")
const projects = await project.get();
const _ctx_data = {};
for (let pid of projects.map(i => i.pid)) {
DEBUG("Retrieving configuration for", pid);
const cfg = await project.configuration.get(pid);
if (cfg?.archived === true) {
DEBUG(pid, "is archived. Ignoring");
continue;
}
DEBUG("Saving configuration for", pid);
_ctx_data[pid] = cfg;
}
if (! ("projects" in ctx)) {
ctx.projects = {};
}
ctx.projects.configuration = _ctx_data;
DEBUG("Committed project configuration to ctx.projects.configuration");
} catch (err) {
DEBUG(`${this.author} error`, err);
throw err;
}
}
}
module.exports = DetectProjectConfigurationChange;
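Other handlers can then read the cached configurations from the shared context instead of hitting the database, as ReportLineChangeTime does further down:

const cfg = ctx?.projects?.configuration?.[projectId] ?? {};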

View File

@@ -3,55 +3,43 @@ const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSoftStart {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
prev = null;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
constructor () {
DEBUG(`${this.author} instantiated`);
}
async run (data, ctx) {
if (!data || data.channel !== "realtime") {
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
if (!this.prev) {
DEBUG("Initialising `prev`");
this.prev = data;
return;
}
try {
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
DEBUG("Running");
const cur = data?.payload?.new?.meta;
const prev = this.prev?.payload?.new?.meta;
// DEBUG("%j", prev);
// DEBUG("%j", cur);
DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ntest passed: %j", cur.num_guns, cur.num_active, prev.num_active, cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns);
// TODO:
// Consider whether to remember if soft start / full volume events
// have already been emitted and wait until there is an online/offline
// transition before re-emitting.
// This may or may not be a good idea.
// Look for a soft start or full volume event
if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
INFO("Soft start detected @", cur.tstamp);
// FIXME Shouldn't need to use schema2pid as pid already present in payload.
const projectId = await schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
@@ -79,50 +67,14 @@ class DetectSoftStart {
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("DetectSoftStart Error")
ERROR(err);
DEBUG(`${this.author} error`, err);
throw err;
} finally {
cur.isPending = false;
this.prev = data;
}
}
}
async run (data) {
if (!data || data.channel !== "realtime") {
return;
}
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) {
this.queue.push({
isPending: this.queue.length,
_schema: meta._schema,
tstamp: meta.tstamp ?? meta.time,
shot: meta.shot,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName,
num_guns: meta.num_guns,
num_active: meta.num_active
});
} else {
// FIXME Change to alert
ALERT("DetectSoftStart queue full at", this.queue.length);
}
this.processQueue();
}
}
module.exports = DetectSoftStart;

View File

@@ -3,65 +3,48 @@ const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSOLEOL {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
prev = null;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
constructor () {
DEBUG(`${this.author} instantiated`);
}
async run (data, ctx) {
if (!data || data.channel !== "realtime") {
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
const sequence = Number(cur._sequence);
if (!this.prev) {
DEBUG("Initialising `prev`");
this.prev = data;
return;
}
try {
DEBUG("Sequence", sequence);
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
DEBUG("Running");
// DEBUG("%j", data);
const cur = data?.payload?.new?.meta;
const prev = this.prev?.payload?.new?.meta;
const sequence = Number(cur._sequence);
// DEBUG("%j", prev);
// DEBUG("%j", cur);
DEBUG("prv.lineName: %s\ncur.lineName: %s\nprv._sequence: %s\ncur._sequence: %s\nprv.lineStatus: %s\ncur.lineStatus: %s", prev.lineName, cur.lineName, prev._sequence, cur._sequence, prev.lineStatus, cur.lineStatus);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
INFO("Transition to ONLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO ONLINE", prev, cur);
// Check if there are already FSP, FGSP events for this sequence
const projectId = await schema2pid(cur._schema);
const sequenceEvents = await event.list(projectId, {sequence});
const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
if (labels.includes("FSP")) {
// At this point labels contains either FSP only or FSP + FGSP,
// depending on whether a FGSP event has already been entered.
// We must use schema2pid because the pid may not have been
// populated for this event.
const projectId = await schema2pid(cur._schema ?? prev._schema);
const labels = ["FSP", "FGSP"];
const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
@@ -71,94 +54,35 @@ class DetectSOLEOL {
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has already been entered in the log,
// so we have nothing to do here.
INFO("FSP already in the log. Doing nothing");
}
} else if (prev.lineStatus == "online" && cur.lineStatus != "online") {
} else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) {
INFO("Transition to OFFLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO OFFLINE", prev, cur);
// Check if there are already LSP, LGSP events for this sequence
const projectId = await schema2pid(prev._schema);
const sequenceEvents = await event.list(projectId, {sequence});
const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
if (labels.includes("LSP")) {
// At this point labels contains either LSP only or LSP + LGSP,
// depending on whether a LGSP event has already been entered.
const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`;
const projectId = await schema2pid(prev._schema ?? cur._schema);
const labels = ["LSP", "LGSP"];
const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: prev._point,
point: cur._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A last shot point has already been entered in the log,
// so we have nothing to do here.
INFO("LSP already in the log. Doing nothing");
}
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
console.error("DetectSOLEOL Error")
console.log(err);
DEBUG(`${this.author} error`, err);
throw err;
} finally {
cur.isPending = false;
}
this.prev = data;
}
}
async run (data) {
if (!data || data.channel !== "realtime") {
return;
}
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) {
this.queue.push({
isPending: this.queue.length,
_schema: meta._schema,
time: meta.time,
shot: meta.shot,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName,
speed: meta.speed,
waterDepth: meta.waterDepth
});
} else {
// FIXME Change to alert
console.error("DetectSOLEOL queue full at", this.queue.length);
}
this.processQueue();
}
}
module.exports = DetectSOLEOL;

View File

@@ -1,15 +1,44 @@
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const Handlers = [
require('./detect-project-configuration-change'),
require('./detect-soleol'),
require('./detect-soft-start'),
require('./report-line-change-time'),
require('./detect-fdsp')
];
function init () {
return Handlers.map(Handler => new Handler());
function init (ctx) {
const instances = Handlers.map(Handler => new Handler(ctx));
function prepare (data, ctx) {
const promises = [];
for (let instance of instances) {
const promise = new Promise(async (resolve, reject) => {
try {
DEBUG("Run", instance.author);
const result = await instance.run(data, ctx);
DEBUG("%s result: %O", instance.author, result);
resolve(result);
} catch (err) {
ERROR("%s error:\n%O", instance.author, err);
reject(err);
}
});
promises.push(promise);
}
return promises;
}
function despatch (data, ctx) {
return Promise.allSettled(prepare(data, ctx));
}
return { instances, prepare, despatch };
}
module.exports = {
Handlers,
init
}
};

View File

@@ -1,60 +1,47 @@
const { schema2pid } = require('../../lib/db/connection');
const { event, project } = require('../../lib/db');
const { withinValidity } = require('../../lib/utils/ranges');
const unique = require('../../lib/utils/unique');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class ReportLineChangeTime {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call; this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 0) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
constructor () {
DEBUG(`${this.author} instantiated`);
}
async run (data, ctx) {
if (!data || data.channel !== "event") {
return;
}
const cur = this.queue.shift();
const next = this.queue[0];
const projectId = cur.pid;
// Are we being called because of an LGSP or because of an FGSP?
const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP"));
const n = data.payload.new;
const o = data.payload.old;
if (!(n?.labels) && !(o?.labels)) {
return;
}
if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
!n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
return;
}
try {
DEBUG("Running");
const cur = data;
const projectId = cur?.payload?.pid;
const forward = (cur?.payload?.old?.labels?.includes("LGSP") || cur?.payload?.new?.labels?.includes("LGSP"));
DEBUG("%j", cur);
if (!projectId) {
WARNING("No projectID found in event", cur);
throw {message: "No projectID found in event", cur};
return;
}
async function getConfiguration (projectId) {
return await project.configuration.get(projectId);
}
async function getLineChangeTime (data, forward = false) {
if (forward) {
const ospEvents = await event.list(projectId, {label: "FGSP"});
@@ -126,7 +113,7 @@ class ReportLineChangeTime {
const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => {
const events = [];
const cfg = (await project.configuration.get(projectId));
const cfg = ctx?.projects?.configuration?.[projectId] ?? {};
const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // m → ms
DEBUG("nlcd", nlcd);
if (nlcd && lineChangeTime > nlcd) {
@@ -196,35 +183,13 @@ class ReportLineChangeTime {
await event.post(projectId, payload);
}
try {
// DEBUG("Previous", prev);
DEBUG("Current", cur);
DEBUG("Forward search", forward);
// We have these scenarios to consider:
// INSERT:
// `old` will be NULL
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// UPDATE:
// `old` is not NULL
// `new` is not NULL
// Delete previous event from event_log (not event_log_full)
// Add event with line change time:
// - match validity with `new`
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
// DELETE:
// `old` is not NULL
// `new` will be NULL
// Delete previous event from event_log (not event_log_full)
await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]);
if (cur.operation == "INSERT") {
if (cur?.payload?.operation == "INSERT") {
// NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT
// on event_log_full, so we don't need to worry about UPDATE here.
const data = cur.new;
const data = n;
DEBUG("INSERT seen: will add lct events related to ", data.id);
if (withinValidity(data.validity)) {
@@ -253,48 +218,13 @@ class ReportLineChangeTime {
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
ERROR("ReportLineChangeTime Error")
ERROR(err);
} finally {
if (next) {
next.isPending = false;
}
}
}
ERROR(`${this.author} error`, err);
throw err;
}
async run (data) {
if (!data || data.channel !== "event") {
return;
}
if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) {
return;
}
const n = data.payload.new;
const o = data.payload.old;
if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
!n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
return;
}
if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) {
this.queue.push({
...data.payload,
isPending: this.queue.length,
});
} else {
ALERT("ReportLineChangeTime queue full at", this.queue.length);
}
this.processQueue();
}
}

View File

@@ -1,23 +1,25 @@
const { listen } = require('../lib/db/notify');
const channels = require('../lib/db/channels');
const handlers = require('./handlers').init();
const handlers = require('./handlers');
const { ActionsQueue } = require('../lib/queue');
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
function start () {
listen(channels, async function (data) {
const queue = new ActionsQueue();
const ctx = {}; // Context object
const { prepare, despatch } = handlers.init(ctx);
listen(channels, function (data) {
DEBUG("Incoming data", data);
for (const handler of handlers) {
// NOTE: We are intentionally passing the same instance
// of the data to every handler. This means that earlier
// handlers could, in principle, modify the data to be
// consumed by later ones, provided that they are
// synchronous (as otherwise, the completion order is
// undefined).
await handler.run(data);
}
// We don't bother awaiting
queue.enqueue(() => despatch(data, ctx));
DEBUG("Queue size", queue.length());
});
INFO("Events manager started.", handlers.length, "active handlers");
INFO("Events manager started");
}
module.exports = { start }

View File

@@ -8,8 +8,10 @@ async function main () {
INFO("Running version", await version.describe());
version.compatible()
.then( (versions) => {
try {
const api = require('./api');
const ws = require('./ws');
const periodicTasks = require('./periodic-tasks').init();
const { fork } = require('child_process');
@@ -19,12 +21,42 @@ async function main () {
const server = api.start(port, host, path);
ws.start(server);
INFO("Versions:", versions);
periodicTasks.start();
const eventManagerPath = [__dirname, "events"].join("/");
const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/);
INFO("Versions:", versions);
process.on("SIGINT", async () => {
DEBUG("Interrupted (SIGINT)");
eventManager.kill()
await periodicTasks.cleanup();
process.exit(0);
})
process.on('exit', () => eventManager.kill());
process.on("SIGHUP", async () => {
DEBUG("Stopping (SIGHUP)");
eventManager.kill()
await periodicTasks.cleanup();
process.exit(0);
})
process.on('beforeExit', async () => {
DEBUG("Preparing to exit");
eventManager.kill()
await periodicTasks.cleanup();
});
process.on('exit', async () => {
DEBUG("Exiting");
// eventManager.kill()
// periodicTasks.cleanup();
});
} catch (err) {
ERROR(err);
process.exit(2);
}
})
.catch( ({current, wanted, component}) => {
console.error(`Fatal error: incompatible ${component} version ${current} (wanted: ${wanted})`);

View File

@@ -1,17 +1,43 @@
// FIXME This code is in painful need of refactoring
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction, pool } = require('../connection');
const { listen } = require('../notify');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
let last_tstamp = 0;
let project_configs, listener;
async function getAllProjectConfigs () {
async function getFromDatabase () {
DEBUG("Getting project configurations");
const client = await pool.connect();
const text = `SELECT schema, meta AS data FROM projects;`;
try {
const text = `
SELECT schema, meta AS data
FROM projects
WHERE (meta->>'archived')::boolean IS NOT true;
`;
const res = await client.query(text);
project_configs = res.rows;
DEBUG("Have configurations for projects", project_configs.map(i => i.data.id));
} catch (err) {
ERROR(err);
} finally {
client.release();
return res.rows;
}
return project_configs;
}
if (project_configs) {
return project_configs;
} else {
listener = await listen(["project"], getFromDatabase);
DEBUG("Added project configuration change listener");
return await getFromDatabase();
}
}
async function getNearestPreplot (candidates) {
@@ -237,7 +263,7 @@ async function getCandidates (navData) {
});
return obj;
}).filter(c => !!c);
DEBUG("Candidates: %j", candidates.map(c => c.schema));
// DEBUG("Candidates: %j", candidates.map(c => c.schema));
return candidates;
}
@@ -269,7 +295,7 @@ async function save (navData, opts = {}) {
// Only one candidate, associate with it
// console.log("Save into schema", candidates[0].match.schema);
await saveOnline(candidates);
navData.payload._schema = candidates[0].match.schema;
navData.payload._schema = candidates[0].schema;
} else {
// More than one candidate: go for the closest. If more than one active
// project shares the same preplots, pick the highest-numbered schema.
@@ -309,6 +335,7 @@ async function save (navData, opts = {}) {
}
await saveOffline(navData, opts);
DEBUG("Saved");
}
module.exports = save;
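An illustrative distillation of the selection rule described in the comments above; the distance field is assumed for the sketch (only schema appears verbatim in this diff), and the lexicographic tie-break approximates "highest-numbered schema":

function pickCandidate (candidates) {
  if (candidates.length === 1) return candidates[0];
  // Closest preplot wins; ties go to the highest-numbered schema.
  return candidates.slice().sort((a, b) =>
    (a.distance - b.distance) || b.schema.localeCompare(a.schema)
  )[0];
}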

View File

@@ -1,5 +1,43 @@
const { makeSubscriber } = require('./connection');
const { makeSubscriber, pool } = require('./connection');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
async function purge () {
DEBUG("Purging old notifications");
const client = await pool.connect();
try {
await client.query("CALL purge_notifications();");
} catch (err) {
ERROR(err);
} finally {
client.release();
}
}
async function fullPayload (payload) {
if (!payload.payload_id) {
return payload;
} else {
let client, res;
try {
client = await pool.connect();
const text = `SELECT payload FROM notify_payloads WHERE id = $1;`;
const values = [ payload.payload_id ];
res = await client.query(text, values);
res = res?.rows[0]?.payload;
DEBUG(`Oversize notification payload retrieved with id ${payload.payload_id} and size ${res.length}`);
// DEBUG(res);
res = JSON.parse(res);
} catch (err) {
ERROR(err);
} finally {
if (client) {
client.release();
}
}
return res;
}
}
async function listen (addChannels, callback) {
@@ -18,11 +56,11 @@ async function listen (addChannels, callback) {
for (const channel of addChannels) {
await client.listenTo(channel);
client.notifications.on(channel, (payload) => {
client.notifications.on(channel, async (payload) => {
const data = {
channel,
_received: new Date(),
payload
payload: await fullPayload(payload)
};
callback(data);
});
@@ -32,5 +70,6 @@ async function listen (addChannels, callback) {
}
module.exports = {
listen
listen,
purge
};

View File

@@ -0,0 +1,52 @@
const Queue = require('./queue');
// Inspired by:
// https://stackoverflow.com/questions/53540348/js-async-await-tasks-queue#53540586
class ActionsQueue extends Queue {
constructor (items = []) {
super(items);
this.pending = false;
}
enqueue (action) {
return new Promise ((resolve, reject) => {
super.enqueue({ action, resolve, reject });
this.dequeue();
});
}
async dequeue () {
if (this.pending) {
return false;
}
const item = super.dequeue();
if (!item) {
return false;
}
try {
this.pending = true;
const result = await item.action(this);
this.pending = false;
item.resolve(result);
} catch (err) {
this.pending = false;
item.reject(err);
} finally {
this.dequeue();
}
}
}
module.exports = ActionsQueue;
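Hypothetical usage of ActionsQueue: actions run strictly one at a time, in arrival order, and enqueue() resolves with the action's result (doSlowThing is an assumed stand-in):

const ActionsQueue = require('./actions-queue');

(async () => {
  const queue = new ActionsQueue();
  queue.enqueue(() => doSlowThing(1));    // starts immediately
  queue.enqueue(() => doSlowThing(2));    // runs only after (1) settles
  const n = await queue.enqueue(async () => 42);
  console.log(n);                         // 42
})();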

View File

@@ -0,0 +1,6 @@
module.exports = {
Queue: require('./queue'),
ActionsQueue: require('./actions-queue')
};

View File

@@ -0,0 +1,22 @@
class Queue {
constructor (items = []) {
this.items = items;
}
enqueue (item) {
this.items.push(item);
}
dequeue () {
return this.items.shift();
}
length () {
return this.items.length;
}
}
module.exports = Queue;

View File

@@ -0,0 +1,38 @@
const tasks = require('./tasks');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
function init () {
const iids = [];
function start () {
INFO("Initialising %d periodic tasks", tasks.length);
for (let t of tasks) {
const iid = setInterval(t.task, t.timeout);
iids.push(iid);
}
return iids;
};
function stop () {
INFO("Stopping %d periodic tasks", iids.length);
for (let iid of iids) {
clearInterval(iid);
}
}
async function cleanup () {
stop();
DEBUG("Cleaning up %d periodic tasks", tasks.length);
for (let t of tasks) {
if (t.cleanup) {
await t.cleanup();
}
}
}
return { start, stop, cleanup, iids };
}
module.exports = {
init
};

View File

@@ -0,0 +1,4 @@
module.exports = [
require('./purge-notifications')
];

View File

@@ -0,0 +1,20 @@
const { purge } = require('../../lib/db/notify');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
const timeout = 120*1000; // 2 minutes
function task () {
DEBUG("Running task");
purge();
}
async function cleanup () {
DEBUG("Running cleanup");
await purge();
}
module.exports = {
task,
timeout,
cleanup
};

View File

@@ -95,7 +95,8 @@ for (const header of (cfg._("global.navigation.headers") || []).filter(h => h.ty
const server = dgram.createSocket('udp4');
server.on('error', (err) => {
console.error(`server error:\n${err.stack}`);
ERROR(err);
// console.error(`server error:\n${err.stack}`);
maybeSendError(err, {title: "UDP listener error on port "+header.port});
// server.close();
});