diff --git a/etc/db/upgrades/upgrade30-v0.4.3-large-notification-payloads.sql b/etc/db/upgrades/upgrade30-v0.4.3-large-notification-payloads.sql new file mode 100644 index 0000000..aaa9d15 --- /dev/null +++ b/etc/db/upgrades/upgrade30-v0.4.3-large-notification-payloads.sql @@ -0,0 +1,164 @@ +-- Support notification payloads larger than Postgres' NOTIFY limit. +-- +-- New schema version: 0.4.3 +-- +-- ATTENTION: +-- +-- ENSURE YOU HAVE BACKED UP THE DATABASE BEFORE RUNNING THIS SCRIPT. +-- +-- +-- NOTE: This upgrade affects the public schema only. +-- NOTE: Each application starts a transaction, which must be committed +-- or rolled back. +-- +-- This creates a new table where large notification payloads are stored +-- temporarily and from which they might be recalled by the notification +-- listeners. It also creates a purge_notifications() procedure used to +-- clean up old notifications from the notifications log and finally, +-- modifies notify() to support these changes. When a large payload is +-- encountered, the payload is stored in the notify_payloads table and +-- a trimmed down version containing a notification_id is sent to listeners +-- instead. Listeners can then query notify_payloads to retrieve the full +-- payloads. It is the application layer's responsibility to delete old +-- notifications. +-- +-- To apply, run as the dougal user: +-- +-- psql < age_seconds; + $$ LANGUAGE sql; + + + +END; +$outer$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE pg_temp.upgrade () AS $outer$ +DECLARE + row RECORD; + current_db_version TEXT; +BEGIN + + SELECT value->>'db_schema' INTO current_db_version FROM public.info WHERE key = 'version'; + + IF current_db_version >= '0.4.3' THEN + RAISE EXCEPTION + USING MESSAGE='Patch already applied'; + END IF; + + IF current_db_version != '0.4.2' THEN + RAISE EXCEPTION + USING MESSAGE='Invalid database version: ' || current_db_version, + HINT='Ensure all previous patches have been applied.'; + END IF; + + -- This upgrade modified the `public` schema only, not individual + -- project schemas. + CALL pg_temp.upgrade_schema(); + +END; +$outer$ LANGUAGE plpgsql; + +CALL pg_temp.upgrade(); + +CALL pg_temp.show_notice('Cleaning up'); +DROP PROCEDURE pg_temp.upgrade_schema (); +DROP PROCEDURE pg_temp.upgrade (); + +CALL pg_temp.show_notice('Updating db_schema version'); +INSERT INTO public.info VALUES ('version', '{"db_schema": "0.4.3"}') +ON CONFLICT (key) DO UPDATE + SET value = public.info.value || '{"db_schema": "0.4.3"}' WHERE public.info.key = 'version'; + + +CALL pg_temp.show_notice('All done. You may now run "COMMIT;" to persist the changes'); +DROP PROCEDURE pg_temp.show_notice (notice text); + +-- +--NOTE Run `COMMIT;` now if all went well +-- diff --git a/lib/www/server/events/handlers/detect-fdsp.js b/lib/www/server/events/handlers/detect-fdsp.js index 55d05af..1b30644 100644 --- a/lib/www/server/events/handlers/detect-fdsp.js +++ b/lib/www/server/events/handlers/detect-fdsp.js @@ -9,105 +9,16 @@ const { ALERT, ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); * the last shot and first shot of the previous and current dates, respectively. */ class DetectFDSP { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks at the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - queue = []; + author = `*${this.constructor.name}*`; + prev = null; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 1) { - - if (this.queue[0].isPending) { - setImmediate(() => this.processQueue()); - return; - } - - const prev = this.queue.shift(); - const cur = this.queue[0]; - - const sequence = Number(cur._sequence); - - try { - - if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && - prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) { - -// DEBUG("Previous", prev); -// DEBUG("Current", cur); - - if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) { - // Possible a date change, but could also be a missing timestamp - // or something else. - - const ts0 = new Date(prev.time) - const ts1 = new Date(cur.time); - - if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) { - INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName); - - const ldsp = { - sequence: prev._sequence, - point: prev._point, - remarks: "Last shotpoint of the day", - labels: ["LDSP", "Prod"], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - - const fdsp = { - sequence: cur._sequence, - point: cur._point, - remarks: "First shotpoint of the day", - labels: ["FDSP", "Prod"], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - - INFO("LDSP", ldsp); - INFO("FDSP", fdsp); - - const projectId = await schema2pid(prev._schema); - - if (projectId) { - await event.post(projectId, ldsp); - await event.post(projectId, fdsp); - } else { - ERROR("projectId not found for", prev._schema); - } - } else { - WARNING("False positive on these timestamps", prev.time, cur.time); - WARNING("No events were created"); - } - } - } - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - ERROR(err); - } finally { - cur.isPending = false; - } - } + constructor () { + DEBUG(`${this.author} instantiated`); } - async run (data) { + async run (data, ctx) { + if (!data || data.channel !== "realtime") { return; } @@ -116,27 +27,70 @@ class DetectFDSP { return; } - const meta = data.payload.new.meta; - - if (this.queue.length < DetectFDSP.MAX_QUEUE_SIZE) { - - const event = { - isPending: this.queue.length, - _schema: meta._schema, - time: meta.time, - lineStatus: meta.lineStatus, - _sequence: meta._sequence, - _point: meta._point, - lineName: meta.lineName - }; - this.queue.push(event); -// DEBUG("EVENT", event); - - } else { - ALERT("Queue full at", this.queue.length); + if (!this.prev) { + DEBUG("Initialising `prev`"); + this.prev = data; + return; } - this.processQueue(); + try { + DEBUG("Running"); + const cur = data; + const sequence = Number(cur._sequence); + + if (this.prev.lineName == cur.lineName && this.prev._sequence == cur._sequence && + this.prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) { + + if (this.prev.time.substr(0, 10) != cur.time.substr(0, 10)) { + // Possibly a date change, but could also be a missing timestamp + // or something else. + + const ts0 = new Date(this.prev.time) + const ts1 = new Date(cur.time); + + if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) { + INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName); + + const ldsp = { + sequence: this.prev._sequence, + point: this.prev._point, + remarks: "Last shotpoint of the day", + labels: ["LDSP", "Prod"], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + + const fdsp = { + sequence: cur._sequence, + point: cur._point, + remarks: "First shotpoint of the day", + labels: ["FDSP", "Prod"], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + + INFO("LDSP", ldsp); + INFO("FDSP", fdsp); + + const projectId = await schema2pid(this.prev._schema); + + if (projectId) { + await event.post(projectId, ldsp); + await event.post(projectId, fdsp); + } else { + ERROR("projectId not found for", this.prev._schema); + } + } else { + WARNING("False positive on these timestamps", this.prev.time, cur.time); + WARNING("No events were created"); + } + } + + } + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } finally { + this.prev = data; + } } } diff --git a/lib/www/server/events/handlers/detect-project-configuration-change.js b/lib/www/server/events/handlers/detect-project-configuration-change.js new file mode 100644 index 0000000..0a88b21 --- /dev/null +++ b/lib/www/server/events/handlers/detect-project-configuration-change.js @@ -0,0 +1,60 @@ +const project = require('../../lib/db/project'); +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + +class DetectProjectConfigurationChange { + + author = `*${this.constructor.name}*`; + + constructor (ctx) { + DEBUG(`${this.author} instantiated`); + + // Grab project configurations. + // NOTE that this will run asynchronously + this.run({channel: "project"}, ctx); + } + + async run (data, ctx) { + + if (!data || data.channel !== "project") { + return; + } + + // Project notifications, as of this writing, most likely + // do not carry payloads as those exceed the notification + // size limit. + // For our purposes, we do not care as we just re-read all + // the configurations for all non-archived projects. + + try { + DEBUG("Project configuration change detected") + + const projects = await project.get(); + + const _ctx_data = {}; + for (let pid of projects.map(i => i.pid)) { + DEBUG("Retrieving configuration for", pid); + const cfg = await project.configuration.get(pid); + if (cfg?.archived === true) { + DEBUG(pid, "is archived. Ignoring"); + continue; + } + + DEBUG("Saving configuration for", pid); + _ctx_data[pid] = cfg; + } + + if (! ("projects" in ctx)) { + ctx.projects = {}; + } + + ctx.projects.configuration = _ctx_data; + DEBUG("Committed project configuration to ctx.projects.configuration"); + + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } + } +} + +module.exports = DetectProjectConfigurationChange; diff --git a/lib/www/server/events/handlers/detect-soft-start.js b/lib/www/server/events/handlers/detect-soft-start.js index ea96e6c..6f92eb5 100644 --- a/lib/www/server/events/handlers/detect-soft-start.js +++ b/lib/www/server/events/handlers/detect-soft-start.js @@ -3,94 +3,16 @@ const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSoftStart { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks takes the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - queue = []; + author = `*${this.constructor.name}*`; + prev = null; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 1) { - if (this.queue[0].isPending) { - DEBUG("Queue busy"); - setImmediate(() => this.processQueue()); - return; - } - - const prev = this.queue.shift(); - const cur = this.queue[0]; - - try { - // DEBUG("Previous", prev); - // DEBUG("Current", cur); - - // TODO: - // Consider whether to remember if soft start / full volume events - // have already been emitted and wait until there is an online/offline - // transition before re-emitting. - // This may or may not be a good idea. - - // Look for a soft start or full volume event - if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) { - INFO("Soft start detected @", cur.tstamp); - - const projectId = await schema2pid(cur._schema ?? prev._schema); - - // TODO: Try and grab the corresponding comment from the configuration? - const payload = { - tstamp: cur.tstamp, - remarks: "Soft start", - labels: [ "Daily", "Guns", "Prod" ], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - DEBUG("Posting event", projectId, payload); - await event.post(projectId, payload); - - } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) { - INFO("Full volume detected @", cur.tstamp); - - const projectId = await schema2pid(cur._schema ?? prev._schema); - - // TODO: Try and grab the corresponding comment from the configuration? - const payload = { - tstamp: cur.tstamp, - remarks: "Full volume", - labels: [ "Daily", "Guns", "Prod" ], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - DEBUG("Posting event", projectId, payload); - await event.post(projectId, payload); - } - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - ERROR("DetectSoftStart Error") - ERROR(err); - } finally { - cur.isPending = false; - } - } + constructor () { + DEBUG(`${this.author} instantiated`); } - async run (data) { + async run (data, ctx) { + if (!data || data.channel !== "realtime") { return; } @@ -99,29 +21,59 @@ class DetectSoftStart { return; } - const meta = data.payload.new.meta; - - if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) { - - this.queue.push({ - isPending: this.queue.length, - _schema: meta._schema, - tstamp: meta.tstamp ?? meta.time, - shot: meta.shot, - lineStatus: meta.lineStatus, - _sequence: meta._sequence, - _point: meta._point, - lineName: meta.lineName, - num_guns: meta.num_guns, - num_active: meta.num_active - }); - - } else { - // FIXME Change to alert - ALERT("DetectSoftStart queue full at", this.queue.length); + if (!this.prev) { + DEBUG("Initialising `prev`"); + this.prev = data; + return; } - this.processQueue(); + try { + DEBUG("Running"); + const cur = data?.payload?.new?.meta; + const prev = this.prev?.payload?.new?.meta; + // DEBUG("%j", prev); + // DEBUG("%j", cur); + DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ntest passed: %j", cur.num_guns, cur.num_active, prev.num_active, cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns); + + + if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) { + INFO("Soft start detected @", cur.tstamp); + + // FIXME Shouldn't need to use schema2pid as pid already present in payload. + const projectId = await schema2pid(cur._schema ?? prev._schema); + + // TODO: Try and grab the corresponding comment from the configuration? + const payload = { + tstamp: cur.tstamp, + remarks: "Soft start", + labels: [ "Daily", "Guns", "Prod" ], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + DEBUG("Posting event", projectId, payload); + await event.post(projectId, payload); + + } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) { + INFO("Full volume detected @", cur.tstamp); + + const projectId = await schema2pid(cur._schema ?? prev._schema); + + // TODO: Try and grab the corresponding comment from the configuration? + const payload = { + tstamp: cur.tstamp, + remarks: "Full volume", + labels: [ "Daily", "Guns", "Prod" ], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + DEBUG("Posting event", projectId, payload); + await event.post(projectId, payload); + } + + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } finally { + this.prev = data; + } } } diff --git a/lib/www/server/events/handlers/detect-soleol.js b/lib/www/server/events/handlers/detect-soleol.js index 7cae9f6..40550b7 100644 --- a/lib/www/server/events/handlers/detect-soleol.js +++ b/lib/www/server/events/handlers/detect-soleol.js @@ -3,130 +3,15 @@ const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSOLEOL { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks takes the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - queue = []; + author = `*${this.constructor.name}*`; + prev = null; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 1) { - if (this.queue[0].isPending) { - DEBUG("Queue busy"); - setImmediate(() => this.processQueue()); - return; - } - - const prev = this.queue.shift(); - const cur = this.queue[0]; - - const sequence = Number(cur._sequence); - - try { - DEBUG("Sequence", sequence); - // DEBUG("Previous", prev); - // DEBUG("Current", cur); - - if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && - prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) { - INFO("Transition to ONLINE detected"); - // DEBUG(cur); - // DEBUG(prev); -// console.log("TRANSITION TO ONLINE", prev, cur); - - // Check if there are already FSP, FGSP events for this sequence - const projectId = await schema2pid(cur._schema); - const sequenceEvents = await event.list(projectId, {sequence}); - - const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); - - if (labels.includes("FSP")) { - // At this point labels contains either FSP only or FSP + FGSP, - // depending on whether a FGSP event has already been entered. - - const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; - const payload = { - type: "sequence", - sequence, - point: cur._point, - remarks, - labels, - meta: {auto: true, author: `*${this.constructor.name}*`} - } - -// console.log(projectId, payload); - INFO("Posting event", projectId, payload); - await event.post(projectId, payload); - } else { - // A first shot point has been already entered in the log, - // so we have nothing to do here. - INFO("FSP already in the log. Doing nothing"); - } - } else if (prev.lineStatus == "online" && cur.lineStatus != "online") { - INFO("Transition to OFFLINE detected"); - // DEBUG(cur); - // DEBUG(prev); -// console.log("TRANSITION TO OFFLINE", prev, cur); - - // Check if there are already LSP, LGSP events for this sequence - const projectId = await schema2pid(prev._schema); - const sequenceEvents = await event.list(projectId, {sequence}); - - const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); - - if (labels.includes("LSP")) { - // At this point labels contains either LSP only or LSP + LGSP, - // depending on whether a LGSP event has already been entered. - - const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`; - const payload = { - type: "sequence", - sequence, - point: prev._point, - remarks, - labels, - meta: {auto: true, author: `*${this.constructor.name}*`} - } - -// console.log(projectId, payload); - INFO("Posting event", projectId, payload); - await event.post(projectId, payload); - } else { - // A first shot point has been already entered in the log, - // so we have nothing to do here. - INFO("LSP already in the log. Doing nothing"); - } - } - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - console.error("DetectSOLEOL Error") - console.log(err); - } finally { - cur.isPending = false; - } - } + constructor () { + DEBUG(`${this.author} instantiated`); } - async run (data) { + async run (data, ctx) { if (!data || data.channel !== "realtime") { return; } @@ -135,30 +20,69 @@ class DetectSOLEOL { return; } - const meta = data.payload.new.meta; - - if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) { - - this.queue.push({ - isPending: this.queue.length, - _schema: meta._schema, - time: meta.time, - shot: meta.shot, - lineStatus: meta.lineStatus, - _sequence: meta._sequence, - _point: meta._point, - lineName: meta.lineName, - speed: meta.speed, - waterDepth: meta.waterDepth - }); - - } else { - // FIXME Change to alert - console.error("DetectSOLEOL queue full at", this.queue.length); + if (!this.prev) { + DEBUG("Initialising `prev`"); + this.prev = data; + return; } - this.processQueue(); + try { + DEBUG("Running"); + // DEBUG("%j", data); + const cur = data?.payload?.new?.meta; + const prev = this.prev?.payload?.new?.meta; + const sequence = Number(cur._sequence); + + // DEBUG("%j", prev); + // DEBUG("%j", cur); + DEBUG("prv.lineName: %s\ncur.lineName: %s\nprv._sequence: %s\ncur._sequence: %s\nprv.lineStatus: %s\ncur.lineStatus: %s", prev.lineName, cur.lineName, prev._sequence, cur._sequence, prev.lineStatus, cur.lineStatus); + + if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && + prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) { + INFO("Transition to ONLINE detected"); + + // We must use schema2pid because the pid may not have been + // populated for this event. + const projectId = await schema2pid(cur._schema ?? prev._schema); + const labels = ["FSP", "FGSP"]; + const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; + const payload = { + type: "sequence", + sequence, + point: cur._point, + remarks, + labels, + meta: {auto: true, author: `*${this.constructor.name}*`} + } + INFO("Posting event", projectId, payload); + await event.post(projectId, payload); + } else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && + prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) { + INFO("Transition to OFFLINE detected"); + + const projectId = await schema2pid(prev._schema ?? cur._schema); + const labels = ["LSP", "LGSP"]; + const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; + const payload = { + type: "sequence", + sequence, + point: cur._point, + remarks, + labels, + meta: {auto: true, author: `*${this.constructor.name}*`} + } + INFO("Posting event", projectId, payload); + await event.post(projectId, payload); + } + + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } finally { + this.prev = data; + } } + } module.exports = DetectSOLEOL; diff --git a/lib/www/server/events/handlers/index.js b/lib/www/server/events/handlers/index.js index 25e07c2..ea8d021 100644 --- a/lib/www/server/events/handlers/index.js +++ b/lib/www/server/events/handlers/index.js @@ -1,15 +1,44 @@ +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + const Handlers = [ + require('./detect-project-configuration-change'), require('./detect-soleol'), require('./detect-soft-start'), require('./report-line-change-time'), require('./detect-fdsp') ]; -function init () { - return Handlers.map(Handler => new Handler()); +function init (ctx) { + + const instances = Handlers.map(Handler => new Handler(ctx)); + + function prepare (data, ctx) { + const promises = []; + for (let instance of instances) { + const promise = new Promise(async (resolve, reject) => { + try { + DEBUG("Run", instance.author); + const result = await instance.run(data, ctx); + DEBUG("%s result: %O", instance.author, result); + resolve(result); + } catch (err) { + ERROR("%s error:\n%O", instance.author, err); + reject(err); + } + }); + promises.push(promise); + } + return promises; + } + + function despatch (data, ctx) { + return Promise.allSettled(prepare(data, ctx)); + } + + return { instances, prepare, despatch }; } module.exports = { Handlers, init -} +}; diff --git a/lib/www/server/events/handlers/report-line-change-time.js b/lib/www/server/events/handlers/report-line-change-time.js index 0871953..d6dece6 100644 --- a/lib/www/server/events/handlers/report-line-change-time.js +++ b/lib/www/server/events/handlers/report-line-change-time.js @@ -1,60 +1,47 @@ -const { schema2pid } = require('../../lib/db/connection'); const { event, project } = require('../../lib/db'); const { withinValidity } = require('../../lib/utils/ranges'); const unique = require('../../lib/utils/unique'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class ReportLineChangeTime { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks takes the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - - queue = []; author = `*${this.constructor.name}*`; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 0) { - if (this.queue[0].isPending) { - DEBUG("Queue busy"); - setTimeout(() => this.processQueue(), 1000); // We're not in a hurry - return; - } + constructor () { + DEBUG(`${this.author} instantiated`); + } - const cur = this.queue.shift(); - const next = this.queue[0]; - const projectId = cur.pid; - // Are we being called because of a LGSP or because of a FGSP? - const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP")); + async run (data, ctx) { + + if (!data || data.channel !== "event") { + return; + } + + const n = data.payload.new; + const o = data.payload.old; + + if (!(n?.labels) && !(o?.labels)) { + return; + } + + if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") && + !n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) { + return; + } + + + try { + DEBUG("Running"); + const cur = data; + const projectId = cur?.payload?.pid; + const forward = (cur?.payload?.old?.labels?.includes("LGSP") || cur?.payload?.new?.labels?.includes("LGSP")); + DEBUG("%j", cur); if (!projectId) { throw {message: "No projectID found in event", cur}; return; } - async function getConfiguration (projectId) { - return await project.configuration.get(projectId); - } - - async function getLineChangeTime (data, forward = false) { if (forward) { const ospEvents = await event.list(projectId, {label: "FGSP"}); @@ -126,7 +113,7 @@ class ReportLineChangeTime { const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => { const events = []; - const cfg = (await project.configuration.get(projectId)); + const cfg = ctx?.projects?.configuration?.[projectId] ?? {}; const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // m → ms DEBUG("nlcd", nlcd); if (nlcd && lineChangeTime > nlcd) { @@ -196,108 +183,48 @@ class ReportLineChangeTime { await event.post(projectId, payload); } - try { - // DEBUG("Previous", prev); - DEBUG("Current", cur); - DEBUG("Forward search", forward); - // We have these scenarios to consider: - // INSERT: - // `old` will be NULL - // Add event with line change time: - // - match validity with `new` - // - meta.ReportLineChangeTime.link refers to new.uid (or new.id?) - // UPDATE: - // `old` is not NULL - // `new` is not NULL - // Delete previous event from event_log (not event_log_full) - // Add event with line change time: - // - match validity with `new` - // - meta.ReportLineChangeTime.link refers to new.uid (or new.id?) - // DELETE: - // `old` is not NULL - // `new` will be NULL - // Delete previous event from event_log (not event_log_full) + await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]); - await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]); + if (cur?.payload?.operation == "INSERT") { + // NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT + // on event_log_full, so we don't need to worry about UPDATE here. + const data = n; + DEBUG("INSERT seen: will add lct events related to ", data.id); - if (cur.operation == "INSERT") { - // NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT - // on event_log_full, so we don't need to worry about UPDATE here. - const data = cur.new; - DEBUG("INSERT seen: will add lct events related to ", data.id); + if (withinValidity(data.validity)) { + DEBUG("Event within validity period", data.validity, new Date()); - if (withinValidity(data.validity)) { - DEBUG("Event within validity period", data.validity, new Date()); + data.tstamp = new Date(data.tstamp); + const { lineChangeTime, osp } = await getLineChangeTime(data, forward); - data.tstamp = new Date(data.tstamp); - const { lineChangeTime, osp } = await getLineChangeTime(data, forward); + if (lineChangeTime) { - if (lineChangeTime) { + const events = await createLineChangeTimeEvents(lineChangeTime, data, osp); - const events = await createLineChangeTimeEvents(lineChangeTime, data, osp); - - if (events?.length) { - DEBUG("Deleting other events for sequence", events[0].sequence); - await deleteStaleEvents(events[0].sequence); - } - - for (let payload of events) { - await maybePostEvent(projectId, payload); - } + if (events?.length) { + DEBUG("Deleting other events for sequence", events[0].sequence); + await deleteStaleEvents(events[0].sequence); + } + + for (let payload of events) { + await maybePostEvent(projectId, payload); } - } else { - DEBUG("Event outside of validity range", data.validity, "lct events not inserted"); } - + } else { + DEBUG("Event outside of validity range", data.validity, "lct events not inserted"); } - - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - ERROR("ReportLineChangeTime Error") - ERROR(err); - } finally { - if (next) { - next.isPending = false; - } } - } - } - async run (data) { - DEBUG("Seen", data); - if (!data || data.channel !== "event") { - return; + + + } catch (err) { + ERROR(`${this.author} error`, err); + throw err; } - if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) { - return; - } - const n = data.payload.new; - const o = data.payload.old; - - if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") && - !n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) { - return; - } - - if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) { - - const item = { - ...structuredClone(data.payload), - isPending: this.queue.length, - }; - DEBUG("Queueing", item); - this.queue.push(item); - - } else { - ALERT("ReportLineChangeTime queue full at", this.queue.length); - } - - this.processQueue(); } } diff --git a/lib/www/server/events/index.js b/lib/www/server/events/index.js index f415d46..a388cca 100644 --- a/lib/www/server/events/index.js +++ b/lib/www/server/events/index.js @@ -1,23 +1,25 @@ const { listen } = require('../lib/db/notify'); const channels = require('../lib/db/channels'); -const handlers = require('./handlers').init(); +const handlers = require('./handlers'); +const { ActionsQueue } = require('../lib/queue'); const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); function start () { - listen(channels, async function (data) { + + const queue = new ActionsQueue(); + const ctx = {}; // Context object + + const { prepare, despatch } = handlers.init(ctx); + + listen(channels, function (data) { DEBUG("Incoming data", data); - for (const handler of handlers) { - // NOTE: We are intentionally passing the same instance - // of the data to every handler. This means that earlier - // handlers could, in principle, modify the data to be - // consumed by latter ones, provided that they are - // synchronous (as otherwise, the completion order is - // undefined). - await handler.run(data); - } + + // We don't bother awaiting + queue.enqueue(() => despatch(data, ctx)); + DEBUG("Queue size", queue.length()); }); - INFO("Events manager started.", handlers.length, "active handlers"); + INFO("Events manager started"); } module.exports = { start } diff --git a/lib/www/server/index.js b/lib/www/server/index.js index 09221bd..fb6751d 100755 --- a/lib/www/server/index.js +++ b/lib/www/server/index.js @@ -8,23 +8,55 @@ async function main () { INFO("Running version", await version.describe()); version.compatible() .then( (versions) => { - const api = require('./api'); - const ws = require('./ws'); + try { + const api = require('./api'); + const ws = require('./ws'); + const periodicTasks = require('./periodic-tasks').init(); - const { fork } = require('child_process'); + const { fork } = require('child_process'); - const port = process.env.HTTP_PORT || 3000; - const host = process.env.HTTP_HOST || "127.0.0.1"; - const path = process.env.HTTP_PATH ?? "/api"; - const server = api.start(port, host, path); - ws.start(server); + const port = process.env.HTTP_PORT || 3000; + const host = process.env.HTTP_HOST || "127.0.0.1"; + const path = process.env.HTTP_PATH ?? "/api"; + const server = api.start(port, host, path); + ws.start(server); - const eventManagerPath = [__dirname, "events"].join("/"); - const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/); + INFO("Versions:", versions); - INFO("Versions:", versions); + periodicTasks.start(); - process.on('exit', () => eventManager.kill()); + const eventManagerPath = [__dirname, "events"].join("/"); + const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/); + + process.on("SIGINT", async () => { + DEBUG("Interrupted (SIGINT)"); + eventManager.kill() + await periodicTasks.cleanup(); + process.exit(0); + }) + + process.on("SIGHUP", async () => { + DEBUG("Stopping (SIGHUP)"); + eventManager.kill() + await periodicTasks.cleanup(); + process.exit(0); + }) + + process.on('beforeExit', async () => { + DEBUG("Preparing to exit"); + eventManager.kill() + await periodicTasks.cleanup(); + }); + + process.on('exit', async () => { + DEBUG("Exiting"); + // eventManager.kill() + // periodicTasks.cleanup(); + }); + } catch (err) { + ERROR(err); + process.exit(2); + } }) .catch( ({current, wanted, component}) => { console.error(`Fatal error: incompatible ${component} version ${current} (wanted: ${wanted})`); diff --git a/lib/www/server/lib/db/navdata/save.js b/lib/www/server/lib/db/navdata/save.js index 5e1c494..888d213 100644 --- a/lib/www/server/lib/db/navdata/save.js +++ b/lib/www/server/lib/db/navdata/save.js @@ -1,17 +1,43 @@ // FIXME This code is in painful need of refactoring -const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); const { setSurvey, transaction, pool } = require('../connection'); +const { listen } = require('../notify'); +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); let last_tstamp = 0; -async function getAllProjectConfigs () { - const client = await pool.connect(); +let project_configs, listener; - const text = `SELECT schema, meta AS data FROM projects;`; - const res = await client.query(text); - client.release(); - return res.rows; +async function getAllProjectConfigs () { + + async function getFromDatabase () { + DEBUG("Getting project configurations"); + const client = await pool.connect(); + + try { + const text = ` + SELECT schema, meta AS data + FROM projects + WHERE (meta->>'archived')::boolean IS NOT true; + `; + const res = await client.query(text); + project_configs = res.rows; + DEBUG("Have configurations for projects", project_configs.map(i => i.data.id)); + } catch (err) { + ERROR(err); + } finally { + client.release(); + } + return project_configs; + } + + if (project_configs) { + return project_configs; + } else { + listener = await listen(["project"], getFromDatabase); + DEBUG("Added project configuration change listener"); + return await getFromDatabase(); + } } async function getNearestPreplot (candidates) { @@ -237,7 +263,7 @@ async function getCandidates (navData) { }); return obj; }).filter(c => !!c); - DEBUG("Candidates: %j", candidates.map(c => c.schema)); + // DEBUG("Candidates: %j", candidates.map(c => c.schema)); return candidates; } @@ -269,7 +295,7 @@ async function save (navData, opts = {}) { // Only one candidate, associate with it // console.log("Save into schema", candidates[0].match.schema); await saveOnline(candidates); - navData.payload._schema = candidates[0].match.schema; + navData.payload._schema = candidates[0].schema; } else { // More than one candidate, go for the closest. If more than one active // project with the same preplots, highest numbered schema. @@ -309,6 +335,7 @@ async function save (navData, opts = {}) { } await saveOffline(navData, opts); + DEBUG("Saved"); } module.exports = save; diff --git a/lib/www/server/lib/db/notify.js b/lib/www/server/lib/db/notify.js index 5ff9e9c..0540ae8 100644 --- a/lib/www/server/lib/db/notify.js +++ b/lib/www/server/lib/db/notify.js @@ -1,5 +1,43 @@ -const { makeSubscriber } = require('./connection'); +const { makeSubscriber, pool } = require('./connection'); +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); +async function purge () { + DEBUG("Purging old notifications"); + const client = await pool.connect(); + try { + await client.query("CALL purge_notifications();"); + } catch (err) { + ERROR(err); + } finally { + client.release(); + } +} + +async function fullPayload (payload) { + + if (!payload.payload_id) { + return payload; + } else { + let client, res; + try { + client = await pool.connect(); + const text = `SELECT payload FROM notify_payloads WHERE id = $1;`; + const values = [ payload.payload_id ]; + res = await client.query(text, values); + res = res?.rows[0]?.payload; + DEBUG(`Oversize notification payload retrieved with id ${payload.payload_id} and size ${res.length}`); + // DEBUG(res); + res = JSON.parse(res); + } catch (err) { + ERROR(err); + } finally { + if (client) { + client.release(); + } + } + return res; + } +} async function listen (addChannels, callback) { @@ -18,11 +56,11 @@ async function listen (addChannels, callback) { for (const channel of addChannels) { await client.listenTo(channel); - client.notifications.on(channel, (payload) => { + client.notifications.on(channel, async (payload) => { const data = { channel, _received: new Date(), - payload + payload: await fullPayload(payload) }; callback(data); }); @@ -32,5 +70,6 @@ async function listen (addChannels, callback) { } module.exports = { - listen + listen, + purge }; diff --git a/lib/www/server/lib/queue/actions-queue.js b/lib/www/server/lib/queue/actions-queue.js new file mode 100644 index 0000000..069ded2 --- /dev/null +++ b/lib/www/server/lib/queue/actions-queue.js @@ -0,0 +1,52 @@ +const Queue = require('./queue'); + +// Inspired by: +// https://stackoverflow.com/questions/53540348/js-async-await-tasks-queue#53540586 + +class ActionsQueue extends Queue { + + constructor (items = []) { + super(items); + + this.pending = false; + } + + enqueue (action) { + return new Promise ((resolve, reject) => { + super.enqueue({ action, resolve, reject }); + this.dequeue(); + }); + } + + async dequeue () { + + if (this.pending) { + return false; + } + + const item = super.dequeue(); + + if (!item) { + return false; + } + + try { + + this.pending = true; + + const result = await item.action(this); + + this.pending = false; + item.resolve(result); + } catch (err) { + this.pending = false; + item.reject(err); + } finally { + this.dequeue(); + } + + } + +} + +module.exports = ActionsQueue; diff --git a/lib/www/server/lib/queue/index.js b/lib/www/server/lib/queue/index.js new file mode 100644 index 0000000..8305fea --- /dev/null +++ b/lib/www/server/lib/queue/index.js @@ -0,0 +1,6 @@ + +module.exports = { + Queue: require('./queue'), + ActionsQueue: require('./actions-queue') +}; + diff --git a/lib/www/server/lib/queue/queue.js b/lib/www/server/lib/queue/queue.js new file mode 100644 index 0000000..aabeb4d --- /dev/null +++ b/lib/www/server/lib/queue/queue.js @@ -0,0 +1,22 @@ + +class Queue { + + constructor (items = []) { + this.items = items; + } + + enqueue (item) { + this.items.push(item); + } + + dequeue () { + return this.items.shift(); + } + + length () { + return this.items.length; + } + +} + +module.exports = Queue; diff --git a/lib/www/server/periodic-tasks/index.js b/lib/www/server/periodic-tasks/index.js new file mode 100644 index 0000000..6baf887 --- /dev/null +++ b/lib/www/server/periodic-tasks/index.js @@ -0,0 +1,38 @@ +const tasks = require('./tasks'); +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + +function init () { + const iids = []; + + function start () { + INFO("Initialising %d periodic tasks", tasks.length); + for (let t of tasks) { + const iid = setInterval(t.task, t.timeout); + iids.push(iid); + } + return iids; + }; + + function stop () { + INFO("Stopping %d periodic tasks", iids.length); + for (let iid of iids) { + clearInterval(iid); + } + } + + async function cleanup () { + stop(); + DEBUG("Cleaning up %d periodic tasks", tasks.length); + for (let t of tasks) { + if (t.cleanup) { + await t.cleanup(); + } + } + } + + return { start, stop, cleanup, iids }; +} + +module.exports = { + init +}; diff --git a/lib/www/server/periodic-tasks/tasks/index.js b/lib/www/server/periodic-tasks/tasks/index.js new file mode 100644 index 0000000..c5643da --- /dev/null +++ b/lib/www/server/periodic-tasks/tasks/index.js @@ -0,0 +1,4 @@ + +module.exports = [ + require('./purge-notifications') +]; diff --git a/lib/www/server/periodic-tasks/tasks/purge-notifications.js b/lib/www/server/periodic-tasks/tasks/purge-notifications.js new file mode 100644 index 0000000..6879f4e --- /dev/null +++ b/lib/www/server/periodic-tasks/tasks/purge-notifications.js @@ -0,0 +1,20 @@ +const { purge } = require('../../lib/db/notify'); +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + +const timeout = 120*1000; // 2 minutes + +function task () { + DEBUG("Running task"); + purge(); +} + +async function cleanup () { + DEBUG("Running cleanup"); + await purge(); +} + +module.exports = { + task, + timeout, + cleanup +}; diff --git a/lib/www/server/udp/server.js b/lib/www/server/udp/server.js index 728f141..15194ac 100755 --- a/lib/www/server/udp/server.js +++ b/lib/www/server/udp/server.js @@ -95,7 +95,8 @@ for (const header of (cfg._("global.navigation.headers") || []).filter(h => h.ty const server = dgram.createSocket('udp4'); server.on('error', (err) => { - console.error(`server error:\n${err.stack}`); + ERROR(err); + // console.error(`server error:\n${err.stack}`); maybeSendError(err, {title: "UDP listener error on port "+header.port}); // server.close(); });