const { schema2pid } = require('../../lib/db/connection');
const { event, project } = require('../../lib/db');
const { withinValidity } = require('../../lib/utils/ranges');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);

const MS_PER_MINUTE = 60 * 1000;
const SECONDS_PER_DAY = 86400;

/**
 * Split a duration into whole days, hours, minutes and seconds.
 *
 * Uses plain integer arithmetic so the result does not depend on the
 * process time zone. Sub-second remainders are discarded.
 *
 * @param {number} dt - Duration in milliseconds (expected to be non-negative).
 * @returns {{d: number, h: number, m: number, s: number}}
 */
function parseInterval (dt) {
  const totalSeconds = Math.floor(dt / 1000);
  const daySeconds = totalSeconds % SECONDS_PER_DAY;
  return {
    d: Math.floor(totalSeconds / SECONDS_PER_DAY),
    h: Math.floor(daySeconds / 3600),
    m: Math.floor((daySeconds % 3600) / 60),
    s: daySeconds % 60,
  };
}

/**
 * Render an interval object as text such as `"1 d 2 h 5 s"`.
 *
 * Components whose value is zero are omitted. If every component is
 * zero the result is `"0 s"` rather than an empty string, so that the
 * remarks built from it never end in a dangling "by ".
 *
 * @param {Object<string, number>} interval - As returned by parseInterval().
 * @returns {string}
 */
function formatInterval (interval) {
  const parts = Object.entries(interval)
    .filter(([, value]) => value)
    .map(([unit, value]) => `${value} ${unit}`);
  return parts.length > 0 ? parts.join(" ") : "0 s";
}

/**
 * Compute the time elapsed between an "LSP"-labelled event and `tstamp`.
 *
 * @param {*} projectId - Project whose events are searched.
 * @param {Date} tstamp - Timestamp of the FSP event being processed.
 * @returns {Promise<number|undefined>} Elapsed milliseconds, or
 *   `undefined` when no LSP event earlier than `tstamp` is found.
 */
async function getLineChangeTime (projectId, tstamp) {
  const lspEvents = await event.list(projectId, { label: "LSP" });
  // NOTE(review): taking the first match presumes event.list() returns
  // the most recent events first — confirm against lib/db.
  const lsp = lspEvents.find((candidate) => candidate.tstamp < tstamp);
  DEBUG("lsp", lsp);
  if (!lsp) {
    return undefined;
  }
  const elapsed = tstamp - lsp.tstamp;
  DEBUG("Δt", elapsed);
  return elapsed;
}

/**
 * Delete every event that this reporter previously derived from the
 * event with id `parentId`, i.e. those whose `meta[author].parents`
 * contains that id.
 *
 * @param {*} projectId
 * @param {string} author - Key under `meta` that holds the parents list.
 * @param {*} parentId - Id of the originating event.
 */
async function deleteDerivedEvents (projectId, author, parentId) {
  const jpq = `$."${author}".parents ? (@ == ${parentId})`;
  const staleEvents = await event.list(projectId, { jpq });
  for (const staleEvent of staleEvents) {
    DEBUG("Deleting event", staleEvent.id);
    await event.del(projectId, staleEvent.id);
  }
}

/**
 * Build the payload of an automatically generated event.
 *
 * The event is stamped one millisecond before `tstamp` and records the
 * originating event id under `meta[author].parents`, which is what
 * deleteDerivedEvents() later searches for.
 *
 * @param {string} author
 * @param {*} parentId - Id of the originating event.
 * @param {Date} tstamp - Timestamp of the originating event.
 * @param {string} remarks - Text of the generated event.
 * @param {number} value - Duration in milliseconds stored alongside it.
 * @returns {object}
 */
function buildPayload (author, parentId, tstamp, remarks, value) {
  return {
    tstamp: new Date(tstamp - 1),
    remarks,
    labels: [ "Nav", "Prod" ],
    meta: {
      auto: true,
      author,
      [author]: {
        parents: [
          parentId,
          // FIXME And also the corresponding LSP event id
        ],
        value,
      },
    },
  };
}

/**
 * Post the line change time event for `data` and, when the project's
 * nominal line change duration is configured and has been exceeded, an
 * additional event reporting the excess.
 *
 * Nothing is posted if `data.validity` is rejected by withinValidity()
 * or if no line change time could be determined.
 *
 * @param {*} projectId
 * @param {string} author
 * @param {object} data - The `new` row of the queued notification.
 */
