From 992205da4a7cf9148ac11a3d6c814a2dd93f7100 Mon Sep 17 00:00:00 2001 From: "D. Berge" Date: Sun, 15 May 2022 13:39:45 +0200 Subject: [PATCH] Add event handler for midnight shot detection. This event handler checks if there is an UTC date jump between consecutive shots. If a jump is detected, it sends to new entries to the event log, for the last shot and first shot of the previous and current dates, respectively. Fixes #223. --- lib/www/server/events/handlers/detect-fdsp.js | 143 ++++++++++++++++++ lib/www/server/events/handlers/index.js | 3 +- 2 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 lib/www/server/events/handlers/detect-fdsp.js diff --git a/lib/www/server/events/handlers/detect-fdsp.js b/lib/www/server/events/handlers/detect-fdsp.js new file mode 100644 index 0000000..dec521d --- /dev/null +++ b/lib/www/server/events/handlers/detect-fdsp.js @@ -0,0 +1,143 @@ +const { schema2pid } = require('../../lib/db/connection'); +const { event } = require('../../lib/db'); +const { ALERT, ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); + +/** Midnight shot detection. + * + * This event handler checks if there is an UTC date jump between consecutive + * shots. If a jump is detected, it sends to new entries to the event log, for + * the last shot and first shot of the previous and current dates, respectively. + */ +class DetectFDSP { + /* Data may come much faster than we can process it, so we put it + * in a queue and process it at our own pace. + * + * The run() method fills the queue with the necessary data and then + * calls processQueue(). + * + * The processQueue() method looks at the first two elements in + * the queue and processes them if they are not already being taken + * care of by a previous processQueue() call – this will happen when + * data is coming in faster than it can be processed. + * + * If the processQueue() call is the first to see the two bottommost + * two elements, it will process them and, when finished, it will set + * the `isPending` flag of the bottommost element to `false`, thus + * letting the next call know that it has work to do. + * + * If the queue was empty, run() will set the `isPending` flag of its + * first element to a falsy value, thus bootstrapping the process. + */ + static MAX_QUEUE_SIZE = 125000; + + queue = []; + + async processQueue () { + DEBUG("Queue length", this.queue.length) + while (this.queue.length > 1) { + + if (this.queue[0].isPending) { + setImmediate(() => this.processQueue()); + return; + } + + const prev = this.queue.shift(); + const cur = this.queue[0]; + + const sequence = Number(cur._sequence); + + try { + + if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && + prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) { + +// DEBUG("Previous", prev); +// DEBUG("Current", cur); + + if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) { + // Possible a date change, but could also be a missing timestamp + // or something else. + + const ts0 = new Date(prev.time) + const ts1 = new Date(cur.time); + + if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) { + INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName); + + const ldsp = { + sequence: prev._sequence, + point: prev._point, + remarks: "Last shotpoint of the day", + labels: ["LDSP", "Prod"], + meta: {auto: true, insertedBy: this.constructor.name} + }; + + const fdsp = { + sequence: cur._sequence, + point: cur._point, + remarks: "First shotpoint of the day", + labels: ["FDSP", "Prod"], + meta: {auto: true, insertedBy: this.constructor.name} + }; + + INFO("LDSP", ldsp); + INFO("FDSP", fdsp); + + const projectId = await schema2pid(prev._schema); + + if (projectId) { + await event.post(projectId, ldsp); + await event.post(projectId, fdsp); + } else { + ERROR("projectId not found for", prev._schema); + } + } else { + WARNING("False positive on these timestamps", prev.time, cur.time); + WARNING("No events were created"); + } + } + } + // Processing of this shot has already been completed. + // The queue can now move forward. + } catch (err) { + ERROR(err); + } finally { + cur.isPending = false; + } + } + } + + async run (data) { + if (!data || data.channel !== "realtime") { + return; + } + + if (!(data.payload && data.payload.new && data.payload.new.meta)) { + return; + } + + const meta = data.payload.new.meta; + + if (this.queue.length < DetectFDSP.MAX_QUEUE_SIZE) { + + const event = { + isPending: this.queue.length, + _schema: meta._schema, + time: meta.time, + lineStatus: meta.lineStatus, + _sequence: meta._sequence, + _point: meta._point, + lineName: meta.lineName + }; + this.queue.push(event); +// DEBUG("EVENT", event); + + } else { + ALERT("Queue full at", this.queue.length); + } + + this.processQueue(); + } +} + +module.exports = DetectFDSP; diff --git a/lib/www/server/events/handlers/index.js b/lib/www/server/events/handlers/index.js index 8739808..5fe9436 100644 --- a/lib/www/server/events/handlers/index.js +++ b/lib/www/server/events/handlers/index.js @@ -1,5 +1,6 @@ const Handlers = [ - require('./detect-soleol') + require('./detect-soleol'), + require('./detect-fdsp') ]; function init () {