const { schema2pid } = require('../../lib/db/connection'); const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSOLEOL { /* Data may come much faster than we can process it, so we put it * in a queue and process it at our own pace. * * The run() method fills the queue with the necessary data and then * calls processQueue(). * * The processQueue() method looks takes the first two elements in * the queue and processes them if they are not already being taken * care of by a previous processQueue() call – this will happen when * data is coming in faster than it can be processed. * * If the processQueue() call is the first to see the two bottommost * two elements, it will process them and, when finished, it will set * the `isPending` flag of the bottommost element to `false`, thus * letting the next call know that it has work to do. * * If the queue was empty, run() will set the `isPending` flag of its * first element to a falsy value, thus bootstrapping the process. */ static MAX_QUEUE_SIZE = 125000; queue = []; async processQueue () { DEBUG("Queue length", this.queue.length) while (this.queue.length > 1) { if (this.queue[0].isPending) { DEBUG("Queue busy"); setImmediate(() => this.processQueue()); return; } const prev = this.queue.shift(); const cur = this.queue[0]; const sequence = Number(cur._sequence); try { DEBUG("Sequence", sequence); // DEBUG("Previous", prev); // DEBUG("Current", cur); if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) { INFO("Transition to ONLINE detected"); // DEBUG(cur); // DEBUG(prev); // console.log("TRANSITION TO ONLINE", prev, cur); // Check if there are already FSP, FGSP events for this sequence const projectId = await schema2pid(cur._schema); const sequenceEvents = await event.list(projectId, {sequence}); const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); if (labels.includes("FSP")) { // At this point labels contains either FSP only or FSP + FGSP, // depending on whether a FGSP event has already been entered. const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`; const payload = { type: "sequence", sequence, point: cur._point, remarks, labels, meta: {auto: true, author: `*${this.constructor.name}*`} } // console.log(projectId, payload); INFO("Posting event", projectId, payload); await event.post(projectId, payload); } else { // A first shot point has been already entered in the log, // so we have nothing to do here. INFO("FSP already in the log. Doing nothing"); } } else if (prev.lineStatus == "online" && cur.lineStatus != "online") { INFO("Transition to OFFLINE detected"); // DEBUG(cur); // DEBUG(prev); // console.log("TRANSITION TO OFFLINE", prev, cur); // Check if there are already LSP, LGSP events for this sequence const projectId = await schema2pid(prev._schema); const sequenceEvents = await event.list(projectId, {sequence}); const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l))); if (labels.includes("LSP")) { // At this point labels contains either LSP only or LSP + LGSP, // depending on whether a LGSP event has already been entered. const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`; const payload = { type: "sequence", sequence, point: prev._point, remarks, labels, meta: {auto: true, author: `*${this.constructor.name}*`} } // console.log(projectId, payload); INFO("Posting event", projectId, payload); await event.post(projectId, payload); } else { // A first shot point has been already entered in the log, // so we have nothing to do here. INFO("LSP already in the log. Doing nothing"); } } // Processing of this shot has already been completed. // The queue can now move forward. } catch (err) { console.error("DetectSOLEOL Error") console.log(err); } finally { cur.isPending = false; } } } async run (data) { if (!data || data.channel !== "realtime") { return; } if (!(data.payload && data.payload.new && data.payload.new.meta)) { return; } const meta = data.payload.new.meta; if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) { this.queue.push({ isPending: this.queue.length, _schema: meta._schema, time: meta.time, shot: meta.shot, lineStatus: meta.lineStatus, _sequence: meta._sequence, _point: meta._point, lineName: meta.lineName, speed: meta.speed, waterDepth: meta.waterDepth }); } else { // FIXME Change to alert console.error("DetectSOLEOL queue full at", this.queue.length); } this.processQueue(); } } module.exports = DetectSOLEOL;