async function postLineChangeEvents (projectId, author, data) {
  if (!withinValidity(data.validity)) {
    return;
  }

  // Work on a local Date instead of overwriting data.tstamp.
  const tstamp = new Date(data.tstamp);
  const lineChangeTime = await getLineChangeTime(projectId, tstamp);
  if (!lineChangeTime) {
    return;
  }
  DEBUG("lineChangeTime", lineChangeTime);

  const cfg = await project.configuration.get(projectId);
  // m → ms. NaN when the setting is absent, which the check below skips.
  const nlcd = cfg?.production?.nominalLineChangeDuration * MS_PER_MINUTE;
  DEBUG("nlcd", nlcd);

  if (nlcd && lineChangeTime > nlcd) {
    const excess = lineChangeTime - nlcd;
    const excessString = formatInterval(parseInterval(excess));
    DEBUG("excess", excess, excessString);
    const excessPayload = buildPayload(
      author,
      data.id,
      tstamp,
      `_Nominal line change duration exceeded by ${excessString}_`,
      excess,
    );
    DEBUG("Posting event (duration exceeded)", projectId, excessPayload);
    await event.post(projectId, excessPayload);
  }

  const lctString = formatInterval(parseInterval(lineChangeTime));
  const payload = buildPayload(
    author,
    data.id,
    tstamp,
    `Line change time: ${lctString}`,
    lineChangeTime,
  );
  DEBUG("Posting event", projectId, payload);
  await event.post(projectId, payload);
}

/**
 * Handle one queued notification.
 *
 *  - INSERT: post events derived from `new`.
 *  - UPDATE: delete the events derived from `old`, then post events
 *    derived from `new`.
 *  - DELETE: delete the events derived from `old`.
 *
 * Entries without a `pid` are logged and skipped.
 *
 * @param {object} entry - Queued notification payload.
 * @param {string} author
 */
async function processEntry (entry, author) {
  const projectId = entry.pid;
  if (!projectId) {
    WARNING("No projectID found in event", entry);
    return;
  }

  DEBUG("Current", entry);
  const { operation } = entry;

  if (operation === "UPDATE" || operation === "DELETE") {
    await deleteDerivedEvents(projectId, author, entry.old.id);
  }

  if (operation === "INSERT" || operation === "UPDATE") {
    await postLineChangeEvents(projectId, author, entry.new);
  }
}

/**
 * Watches "event" notifications for events labelled "FSP" and posts
 * automatic events reporting the line change time.
 *
 * Data may come in much faster than it can be processed, so run() only
 * appends to a queue (bounded by MAX_QUEUE_SIZE) and then calls
 * processQueue(). processQueue() drains the queue one entry at a time;
 * while a drain is in progress further calls return immediately,
 * because the running drain loop will reach the newly queued entries
 * itself. This keeps entries strictly in arrival order and never
 * processes two of them concurrently.
 */
class ReportLineChangeTime {

  static MAX_QUEUE_SIZE = 125000;

  queue = [];

  author = `*${this.constructor.name}*`;

  // True while a processQueue() call is draining the queue.
  #draining = false;

  /**
   * Drain the queue sequentially.
   *
   * A failure while handling one entry is logged and does not prevent
   * the remaining entries from being processed.
   *
   * @returns {Promise<void>}
   */
  async processQueue () {
    if (this.#draining) {
      return;
    }

    this.#draining = true;
    try {
      while (this.queue.length > 0) {
        DEBUG("Queue length", this.queue.length);
        const entry = this.queue.shift();
        try {
          await processEntry(entry, this.author);
        } catch (err) {
          ERROR("ReportLineChangeTime Error");
          ERROR(err);
        }
      }
    } finally {
      this.#draining = false;
    }
  }

  /**
   * Queue a notification for processing if it is on the "event" channel
   * and its old or new row carries the "FSP" label.
   *
   * Processing is started but not awaited, so this resolves as soon as
   * the notification has been queued (or dropped because the queue is
   * full).
   *
   * @param {object} data - Notification with `channel` and `payload`.
   * @returns {Promise<void>}
   */
  async run (data) {
    if (!data || data.channel !== "event") {
      return;
    }

    const { new: n, old: o } = data.payload ?? {};
    if (!n?.labels?.includes("FSP") && !o?.labels?.includes("FSP")) {
      return;
    }

    if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) {
      this.queue.push({ ...data.payload });
    } else {
      ALERT("ReportLineChangeTime queue full at", this.queue.length);
    }

    // Deliberately not awaited; per-entry errors are handled inside
    // processQueue(), this handler is only a last resort.
    this.processQueue().catch((err) => {
      ERROR("ReportLineChangeTime Error");
      ERROR(err);
    });
  }

}

module.exports = ReportLineChangeTime;