diff --git a/lib/www/server/api/middleware/etag/watch.js b/lib/www/server/api/middleware/etag/watch.js index 6a9b99d..e568e23 100644 --- a/lib/www/server/api/middleware/etag/watch.js +++ b/lib/www/server/api/middleware/etag/watch.js @@ -66,8 +66,18 @@ const rels = [ function invalidateCache (data, cache) { return new Promise((resolve, reject) => { + if (!data) { + ERROR("invalidateCache called with no data"); + return; + } + + if (!data.payload) { + ERROR("invalidateCache called without a payload; channel = %s", data.channel); + return; + } + const channel = data.channel; - const project = data.payload.pid ?? data.payload?.new?.pid ?? data.payload?.old?.pid; + const project = data.payload?.pid ?? data.payload?.new?.pid ?? data.payload?.old?.pid; const operation = data.payload.operation; const table = data.payload.table; const fields = { channel, project, operation, table }; diff --git a/lib/www/server/events/handlers/detect-project-configuration-change.js b/lib/www/server/events/handlers/detect-project-configuration-change.js index 71c1f53..59117e2 100644 --- a/lib/www/server/events/handlers/detect-project-configuration-change.js +++ b/lib/www/server/events/handlers/detect-project-configuration-change.js @@ -1,4 +1,3 @@ -const project = require('../../lib/db/project'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectProjectConfigurationChange { @@ -10,7 +9,7 @@ class DetectProjectConfigurationChange { // Grab project configurations. // NOTE that this will run asynchronously - this.run({channel: "project"}, ctx); + //this.run({channel: "project"}, ctx); } async run (data, ctx) { @@ -28,13 +27,13 @@ class DetectProjectConfigurationChange { try { DEBUG("Project configuration change detected") - const projects = await project.get(); - project.organisations.setCache(projects); + const projects = await ctx.db.project.get(); + ctx.db.project.organisations.setCache(projects); const _ctx_data = {}; for (let pid of projects.map(i => i.pid)) { DEBUG("Retrieving configuration for", pid); - const cfg = await project.configuration.get(pid); + const cfg = await ctx.db.project.configuration.get(pid); if (cfg?.archived === true) { DEBUG(pid, "is archived. Ignoring"); continue; diff --git a/lib/www/server/events/handlers/detect-soft-start.js b/lib/www/server/events/handlers/detect-soft-start.js index 6f92eb5..c093ddb 100644 --- a/lib/www/server/events/handlers/detect-soft-start.js +++ b/lib/www/server/events/handlers/detect-soft-start.js @@ -1,5 +1,3 @@ -const { schema2pid } = require('../../lib/db/connection'); -const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSoftStart { @@ -33,14 +31,19 @@ class DetectSoftStart { const prev = this.prev?.payload?.new?.meta; // DEBUG("%j", prev); // DEBUG("%j", cur); - DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ntest passed: %j", cur.num_guns, cur.num_active, prev.num_active, cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns); + if (cur.lineStatus == "online" || prev.lineStatus == "online") { + DEBUG("lineStatus is online, assuming not in a soft start situation"); + return; + } + + DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ncur.num_nofire: %d\nprev.num_nofire: %d", cur.num_guns, cur.num_active, prev.num_active, cur.num_nofire, prev.num_nofire); if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) { INFO("Soft start detected @", cur.tstamp); // FIXME Shouldn't need to use schema2pid as pid already present in payload. - const projectId = await schema2pid(cur._schema ?? prev._schema); + const projectId = await ctx.schema2pid(cur._schema ?? prev._schema); // TODO: Try and grab the corresponding comment from the configuration? const payload = { @@ -50,12 +53,16 @@ class DetectSoftStart { meta: {auto: true, author: `*${this.constructor.name}*`} }; DEBUG("Posting event", projectId, payload); - await event.post(projectId, payload); + if (ctx.dryRun) { + DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`); + } else { + await ctx.db.event.post(projectId, payload); + } - } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) { + } else if ((cur.num_active == cur.num_guns || (prev.num_nofire > 0 && cur.num_nofire == 0)) && prev.num_active < cur.num_active) { INFO("Full volume detected @", cur.tstamp); - const projectId = await schema2pid(cur._schema ?? prev._schema); + const projectId = await ctx.schema2pid(cur._schema ?? prev._schema); // TODO: Try and grab the corresponding comment from the configuration? const payload = { @@ -65,7 +72,11 @@ class DetectSoftStart { meta: {auto: true, author: `*${this.constructor.name}*`} }; DEBUG("Posting event", projectId, payload); - await event.post(projectId, payload); + if (ctx.dryRun) { + DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`); + } else { + await ctx.db.event.post(projectId, payload); + } } } catch (err) { diff --git a/lib/www/server/events/handlers/detect-soleol.js b/lib/www/server/events/handlers/detect-soleol.js index 40550b7..ab77b46 100644 --- a/lib/www/server/events/handlers/detect-soleol.js +++ b/lib/www/server/events/handlers/detect-soleol.js @@ -1,5 +1,3 @@ -const { schema2pid } = require('../../lib/db/connection'); -const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSOLEOL { @@ -43,7 +41,7 @@ class DetectSOLEOL { // We must use schema2pid because the pid may not have been // populated for this event. - const projectId = await schema2pid(cur._schema ?? prev._schema); + const projectId = await ctx.schema2pid(cur._schema ?? prev._schema); const labels = ["FSP", "FGSP"]; const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; const payload = { @@ -55,24 +53,32 @@ class DetectSOLEOL { meta: {auto: true, author: `*${this.constructor.name}*`} } INFO("Posting event", projectId, payload); - await event.post(projectId, payload); + if (ctx.dryRun) { + DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`); + } else { + await ctx.db.event.post(projectId, payload); + } } else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) { INFO("Transition to OFFLINE detected"); - const projectId = await schema2pid(prev._schema ?? cur._schema); + const projectId = await ctx.schema2pid(prev._schema ?? cur._schema); const labels = ["LSP", "LGSP"]; - const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; + const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`; const payload = { type: "sequence", sequence, - point: cur._point, + point: prev._point, remarks, labels, meta: {auto: true, author: `*${this.constructor.name}*`} } INFO("Posting event", projectId, payload); - await event.post(projectId, payload); + if (ctx.dryRun) { + DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`); + } else { + await ctx.db.event.post(projectId, payload); + } } } catch (err) { diff --git a/lib/www/server/events/handlers/index.js b/lib/www/server/events/handlers/index.js index ea8d021..ab22278 100644 --- a/lib/www/server/events/handlers/index.js +++ b/lib/www/server/events/handlers/index.js @@ -8,37 +8,6 @@ const Handlers = [ require('./detect-fdsp') ]; -function init (ctx) { - - const instances = Handlers.map(Handler => new Handler(ctx)); - - function prepare (data, ctx) { - const promises = []; - for (let instance of instances) { - const promise = new Promise(async (resolve, reject) => { - try { - DEBUG("Run", instance.author); - const result = await instance.run(data, ctx); - DEBUG("%s result: %O", instance.author, result); - resolve(result); - } catch (err) { - ERROR("%s error:\n%O", instance.author, err); - reject(err); - } - }); - promises.push(promise); - } - return promises; - } - - function despatch (data, ctx) { - return Promise.allSettled(prepare(data, ctx)); - } - - return { instances, prepare, despatch }; -} - module.exports = { Handlers, - init }; diff --git a/lib/www/server/events/handlers/report-line-change-time.js b/lib/www/server/events/handlers/report-line-change-time.js index d6dece6..36c4d8c 100644 --- a/lib/www/server/events/handlers/report-line-change-time.js +++ b/lib/www/server/events/handlers/report-line-change-time.js @@ -1,6 +1,3 @@ -const { event, project } = require('../../lib/db'); -const { withinValidity } = require('../../lib/utils/ranges'); -const unique = require('../../lib/utils/unique'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class ReportLineChangeTime { @@ -44,7 +41,7 @@ class ReportLineChangeTime { async function getLineChangeTime (data, forward = false) { if (forward) { - const ospEvents = await event.list(projectId, {label: "FGSP"}); + const ospEvents = await ctx.db.event.list(projectId, {label: "FGSP"}); // DEBUG("ospEvents", ospEvents); const osp = ospEvents.filter(i => i.tstamp > data.tstamp).pop(); DEBUG("fsp", osp); @@ -55,7 +52,7 @@ class ReportLineChangeTime { return { lineChangeTime: osp.tstamp - data.tstamp, osp }; } } else { - const ospEvents = await event.list(projectId, {label: "LGSP"}); + const ospEvents = await ctx.db.event.list(projectId, {label: "LGSP"}); // DEBUG("ospEvents", ospEvents); const osp = ospEvents.filter(i => i.tstamp < data.tstamp).shift(); DEBUG("lsp", osp); @@ -96,16 +93,20 @@ class ReportLineChangeTime { const opts = {jpq}; if (Array.isArray(seq)) { - opts.sequences = unique(seq).filter(i => !!i); + opts.sequences = ctx.unique(seq).filter(i => !!i); } else { opts.sequence = seq; } - const staleEvents = await event.list(projectId, opts); + const staleEvents = await ctx.db.event.list(projectId, opts); DEBUG(staleEvents.length ?? 0, "events to delete"); for (let staleEvent of staleEvents) { DEBUG(`Deleting event id ${staleEvent.id} (seq = ${staleEvent.sequence}, point = ${staleEvent.point})`); - await event.del(projectId, staleEvent.id); + if (ctx.dryRun) { + DEBUG(`await ctx.db.event.del(${projectId}, ${staleEvent.id});`); + } else { + await ctx.db.event.del(projectId, staleEvent.id); + } } } } @@ -180,7 +181,11 @@ class ReportLineChangeTime { const maybePostEvent = async (projectId, payload) => { DEBUG("Posting event", projectId, payload); - await event.post(projectId, payload); + if (ctx.dryRun) { + DEBUG(`await ctx.db.event.post(${projectId}, ${payload});`); + } else { + await ctx.db.event.post(projectId, payload); + } } @@ -192,7 +197,7 @@ class ReportLineChangeTime { const data = n; DEBUG("INSERT seen: will add lct events related to ", data.id); - if (withinValidity(data.validity)) { + if (ctx.withinValidity(data.validity)) { DEBUG("Event within validity period", data.validity, new Date()); data.tstamp = new Date(data.tstamp); diff --git a/lib/www/server/events/index.js b/lib/www/server/events/index.js index a388cca..b017958 100644 --- a/lib/www/server/events/index.js +++ b/lib/www/server/events/index.js @@ -1,29 +1,101 @@ +const nodeAsync = require('async'); // npm install async const { listen } = require('../lib/db/notify'); +const db = require('../lib/db'); // Adjust paths; include all needed DB utils +const { schema2pid } = require('../lib/db/connection'); +const unique = require('../lib/utils/unique'); // If needed by handlers +const withinValidity = require('../lib/utils/ranges').withinValidity; // If needed +const { ALERT, ERROR, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + +// List of handler classes (add more as needed) +const handlerClasses = require('./handlers').Handlers; + +// Channels to listen to (hardcoded for simplicity; could scan handlers for mentions) const channels = require('../lib/db/channels'); -const handlers = require('./handlers'); -const { ActionsQueue } = require('../lib/queue'); -const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); -function start () { +// Queue config: Process one at a time for order; max retries=3 +const eventQueue = nodeAsync.queue(async (task, callback) => { + const { data, ctx } = task; + DEBUG(`Processing event on channel ${data.channel} with timestamp ${data._received ?? 'unknown'}`); - const queue = new ActionsQueue(); - const ctx = {}; // Context object + for (const handler of ctx.handlers) { + try { + await handler.run(data, ctx); + } catch (err) { + ERROR(`Error in handler ${handler.constructor.name}:`, err); + // Retry logic: Could add task.retries++, re-enqueue if < max + } + } - const { prepare, despatch } = handlers.init(ctx); + if (typeof callback === 'function') { + // async v3.2.6+ does not use callsbacks with AsyncFunctions, but anyway + callback(); + } +}, 1); // Concurrency=1 for strict order - listen(channels, function (data) { - DEBUG("Incoming data", data); +eventQueue.error((err, task) => { + ALERT(`Queue error processing task:`, err, task); +}); - // We don't bother awaiting - queue.enqueue(() => despatch(data, ctx)); - DEBUG("Queue size", queue.length()); +// Main setup function (call from server init) +async function setupEventHandlers(projectsConfig) { + // Shared context + const ctx = { + dryRun: Boolean(process.env.DOUGAL_HANDLERS_DRY_RUN) ?? false, // If true, don't commit changes + projects: { configuration: projectsConfig }, // From user config + handlers: handlerClasses.map(Cls => new Cls()), // Instances + // DB utils (add more as needed) + db, + schema2pid, + unique, + withinValidity + // Add other utils, e.g., ctx.logger = DEBUG; + }; + + // Optional: Replay recent events on startup to rebuild state + // await replayRecentEvents(ctx); + + // Setup listener + const subscriber = await listen(channels, (rawData) => { + const data = { + ...rawData, + enqueuedAt: new Date() // For monitoring + }; + eventQueue.push({ data, ctx }); }); - INFO("Events manager started"); + DEBUG('Event handler system initialized with channels:', channels); + if (ctx.dryRun) { + DEBUG('DRY RUNNING'); + } + + // Return for cleanup if needed + return { + close: () => { + subscriber.events.removeAllListeners(); + subscriber.close(); + eventQueue.kill(); + } + }; } -module.exports = { start } +// Optional: Replay last N events to rebuild handler state (e.g., this.prev) +// async function replayRecentEvents(ctx) { +// try { +// // Example: Fetch last 10 realtime events, sorted by tstamp +// const recentRealtime = await event.listAllProjects({ channel: 'realtime', limit: 10, sort: 'tstamp DESC' }); +// // Assume event.listAllProjects is a custom DB method; implement if needed +// +// // Enqueue in original order (reverse sort) +// recentRealtime.reverse().forEach((evt) => { +// const data = { channel: 'realtime', payload: { new: evt } }; +// eventQueue.push({ data, ctx }); +// }); +// +// // Similarly for 'event' channel if needed +// DEBUG('Replayed recent events for state rebuild'); +// } catch (err) { +// ERROR('Error replaying events:', err); +// } +// } -if (require.main === module) { - start(); -} +module.exports = { setupEventHandlers }; diff --git a/lib/www/server/index.js b/lib/www/server/index.js index fb6751d..f3e5c4f 100755 --- a/lib/www/server/index.js +++ b/lib/www/server/index.js @@ -2,18 +2,37 @@ const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); +async function getProjectConfigurations (opts = {}) { + const includeArchived = {includeArchived: false, ...opts}; + let projectConfigurations = {}; + try { + const db = require('./lib/db'); + const pids = (await db.project.get()) + .filter(i => includeArchived || !i.archived) + .map(i => i.pid); + for (const pid of pids) { + DEBUG(`Reading project configuration for ${pid}`); + const cfg = await db.project.configuration.get(pid); + projectConfigurations[pid] = cfg; + } + } catch (err) { + ERROR("Failed to get project configurations"); + ERROR(err); + } + return projectConfigurations; +} + async function main () { // Check that we're running against the correct database version const version = require('./lib/version'); INFO("Running version", await version.describe()); version.compatible() - .then( (versions) => { + .then( async (versions) => { try { const api = require('./api'); const ws = require('./ws'); const periodicTasks = require('./periodic-tasks').init(); - - const { fork } = require('child_process'); + const { setupEventHandlers } = require('./events'); const port = process.env.HTTP_PORT || 3000; const host = process.env.HTTP_HOST || "127.0.0.1"; @@ -25,33 +44,31 @@ async function main () { periodicTasks.start(); - const eventManagerPath = [__dirname, "events"].join("/"); - const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/); + const projectConfigurations = await getProjectConfigurations(); + const handlerSystem = await setupEventHandlers(projectConfigurations); process.on("SIGINT", async () => { DEBUG("Interrupted (SIGINT)"); - eventManager.kill() + handlerSystem.close(); await periodicTasks.cleanup(); process.exit(0); }) process.on("SIGHUP", async () => { DEBUG("Stopping (SIGHUP)"); - eventManager.kill() + handlerSystem.close(); await periodicTasks.cleanup(); process.exit(0); }) process.on('beforeExit', async () => { DEBUG("Preparing to exit"); - eventManager.kill() + handlerSystem.close(); await periodicTasks.cleanup(); }); process.on('exit', async () => { DEBUG("Exiting"); - // eventManager.kill() - // periodicTasks.cleanup(); }); } catch (err) { ERROR(err); diff --git a/lib/www/server/lib/queue/actions-queue.js b/lib/www/server/lib/queue/actions-queue.js deleted file mode 100644 index 069ded2..0000000 --- a/lib/www/server/lib/queue/actions-queue.js +++ /dev/null @@ -1,52 +0,0 @@ -const Queue = require('./queue'); - -// Inspired by: -// https://stackoverflow.com/questions/53540348/js-async-await-tasks-queue#53540586 - -class ActionsQueue extends Queue { - - constructor (items = []) { - super(items); - - this.pending = false; - } - - enqueue (action) { - return new Promise ((resolve, reject) => { - super.enqueue({ action, resolve, reject }); - this.dequeue(); - }); - } - - async dequeue () { - - if (this.pending) { - return false; - } - - const item = super.dequeue(); - - if (!item) { - return false; - } - - try { - - this.pending = true; - - const result = await item.action(this); - - this.pending = false; - item.resolve(result); - } catch (err) { - this.pending = false; - item.reject(err); - } finally { - this.dequeue(); - } - - } - -} - -module.exports = ActionsQueue; diff --git a/lib/www/server/lib/queue/index.js b/lib/www/server/lib/queue/index.js deleted file mode 100644 index 8305fea..0000000 --- a/lib/www/server/lib/queue/index.js +++ /dev/null @@ -1,6 +0,0 @@ - -module.exports = { - Queue: require('./queue'), - ActionsQueue: require('./actions-queue') -}; - diff --git a/lib/www/server/lib/queue/queue.js b/lib/www/server/lib/queue/queue.js deleted file mode 100644 index aabeb4d..0000000 --- a/lib/www/server/lib/queue/queue.js +++ /dev/null @@ -1,22 +0,0 @@ - -class Queue { - - constructor (items = []) { - this.items = items; - } - - enqueue (item) { - this.items.push(item); - } - - dequeue () { - return this.items.shift(); - } - - length () { - return this.items.length; - } - -} - -module.exports = Queue; diff --git a/lib/www/server/package.json b/lib/www/server/package.json index f463bac..9a7a7dd 100644 --- a/lib/www/server/package.json +++ b/lib/www/server/package.json @@ -29,6 +29,7 @@ "@dougal/binary": "file:../../modules/@dougal/binary", "@dougal/organisations": "file:../../modules/@dougal/organisations", "@dougal/user": "file:../../modules/@dougal/user", + "async": "^3.2.6", "body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser", "busboy": "^1.6.0", "compression": "^1.8.1", diff --git a/package-lock.json b/package-lock.json index 150a128..66072dd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9359,6 +9359,7 @@ "@dougal/binary": "file:../../modules/@dougal/binary", "@dougal/organisations": "file:../../modules/@dougal/organisations", "@dougal/user": "file:../../modules/@dougal/user", + "async": "^3.2.6", "body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser", "busboy": "^1.6.0", "compression": "^1.8.1", @@ -14171,6 +14172,11 @@ "node": ">=0.8" } }, + "node_modules/async": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", + "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==" + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",