const { schema2pid } = require('../../lib/db/connection'); const { event } = require('../../lib/db'); const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); class DetectSoftStart { /* Data may come much faster than we can process it, so we put it * in a queue and process it at our own pace. * * The run() method fills the queue with the necessary data and then * calls processQueue(). * * The processQueue() method looks takes the first two elements in * the queue and processes them if they are not already being taken * care of by a previous processQueue() call – this will happen when * data is coming in faster than it can be processed. * * If the processQueue() call is the first to see the two bottommost * two elements, it will process them and, when finished, it will set * the `isPending` flag of the bottommost element to `false`, thus * letting the next call know that it has work to do. * * If the queue was empty, run() will set the `isPending` flag of its * first element to a falsy value, thus bootstrapping the process. */ static MAX_QUEUE_SIZE = 125000; queue = []; async processQueue () { DEBUG("Queue length", this.queue.length) while (this.queue.length > 1) { if (this.queue[0].isPending) { DEBUG("Queue busy"); setImmediate(() => this.processQueue()); return; } const prev = this.queue.shift(); const cur = this.queue[0]; try { // DEBUG("Previous", prev); // DEBUG("Current", cur); // TODO: // Consider whether to remember if soft start / full volume events // have already been emitted and wait until there is an online/offline // transition before re-emitting. // This may or may not be a good idea. // Look for a soft start or full volume event if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) { INFO("Soft start detected @", cur.tstamp); const projectId = await schema2pid(cur._schema ?? prev._schema); // TODO: Try and grab the corresponding comment from the configuration? const payload = { tstamp: cur.tstamp, remarks: "Soft start", labels: [ "Daily", "Guns", "Prod" ], meta: {auto: true, author: `*${this.constructor.name}*`} }; DEBUG("Posting event", projectId, payload); await event.post(projectId, payload); } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) { INFO("Full volume detected @", cur.tstamp); const projectId = await schema2pid(cur._schema ?? prev._schema); // TODO: Try and grab the corresponding comment from the configuration? const payload = { tstamp: cur.tstamp, remarks: "Full volume", labels: [ "Daily", "Guns", "Prod" ], meta: {auto: true, author: `*${this.constructor.name}*`} }; DEBUG("Posting event", projectId, payload); await event.post(projectId, payload); } // Processing of this shot has already been completed. // The queue can now move forward. } catch (err) { ERROR("DetectSoftStart Error") ERROR(err); } finally { cur.isPending = false; } } } async run (data) { if (!data || data.channel !== "realtime") { return; } if (!(data.payload && data.payload.new && data.payload.new.meta)) { return; } const meta = data.payload.new.meta; if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) { this.queue.push({ isPending: this.queue.length, _schema: meta._schema, tstamp: meta.tstamp ?? meta.time, shot: meta.shot, lineStatus: meta.lineStatus, _sequence: meta._sequence, _point: meta._point, lineName: meta.lineName, num_guns: meta.num_guns, num_active: meta.num_active }); } else { // FIXME Change to alert ALERT("DetectSoftStart queue full at", this.queue.length); } this.processQueue(); } } module.exports = DetectSoftStart;