diff --git a/lib/www/server/events/handlers/detect-soft-start.js b/lib/www/server/events/handlers/detect-soft-start.js new file mode 100644 index 0000000..184dc63 --- /dev/null +++ b/lib/www/server/events/handlers/detect-soft-start.js @@ -0,0 +1,128 @@ +const { schema2pid } = require('../../lib/db/connection'); +const { event } = require('../../lib/db'); +const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + +class DetectSoftStart { + /* Data may come much faster than we can process it, so we put it + * in a queue and process it at our own pace. + * + * The run() method fills the queue with the necessary data and then + * calls processQueue(). + * + * The processQueue() method looks takes the first two elements in + * the queue and processes them if they are not already being taken + * care of by a previous processQueue() call – this will happen when + * data is coming in faster than it can be processed. + * + * If the processQueue() call is the first to see the two bottommost + * two elements, it will process them and, when finished, it will set + * the `isPending` flag of the bottommost element to `false`, thus + * letting the next call know that it has work to do. + * + * If the queue was empty, run() will set the `isPending` flag of its + * first element to a falsy value, thus bootstrapping the process. + */ + static MAX_QUEUE_SIZE = 125000; + + queue = []; + + async processQueue () { + DEBUG("Queue length", this.queue.length) + while (this.queue.length > 1) { + if (this.queue[0].isPending) { + DEBUG("Queue busy"); + setImmediate(() => this.processQueue()); + return; + } + + const prev = this.queue.shift(); + const cur = this.queue[0]; + + try { + // DEBUG("Previous", prev); + // DEBUG("Current", cur); + + // TODO: + // Consider whether to remember if soft start / full volume events + // have already been emitted and wait until there is an online/offline + // transition before re-emitting. + // This may or may not be a good idea. + + // Look for a soft start or full volume event + if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) { + INFO("Soft start detected @", cur.tstamp); + + const projectId = await schema2pid(cur._schema ?? prev._schema); + + // TODO: Try and grab the corresponding comment from the configuration? + const payload = { + tstamp: cur.tstamp, + remarks: "Soft start", + labels: [ "Daily", "Guns", "Prod" ], + meta: { author: `*${this.constructor.name}*`} + }; + DEBUG("Posting event", projectId, payload); + await event.post(projectId, payload); + + } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) { + INFO("Full volume detected @", cur.tstamp); + + const projectId = await schema2pid(cur._schema ?? prev._schema); + + // TODO: Try and grab the corresponding comment from the configuration? + const payload = { + tstamp: cur.tstamp, + remarks: "Full volume", + labels: [ "Daily", "Guns", "Prod" ], + meta: { author: `*${this.constructor.name}*`} + }; + DEBUG("Posting event", projectId, payload); + await event.post(projectId, payload); + } + // Processing of this shot has already been completed. + // The queue can now move forward. + } catch (err) { + ERROR("DetectSoftStart Error") + ERROR(err); + } finally { + cur.isPending = false; + } + } + } + + async run (data) { + if (!data || data.channel !== "realtime") { + return; + } + + if (!(data.payload && data.payload.new && data.payload.new.meta)) { + return; + } + + const meta = data.payload.new.meta; + + if (this.queue.length < DetectSoftStart.MAX_QUEUE_SIZE) { + + this.queue.push({ + isPending: this.queue.length, + _schema: meta._schema, + tstamp: meta.tstamp ?? meta.time, + shot: meta.shot, + lineStatus: meta.lineStatus, + _sequence: meta._sequence, + _point: meta._point, + lineName: meta.lineName, + num_guns: meta.num_guns, + num_active: meta.num_active + }); + + } else { + // FIXME Change to alert + ALERT("DetectSoftStart queue full at", this.queue.length); + } + + this.processQueue(); + } +} + +module.exports = DetectSoftStart; diff --git a/lib/www/server/events/handlers/index.js b/lib/www/server/events/handlers/index.js index 5fe9436..b6825ac 100644 --- a/lib/www/server/events/handlers/index.js +++ b/lib/www/server/events/handlers/index.js @@ -1,5 +1,6 @@ const Handlers = [ require('./detect-soleol'), + require('./detect-soft-start'), require('./detect-fdsp') ]; diff --git a/lib/www/server/lib/db/navdata/save.js b/lib/www/server/lib/db/navdata/save.js index f1ece0b..5e1c494 100644 --- a/lib/www/server/lib/db/navdata/save.js +++ b/lib/www/server/lib/db/navdata/save.js @@ -290,12 +290,13 @@ async function save (navData, opts = {}) { } } else { // We are offline. We only assign _schema once every save_interval seconds at most + // unless there is gun data present. if (opts.offline_survey_heuristics == "nearest_preplot") { const now = Date.now(); const do_save = !opts.offline_survey_detect_interval || (now - last_tstamp) >= opts.offline_survey_detect_interval; - if (do_save) { + if (do_save || "guns" in navData?.payload) { const configs = await getAllProjectConfigs(); const candidates = await getCandidates(navData); const bestCandidate = await getNearestOfflinePreplot(candidates);