From 6a21ddd1cd2a633bae999973a1d040b5d663abe1 Mon Sep 17 00:00:00 2001 From: "D. Berge" Date: Sat, 14 Oct 2023 20:53:42 +0200 Subject: [PATCH] Rewrite events listener and handlers. The events listener now uses a proper self-consuming queue and the event handlers have been rewritten accordingly. The way this works is that running init() on the handlers library instantiates the handlers and returns two higher-order functions, prepare() and despatch(). A call to the latter of these is appended to the queue with each new incoming event. The handlers have access to a context object (ctx) which may be used to persist data between calls and/or exchange data between handlers. This is used notably to give the handlers access to project configurations, which are themselves refreshed by a project configuration change handler (DetectProjectConfigurationChange). --- lib/www/server/events/handlers/detect-fdsp.js | 182 ++++++---------- .../detect-project-configuration-change.js | 60 +++++ .../events/handlers/detect-soft-start.js | 162 +++++--------- .../server/events/handlers/detect-soleol.js | 206 ++++++------------ lib/www/server/events/handlers/index.js | 35 ++- .../handlers/report-line-change-time.js | 183 +++++----------- lib/www/server/events/index.js | 26 ++- 7 files changed, 351 insertions(+), 503 deletions(-) create mode 100644 lib/www/server/events/handlers/detect-project-configuration-change.js diff --git a/lib/www/server/events/handlers/detect-fdsp.js b/lib/www/server/events/handlers/detect-fdsp.js index 55d05af..1b30644 100644 --- a/lib/www/server/events/handlers/detect-fdsp.js +++ b/lib/www/server/events/handlers/detect-fdsp.js @@ -9,105 +9,16 @@ const { ALERT, ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); * the last shot and first shot of the previous and current dates, respectively. */ class DetectFDSP { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks at the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - queue = []; + author = `*${this.constructor.name}*`; + prev = null; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 1) { - - if (this.queue[0].isPending) { - setImmediate(() => this.processQueue()); - return; - } - - const prev = this.queue.shift(); - const cur = this.queue[0]; - - const sequence = Number(cur._sequence); - - try { - - if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && - prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) { - -// DEBUG("Previous", prev); -// DEBUG("Current", cur); - - if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) { - // Possible a date change, but could also be a missing timestamp - // or something else. - - const ts0 = new Date(prev.time) - const ts1 = new Date(cur.time); - - if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) { - INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName); - - const ldsp = { - sequence: prev._sequence, - point: prev._point, - remarks: "Last shotpoint of the day", - labels: ["LDSP", "Prod"], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - - const fdsp = { - sequence: cur._sequence, - point: cur._point, - remarks: "First shotpoint of the day", - labels: ["FDSP", "Prod"], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - - INFO("LDSP", ldsp); - INFO("FDSP", fdsp); - - const projectId = await schema2pid(prev._schema); - - if (projectId) { - await event.post(projectId, ldsp); - await event.post(projectId, fdsp); - } else { - ERROR("projectId not found for", prev._schema); - } - } else { - WARNING("False positive on these timestamps", prev.time, cur.time); - WARNING("No events were created"); - } - } - } - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - ERROR(err); - } finally { - cur.isPending = false; - } - } + constructor () { + DEBUG(`${this.author} instantiated`); } - async run (data) { + async run (data, ctx) { + if (!data || data.channel !== "realtime") { return; } @@ -116,27 +27,70 @@ class DetectFDSP { return; } - const meta = data.payload.new.meta; - - if (this.queue.length < DetectFDSP.MAX_QUEUE_SIZE) { - - const event = { - isPending: this.queue.length, - _schema: meta._schema, - time: meta.time, - lineStatus: meta.lineStatus, - _sequence: meta._sequence, - _point: meta._point, - lineName: meta.lineName - }; - this.queue.push(event); -// DEBUG("EVENT", event); - - } else { - ALERT("Queue full at", this.queue.length); + if (!this.prev) { + DEBUG("Initialising `prev`"); + this.prev = data; + return; } - this.processQueue(); + try { + DEBUG("Running"); + const cur = data; + const sequence = Number(cur._sequence); + + if (this.prev.lineName == cur.lineName && this.prev._sequence == cur._sequence && + this.prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) { + + if (this.prev.time.substr(0, 10) != cur.time.substr(0, 10)) { + // Possibly a date change, but could also be a missing timestamp + // or something else. + + const ts0 = new Date(this.prev.time) + const ts1 = new Date(cur.time); + + if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) { + INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName); + + const ldsp = { + sequence: this.prev._sequence, + point: this.prev._point, + remarks: "Last shotpoint of the day", + labels: ["LDSP", "Prod"], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + + const fdsp = { + sequence: cur._sequence, + point: cur._point, + remarks: "First shotpoint of the day", + labels: ["FDSP", "Prod"], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + + INFO("LDSP", ldsp); + INFO("FDSP", fdsp); + + const projectId = await schema2pid(this.prev._schema); + + if (projectId) { + await event.post(projectId, ldsp); + await event.post(projectId, fdsp); + } else { + ERROR("projectId not found for", this.prev._schema); + } + } else { + WARNING("False positive on these timestamps", this.prev.time, cur.time); + WARNING("No events were created"); + } + } + + } + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } finally { + this.prev = data; + } } } diff --git a/lib/www/server/events/handlers/detect-project-configuration-change.js b/lib/www/server/events/handlers/detect-project-configuration-change.js new file mode 100644 index 0000000..0a88b21 --- /dev/null +++ b/lib/www/server/events/handlers/detect-project-configuration-change.js @@ -0,0 +1,60 @@ +const project = require('../../lib/db/project'); +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + +class DetectProjectConfigurationChange { + + author = `*${this.constructor.name}*`; + + constructor (ctx) { + DEBUG(`${this.author} instantiated`); + + // Grab project configurations. + // NOTE that this will run asynchronously + this.run({channel: "project"}, ctx); + } + + async run (data, ctx) { + + if (!data || data.channel !== "project") { + return; + } + + // Project notifications, as of this writing, most likely + // do not carry payloads as those exceed the notification + // size limit. + // For our purposes, we do not care as we just re-read all + // the configurations for all non-archived projects. + + try { + DEBUG("Project configuration change detected") + + const projects = await project.get(); + + const _ctx_data = {}; + for (let pid of projects.map(i => i.pid)) { + DEBUG("Retrieving configuration for", pid); + const cfg = await project.configuration.get(pid); + if (cfg?.archived === true) { + DEBUG(pid, "is archived. Ignoring"); + continue; + } + + DEBUG("Saving configuration for", pid); + _ctx_data[pid] = cfg; + } + + if (! ("projects" in ctx)) { + ctx.projects = {}; + } + + ctx.projects.configuration = _ctx_data; + DEBUG("Committed project configuration to ctx.projects.configuration"); + + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } + } +} + +module.exports = DetectProjectConfigurationChange; diff --git a/lib/www/server/events/handlers/detect-soft-start.js b/lib/www/server/events/handlers/detect-soft-start.js index ea96e6c..6f92eb5 100644 --- a/lib/www/server/events/handlers/detect-soft-start.js +++ b/lib/www/server/events/handlers/detect-soft-start.js @@ -3,94 +3,16 @@ const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSoftStart { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks takes the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - queue = []; + author = `*${this.constructor.name}*`; + prev = null; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 1) { - if (this.queue[0].isPending) { - DEBUG("Queue busy"); - setImmediate(() => this.processQueue()); - return; - } - - const prev = this.queue.shift(); - const cur = this.queue[0]; - - try { - // DEBUG("Previous", prev); - // DEBUG("Current", cur); - - // TODO: - // Consider whether to remember if soft start / full volume events - // have already been emitted and wait until there is an online/offline - // transition before re-emitting. - // This may or may not be a good idea. - - // Look for a soft start or full volume event - if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) { - INFO("Soft start detected @", cur.tstamp); - - const projectId = await schema2pid(cur._schema ?? prev._schema); - - // TODO: Try and grab the corresponding comment from the configuration? - const payload = { - tstamp: cur.tstamp, - remarks: "Soft start", - labels: [ "Daily", "Guns", "Prod" ], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - DEBUG("Posting event", projectId, payload); - await event.post(projectId, payload); - - } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) { - INFO("Full volume detected @", cur.tstamp); - - const projectId = await schema2pid(cur._schema ?? prev._schema); - - // TODO: Try and grab the corresponding comment from the configuration? - const payload = { - tstamp: cur.tstamp, - remarks: "Full volume", - labels: [ "Daily", "Guns", "Prod" ], - meta: {auto: true, author: `*${this.constructor.name}*`} - }; - DEBUG("Posting event", projectId, payload); - await event.post(projectId, payload); - } - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - ERROR("DetectSoftStart Error") - ERROR(err); - } finally { - cur.isPending = false; - } - } + constructor () { + DEBUG(`${this.author} instantiated`); } - async run (data) { + async run (data, ctx) { + if (!data || data.channel !== "realtime") { return; } @@ -99,29 +21,59 @@ class DetectSoftStart { return; } - const meta = data.payload.new.meta; - - if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) { - - this.queue.push({ - isPending: this.queue.length, - _schema: meta._schema, - tstamp: meta.tstamp ?? meta.time, - shot: meta.shot, - lineStatus: meta.lineStatus, - _sequence: meta._sequence, - _point: meta._point, - lineName: meta.lineName, - num_guns: meta.num_guns, - num_active: meta.num_active - }); - - } else { - // FIXME Change to alert - ALERT("DetectSoftStart queue full at", this.queue.length); + if (!this.prev) { + DEBUG("Initialising `prev`"); + this.prev = data; + return; } - this.processQueue(); + try { + DEBUG("Running"); + const cur = data?.payload?.new?.meta; + const prev = this.prev?.payload?.new?.meta; + // DEBUG("%j", prev); + // DEBUG("%j", cur); + DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ntest passed: %j", cur.num_guns, cur.num_active, prev.num_active, cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns); + + + if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) { + INFO("Soft start detected @", cur.tstamp); + + // FIXME Shouldn't need to use schema2pid as pid already present in payload. + const projectId = await schema2pid(cur._schema ?? prev._schema); + + // TODO: Try and grab the corresponding comment from the configuration? + const payload = { + tstamp: cur.tstamp, + remarks: "Soft start", + labels: [ "Daily", "Guns", "Prod" ], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + DEBUG("Posting event", projectId, payload); + await event.post(projectId, payload); + + } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) { + INFO("Full volume detected @", cur.tstamp); + + const projectId = await schema2pid(cur._schema ?? prev._schema); + + // TODO: Try and grab the corresponding comment from the configuration? + const payload = { + tstamp: cur.tstamp, + remarks: "Full volume", + labels: [ "Daily", "Guns", "Prod" ], + meta: {auto: true, author: `*${this.constructor.name}*`} + }; + DEBUG("Posting event", projectId, payload); + await event.post(projectId, payload); + } + + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } finally { + this.prev = data; + } } } diff --git a/lib/www/server/events/handlers/detect-soleol.js b/lib/www/server/events/handlers/detect-soleol.js index 7cae9f6..40550b7 100644 --- a/lib/www/server/events/handlers/detect-soleol.js +++ b/lib/www/server/events/handlers/detect-soleol.js @@ -3,130 +3,15 @@ const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSOLEOL { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks takes the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - queue = []; + author = `*${this.constructor.name}*`; + prev = null; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 1) { - if (this.queue[0].isPending) { - DEBUG("Queue busy"); - setImmediate(() => this.processQueue()); - return; - } - - const prev = this.queue.shift(); - const cur = this.queue[0]; - - const sequence = Number(cur._sequence); - - try { - DEBUG("Sequence", sequence); - // DEBUG("Previous", prev); - // DEBUG("Current", cur); - - if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && - prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) { - INFO("Transition to ONLINE detected"); - // DEBUG(cur); - // DEBUG(prev); -// console.log("TRANSITION TO ONLINE", prev, cur); - - // Check if there are already FSP, FGSP events for this sequence - const projectId = await schema2pid(cur._schema); - const sequenceEvents = await event.list(projectId, {sequence}); - - const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); - - if (labels.includes("FSP")) { - // At this point labels contains either FSP only or FSP + FGSP, - // depending on whether a FGSP event has already been entered. - - const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; - const payload = { - type: "sequence", - sequence, - point: cur._point, - remarks, - labels, - meta: {auto: true, author: `*${this.constructor.name}*`} - } - -// console.log(projectId, payload); - INFO("Posting event", projectId, payload); - await event.post(projectId, payload); - } else { - // A first shot point has been already entered in the log, - // so we have nothing to do here. - INFO("FSP already in the log. Doing nothing"); - } - } else if (prev.lineStatus == "online" && cur.lineStatus != "online") { - INFO("Transition to OFFLINE detected"); - // DEBUG(cur); - // DEBUG(prev); -// console.log("TRANSITION TO OFFLINE", prev, cur); - - // Check if there are already LSP, LGSP events for this sequence - const projectId = await schema2pid(prev._schema); - const sequenceEvents = await event.list(projectId, {sequence}); - - const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); - - if (labels.includes("LSP")) { - // At this point labels contains either LSP only or LSP + LGSP, - // depending on whether a LGSP event has already been entered. - - const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`; - const payload = { - type: "sequence", - sequence, - point: prev._point, - remarks, - labels, - meta: {auto: true, author: `*${this.constructor.name}*`} - } - -// console.log(projectId, payload); - INFO("Posting event", projectId, payload); - await event.post(projectId, payload); - } else { - // A first shot point has been already entered in the log, - // so we have nothing to do here. - INFO("LSP already in the log. Doing nothing"); - } - } - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - console.error("DetectSOLEOL Error") - console.log(err); - } finally { - cur.isPending = false; - } - } + constructor () { + DEBUG(`${this.author} instantiated`); } - async run (data) { + async run (data, ctx) { if (!data || data.channel !== "realtime") { return; } @@ -135,30 +20,69 @@ class DetectSOLEOL { return; } - const meta = data.payload.new.meta; - - if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) { - - this.queue.push({ - isPending: this.queue.length, - _schema: meta._schema, - time: meta.time, - shot: meta.shot, - lineStatus: meta.lineStatus, - _sequence: meta._sequence, - _point: meta._point, - lineName: meta.lineName, - speed: meta.speed, - waterDepth: meta.waterDepth - }); - - } else { - // FIXME Change to alert - console.error("DetectSOLEOL queue full at", this.queue.length); + if (!this.prev) { + DEBUG("Initialising `prev`"); + this.prev = data; + return; } - this.processQueue(); + try { + DEBUG("Running"); + // DEBUG("%j", data); + const cur = data?.payload?.new?.meta; + const prev = this.prev?.payload?.new?.meta; + const sequence = Number(cur._sequence); + + // DEBUG("%j", prev); + // DEBUG("%j", cur); + DEBUG("prv.lineName: %s\ncur.lineName: %s\nprv._sequence: %s\ncur._sequence: %s\nprv.lineStatus: %s\ncur.lineStatus: %s", prev.lineName, cur.lineName, prev._sequence, cur._sequence, prev.lineStatus, cur.lineStatus); + + if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && + prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) { + INFO("Transition to ONLINE detected"); + + // We must use schema2pid because the pid may not have been + // populated for this event. + const projectId = await schema2pid(cur._schema ?? prev._schema); + const labels = ["FSP", "FGSP"]; + const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; + const payload = { + type: "sequence", + sequence, + point: cur._point, + remarks, + labels, + meta: {auto: true, author: `*${this.constructor.name}*`} + } + INFO("Posting event", projectId, payload); + await event.post(projectId, payload); + } else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && + prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) { + INFO("Transition to OFFLINE detected"); + + const projectId = await schema2pid(prev._schema ?? cur._schema); + const labels = ["LSP", "LGSP"]; + const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; + const payload = { + type: "sequence", + sequence, + point: cur._point, + remarks, + labels, + meta: {auto: true, author: `*${this.constructor.name}*`} + } + INFO("Posting event", projectId, payload); + await event.post(projectId, payload); + } + + } catch (err) { + DEBUG(`${this.author} error`, err); + throw err; + } finally { + this.prev = data; + } } + } module.exports = DetectSOLEOL; diff --git a/lib/www/server/events/handlers/index.js b/lib/www/server/events/handlers/index.js index 25e07c2..ea8d021 100644 --- a/lib/www/server/events/handlers/index.js +++ b/lib/www/server/events/handlers/index.js @@ -1,15 +1,44 @@ +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + const Handlers = [ + require('./detect-project-configuration-change'), require('./detect-soleol'), require('./detect-soft-start'), require('./report-line-change-time'), require('./detect-fdsp') ]; -function init () { - return Handlers.map(Handler => new Handler()); +function init (ctx) { + + const instances = Handlers.map(Handler => new Handler(ctx)); + + function prepare (data, ctx) { + const promises = []; + for (let instance of instances) { + const promise = new Promise(async (resolve, reject) => { + try { + DEBUG("Run", instance.author); + const result = await instance.run(data, ctx); + DEBUG("%s result: %O", instance.author, result); + resolve(result); + } catch (err) { + ERROR("%s error:\n%O", instance.author, err); + reject(err); + } + }); + promises.push(promise); + } + return promises; + } + + function despatch (data, ctx) { + return Promise.allSettled(prepare(data, ctx)); + } + + return { instances, prepare, despatch }; } module.exports = { Handlers, init -} +}; diff --git a/lib/www/server/events/handlers/report-line-change-time.js b/lib/www/server/events/handlers/report-line-change-time.js index 0871953..d6dece6 100644 --- a/lib/www/server/events/handlers/report-line-change-time.js +++ b/lib/www/server/events/handlers/report-line-change-time.js @@ -1,60 +1,47 @@ -const { schema2pid } = require('../../lib/db/connection'); const { event, project } = require('../../lib/db'); const { withinValidity } = require('../../lib/utils/ranges'); const unique = require('../../lib/utils/unique'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class ReportLineChangeTime { - /* Data may come much faster than we can process it, so we put it - * in a queue and process it at our own pace. - * - * The run() method fills the queue with the necessary data and then - * calls processQueue(). - * - * The processQueue() method looks takes the first two elements in - * the queue and processes them if they are not already being taken - * care of by a previous processQueue() call – this will happen when - * data is coming in faster than it can be processed. - * - * If the processQueue() call is the first to see the two bottommost - * two elements, it will process them and, when finished, it will set - * the `isPending` flag of the bottommost element to `false`, thus - * letting the next call know that it has work to do. - * - * If the queue was empty, run() will set the `isPending` flag of its - * first element to a falsy value, thus bootstrapping the process. - */ - static MAX_QUEUE_SIZE = 125000; - - queue = []; author = `*${this.constructor.name}*`; - async processQueue () { - DEBUG("Queue length", this.queue.length) - while (this.queue.length > 0) { - if (this.queue[0].isPending) { - DEBUG("Queue busy"); - setTimeout(() => this.processQueue(), 1000); // We're not in a hurry - return; - } + constructor () { + DEBUG(`${this.author} instantiated`); + } - const cur = this.queue.shift(); - const next = this.queue[0]; - const projectId = cur.pid; - // Are we being called because of a LGSP or because of a FGSP? - const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP")); + async run (data, ctx) { + + if (!data || data.channel !== "event") { + return; + } + + const n = data.payload.new; + const o = data.payload.old; + + if (!(n?.labels) && !(o?.labels)) { + return; + } + + if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") && + !n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) { + return; + } + + + try { + DEBUG("Running"); + const cur = data; + const projectId = cur?.payload?.pid; + const forward = (cur?.payload?.old?.labels?.includes("LGSP") || cur?.payload?.new?.labels?.includes("LGSP")); + DEBUG("%j", cur); if (!projectId) { throw {message: "No projectID found in event", cur}; return; } - async function getConfiguration (projectId) { - return await project.configuration.get(projectId); - } - - async function getLineChangeTime (data, forward = false) { if (forward) { const ospEvents = await event.list(projectId, {label: "FGSP"}); @@ -126,7 +113,7 @@ class ReportLineChangeTime { const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => { const events = []; - const cfg = (await project.configuration.get(projectId)); + const cfg = ctx?.projects?.configuration?.[projectId] ?? {}; const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // m → ms DEBUG("nlcd", nlcd); if (nlcd && lineChangeTime > nlcd) { @@ -196,108 +183,48 @@ class ReportLineChangeTime { await event.post(projectId, payload); } - try { - // DEBUG("Previous", prev); - DEBUG("Current", cur); - DEBUG("Forward search", forward); - // We have these scenarios to consider: - // INSERT: - // `old` will be NULL - // Add event with line change time: - // - match validity with `new` - // - meta.ReportLineChangeTime.link refers to new.uid (or new.id?) - // UPDATE: - // `old` is not NULL - // `new` is not NULL - // Delete previous event from event_log (not event_log_full) - // Add event with line change time: - // - match validity with `new` - // - meta.ReportLineChangeTime.link refers to new.uid (or new.id?) - // DELETE: - // `old` is not NULL - // `new` will be NULL - // Delete previous event from event_log (not event_log_full) + await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]); - await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]); + if (cur?.payload?.operation == "INSERT") { + // NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT + // on event_log_full, so we don't need to worry about UPDATE here. + const data = n; + DEBUG("INSERT seen: will add lct events related to ", data.id); - if (cur.operation == "INSERT") { - // NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT - // on event_log_full, so we don't need to worry about UPDATE here. - const data = cur.new; - DEBUG("INSERT seen: will add lct events related to ", data.id); + if (withinValidity(data.validity)) { + DEBUG("Event within validity period", data.validity, new Date()); - if (withinValidity(data.validity)) { - DEBUG("Event within validity period", data.validity, new Date()); + data.tstamp = new Date(data.tstamp); + const { lineChangeTime, osp } = await getLineChangeTime(data, forward); - data.tstamp = new Date(data.tstamp); - const { lineChangeTime, osp } = await getLineChangeTime(data, forward); + if (lineChangeTime) { - if (lineChangeTime) { + const events = await createLineChangeTimeEvents(lineChangeTime, data, osp); - const events = await createLineChangeTimeEvents(lineChangeTime, data, osp); - - if (events?.length) { - DEBUG("Deleting other events for sequence", events[0].sequence); - await deleteStaleEvents(events[0].sequence); - } - - for (let payload of events) { - await maybePostEvent(projectId, payload); - } + if (events?.length) { + DEBUG("Deleting other events for sequence", events[0].sequence); + await deleteStaleEvents(events[0].sequence); + } + + for (let payload of events) { + await maybePostEvent(projectId, payload); } - } else { - DEBUG("Event outside of validity range", data.validity, "lct events not inserted"); } - + } else { + DEBUG("Event outside of validity range", data.validity, "lct events not inserted"); } - - // Processing of this shot has already been completed. - // The queue can now move forward. - } catch (err) { - ERROR("ReportLineChangeTime Error") - ERROR(err); - } finally { - if (next) { - next.isPending = false; - } } - } - } - async run (data) { - DEBUG("Seen", data); - if (!data || data.channel !== "event") { - return; + + + } catch (err) { + ERROR(`${this.author} error`, err); + throw err; } - if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) { - return; - } - const n = data.payload.new; - const o = data.payload.old; - - if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") && - !n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) { - return; - } - - if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) { - - const item = { - ...structuredClone(data.payload), - isPending: this.queue.length, - }; - DEBUG("Queueing", item); - this.queue.push(item); - - } else { - ALERT("ReportLineChangeTime queue full at", this.queue.length); - } - - this.processQueue(); } } diff --git a/lib/www/server/events/index.js b/lib/www/server/events/index.js index f415d46..a388cca 100644 --- a/lib/www/server/events/index.js +++ b/lib/www/server/events/index.js @@ -1,23 +1,25 @@ const { listen } = require('../lib/db/notify'); const channels = require('../lib/db/channels'); -const handlers = require('./handlers').init(); +const handlers = require('./handlers'); +const { ActionsQueue } = require('../lib/queue'); const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); function start () { - listen(channels, async function (data) { + + const queue = new ActionsQueue(); + const ctx = {}; // Context object + + const { prepare, despatch } = handlers.init(ctx); + + listen(channels, function (data) { DEBUG("Incoming data", data); - for (const handler of handlers) { - // NOTE: We are intentionally passing the same instance - // of the data to every handler. This means that earlier - // handlers could, in principle, modify the data to be - // consumed by latter ones, provided that they are - // synchronous (as otherwise, the completion order is - // undefined). - await handler.run(data); - } + + // We don't bother awaiting + queue.enqueue(() => despatch(data, ctx)); + DEBUG("Queue size", queue.length()); }); - INFO("Events manager started.", handlers.length, "active handlers"); + INFO("Events manager started"); } module.exports = { start }