diff --git a/lib/www/server/events/handlers/detect-soleol.js b/lib/www/server/events/handlers/detect-soleol.js new file mode 100644 index 0000000..e817325 --- /dev/null +++ b/lib/www/server/events/handlers/detect-soleol.js @@ -0,0 +1,146 @@ +const { schema2pid } = require('../../lib/db/connection'); +const { event } = require('../../lib/db'); + +class DetectSOLEOL { + /* Data may come much faster than we can process it, so we put it + * in a queue and process it at our own pace. + * + * The run() method fills the queue with the necessary data and then + * calls processQueue(). + * + * The processQueue() method looks takes the first two elements in + * the queue and processes them if they are not already being taken + * care of by a previous processQueue() call – this will happen when + * data is coming in faster than it can be processed. + * + * If the processQueue() call is the first to see the two bottommost + * two elements, it will process them and, when finished, it will set + * the `isPending` flag of the bottommost element to `false`, thus + * letting the next call know that it has work to do. + * + * If the queue was empty, run() will set the `isPending` flag of its + * first element to a falsy value, thus bootstrapping the process. + */ + static MAX_QUEUE_SIZE = 125000; + + queue = []; + + async processQueue () { + while (this.queue.length > 1) { + if (this.queue[0].isPending) { + setImmediate(() => this.processQueue()); + return; + } + + const prev = this.queue.shift(); + const cur = this.queue[0]; + + const sequence = Number(cur._sequence); + + try { + + if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && + prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) { +// console.log("TRANSITION TO ONLINE", prev, cur); + + // Check if there are already FSP, FGSP events for this sequence + const projectId = await schema2pid(cur._schema); + const sequenceEvents = await event.list(projectId, {sequence}); + + const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); + + if (labels.includes("FSP")) { + // At this point labels contains either FSP only or FSP + FGSP, + // depending on whether a FGSP event has already been entered. + + const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; + const payload = { + type: "sequence", + sequence, + point: cur._point, + remarks, + labels + } + +// console.log(projectId, payload); + await event.post(projectId, payload); + } else { + // A first shot point has been already entered in the log, + // so we have nothing to do here. + } + } else if (prev.lineStatus == "online" && cur.lineStatus != "online") { +// console.log("TRANSITION TO OFFLINE", prev, cur); + + // Check if there are already LSP, LGSP events for this sequence + const projectId = await schema2pid(prev._schema); + const sequenceEvents = await event.list(projectId, {sequence}); + + const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); + + if (labels.includes("LSP")) { + // At this point labels contains either LSP only or LSP + LGSP, + // depending on whether a LGSP event has already been entered. + + const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`; + const payload = { + type: "sequence", + sequence, + point: prev._point, + remarks, + labels + } + +// console.log(projectId, payload); + await event.post(projectId, payload); + } else { + // A first shot point has been already entered in the log, + // so we have nothing to do here. + } + } + // Processing of this shot has already been completed. + // The queue can now move forward. + } catch (err) { + console.error("DetectSOLEOL Error") + console.log(err); + } finally { + cur.isPending = false; + } + } + } + + async run (data) { + if (!data || !data.channel === "realtime") { + return; + } + + if (!(data.payload && data.payload.new && data.payload.new.meta)) { + return; + } + + const meta = data.payload.new.meta; + + if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) { + + this.queue.push({ + isPending: this.queue.length, + _schema: meta._schema, + time: meta.time, + shot: meta.shot, + lineStatus: meta.lineStatus, + _sequence: meta._sequence, + _point: meta._point, + lineName: meta.lineName, + speed: meta.speed, + waterDepth: meta.waterDepth + }); + + } else { + // FIXME Change to alert + console.error("DetectSOLEOL queue full at", this.queue.length); + } + + this.processQueue(); + } +} + +module.exports = DetectSOLEOL; diff --git a/lib/www/server/events/handlers/index.js b/lib/www/server/events/handlers/index.js new file mode 100644 index 0000000..8739808 --- /dev/null +++ b/lib/www/server/events/handlers/index.js @@ -0,0 +1,12 @@ +const Handlers = [ + require('./detect-soleol') +]; + +function init () { + return Handlers.map(Handler => new Handler()); +} + +module.exports = { + Handlers, + init +} diff --git a/lib/www/server/events/index.js b/lib/www/server/events/index.js index 2e6c4ff..9a8ef1f 100644 --- a/lib/www/server/events/index.js +++ b/lib/www/server/events/index.js @@ -1,56 +1,21 @@ -const { schema2pid } = require('../lib/db/connection'); const { listen } = require('../ws/db'); -const { event } = require('../lib/db'); +const channels = require('../lib/db/channels'); +const handlers = require('./handlers').init(); function start () { - let prevPos = null; - - listen(["realtime"], function (data) { - if (!(data.payload && data.payload.new && data.payload.new.meta)) { - console.log("Wrong event", data); - return; + listen(channels, async function (data) { + for (const handler of handlers) { + // NOTE: We are intentionally passing the same instance + // of the data to every handler. This means that earlier + // handlers could, in principle, modify the data to be + // consumed by latter ones, provided that they are + // synchronous (as otherwise, the completion order is + // undefined). + await handler.run(data); } - - const pos = data.payload.new.meta; - - if (prevPos) { - if (pos.lineStatus == "online") { - if (prevPos.lineStatus != "online") { - // FIXME TODO Check if there are already FSP, FGSP events for this sequence - // Tag this as FSP/FGSP - const remarks = `SEQ ${pos._sequence}, SOL ${pos.lineName}, BSP: ${(pos.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(pos.waterDepth).toFixed(0)} m.`; - const payload = { - type: "sequence", - sequence: pos._sequence, - point: pos._point, - remarks, - labels: [ "FSP", "FGSP" ] - } - schema2pid(pos._schema).then(projectId => event.post(projectId, payload)); - // console.log("post fsp", pos._schema); - } - } else { - if (prevPos.lineStatus == "online") { - // FIXME TODO Check if there are already LSP, LGSP events for this sequence - // Tag this as LSP/LGSP - const remarks = `SEQ ${prevPos._sequence}, EOL ${prevPos.lineName}, BSP: ${(prevPos.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prevPos.waterDepth).toFixed(0)} m.`; - const payload = { - type: "sequence", - sequence: prevPos._sequence, - point: prevPos._point, - remarks, - labels: [ "LSP", "LGSP" ] - } - schema2pid(prevPos._schema).then(projectId => event.post(projectId, payload)); - // console.log("post lsp", prevPos._schema); - } - } - } - - prevPos = JSON.parse(JSON.stringify(pos)); }); - console.log("Events manager started"); + console.log("Events manager started.", handlers.length, "active handlers"); } module.exports = { start }