const { schema2pid } = require('../../lib/db/connection'); const { event } = require('../../lib/db'); const { ALERT, ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); /** Midnight shot detection. * * This event handler checks if there is an UTC date jump between consecutive * shots. If a jump is detected, it sends to new entries to the event log, for * the last shot and first shot of the previous and current dates, respectively. */ class DetectFDSP { /* Data may come much faster than we can process it, so we put it * in a queue and process it at our own pace. * * The run() method fills the queue with the necessary data and then * calls processQueue(). * * The processQueue() method looks at the first two elements in * the queue and processes them if they are not already being taken * care of by a previous processQueue() call – this will happen when * data is coming in faster than it can be processed. * * If the processQueue() call is the first to see the two bottommost * two elements, it will process them and, when finished, it will set * the `isPending` flag of the bottommost element to `false`, thus * letting the next call know that it has work to do. * * If the queue was empty, run() will set the `isPending` flag of its * first element to a falsy value, thus bootstrapping the process. */ static MAX_QUEUE_SIZE = 125000; queue = []; async processQueue () { DEBUG("Queue length", this.queue.length) while (this.queue.length > 1) { if (this.queue[0].isPending) { setImmediate(() => this.processQueue()); return; } const prev = this.queue.shift(); const cur = this.queue[0]; const sequence = Number(cur._sequence); try { if (prev.lineName == cur.lineName && prev._sequence == cur._sequence && prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) { // DEBUG("Previous", prev); // DEBUG("Current", cur); if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) { // Possible a date change, but could also be a missing timestamp // or something else. const ts0 = new Date(prev.time) const ts1 = new Date(cur.time); if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) { INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName); const ldsp = { sequence: prev._sequence, point: prev._point, remarks: "Last shotpoint of the day", labels: ["LDSP", "Prod"], meta: {auto: true, insertedBy: this.constructor.name} }; const fdsp = { sequence: cur._sequence, point: cur._point, remarks: "First shotpoint of the day", labels: ["FDSP", "Prod"], meta: {auto: true, insertedBy: this.constructor.name} }; INFO("LDSP", ldsp); INFO("FDSP", fdsp); const projectId = await schema2pid(prev._schema); if (projectId) { await event.post(projectId, ldsp); await event.post(projectId, fdsp); } else { ERROR("projectId not found for", prev._schema); } } else { WARNING("False positive on these timestamps", prev.time, cur.time); WARNING("No events were created"); } } } // Processing of this shot has already been completed. // The queue can now move forward. } catch (err) { ERROR(err); } finally { cur.isPending = false; } } } async run (data) { if (!data || data.channel !== "realtime") { return; } if (!(data.payload && data.payload.new && data.payload.new.meta)) { return; } const meta = data.payload.new.meta; if (this.queue.length < DetectFDSP.MAX_QUEUE_SIZE) { const event = { isPending: this.queue.length, _schema: meta._schema, time: meta.time, lineStatus: meta.lineStatus, _sequence: meta._sequence, _point: meta._point, lineName: meta.lineName }; this.queue.push(event); // DEBUG("EVENT", event); } else { ALERT("Queue full at", this.queue.length); } this.processQueue(); } } module.exports = DetectFDSP;