mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 11:07:08 +00:00
Add event handler for midnight shot detection.
This event handler checks if there is an UTC date jump between consecutive shots. If a jump is detected, it sends to new entries to the event log, for the last shot and first shot of the previous and current dates, respectively. Fixes #223.
This commit is contained in:
143
lib/www/server/events/handlers/detect-fdsp.js
Normal file
143
lib/www/server/events/handlers/detect-fdsp.js
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
const { schema2pid } = require('../../lib/db/connection');
|
||||||
|
const { event } = require('../../lib/db');
|
||||||
|
const { ALERT, ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
|
||||||
|
|
||||||
|
/** Midnight shot detection.
|
||||||
|
*
|
||||||
|
* This event handler checks if there is an UTC date jump between consecutive
|
||||||
|
* shots. If a jump is detected, it sends to new entries to the event log, for
|
||||||
|
* the last shot and first shot of the previous and current dates, respectively.
|
||||||
|
*/
|
||||||
|
class DetectFDSP {
|
||||||
|
/* Data may come much faster than we can process it, so we put it
|
||||||
|
* in a queue and process it at our own pace.
|
||||||
|
*
|
||||||
|
* The run() method fills the queue with the necessary data and then
|
||||||
|
* calls processQueue().
|
||||||
|
*
|
||||||
|
* The processQueue() method looks at the first two elements in
|
||||||
|
* the queue and processes them if they are not already being taken
|
||||||
|
* care of by a previous processQueue() call – this will happen when
|
||||||
|
* data is coming in faster than it can be processed.
|
||||||
|
*
|
||||||
|
* If the processQueue() call is the first to see the two bottommost
|
||||||
|
* two elements, it will process them and, when finished, it will set
|
||||||
|
* the `isPending` flag of the bottommost element to `false`, thus
|
||||||
|
* letting the next call know that it has work to do.
|
||||||
|
*
|
||||||
|
* If the queue was empty, run() will set the `isPending` flag of its
|
||||||
|
* first element to a falsy value, thus bootstrapping the process.
|
||||||
|
*/
|
||||||
|
static MAX_QUEUE_SIZE = 125000;
|
||||||
|
|
||||||
|
queue = [];
|
||||||
|
|
||||||
|
async processQueue () {
|
||||||
|
DEBUG("Queue length", this.queue.length)
|
||||||
|
while (this.queue.length > 1) {
|
||||||
|
|
||||||
|
if (this.queue[0].isPending) {
|
||||||
|
setImmediate(() => this.processQueue());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const prev = this.queue.shift();
|
||||||
|
const cur = this.queue[0];
|
||||||
|
|
||||||
|
const sequence = Number(cur._sequence);
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
|
||||||
|
prev.lineStatus == "online" && cur.lineStatus == "online" && sequence) {
|
||||||
|
|
||||||
|
// DEBUG("Previous", prev);
|
||||||
|
// DEBUG("Current", cur);
|
||||||
|
|
||||||
|
if (prev.time.substr(0, 10) != cur.time.substr(0, 10)) {
|
||||||
|
// Possible a date change, but could also be a missing timestamp
|
||||||
|
// or something else.
|
||||||
|
|
||||||
|
const ts0 = new Date(prev.time)
|
||||||
|
const ts1 = new Date(cur.time);
|
||||||
|
|
||||||
|
if (!isNaN(ts0) && !isNaN(ts1) && ts0.getUTCDay() != ts1.getUTCDay()) {
|
||||||
|
INFO("Sequence shot across midnight UTC detected", cur._sequence, cur.lineName);
|
||||||
|
|
||||||
|
const ldsp = {
|
||||||
|
sequence: prev._sequence,
|
||||||
|
point: prev._point,
|
||||||
|
remarks: "Last shotpoint of the day",
|
||||||
|
labels: ["LDSP", "Prod"],
|
||||||
|
meta: {auto: true, insertedBy: this.constructor.name}
|
||||||
|
};
|
||||||
|
|
||||||
|
const fdsp = {
|
||||||
|
sequence: cur._sequence,
|
||||||
|
point: cur._point,
|
||||||
|
remarks: "First shotpoint of the day",
|
||||||
|
labels: ["FDSP", "Prod"],
|
||||||
|
meta: {auto: true, insertedBy: this.constructor.name}
|
||||||
|
};
|
||||||
|
|
||||||
|
INFO("LDSP", ldsp);
|
||||||
|
INFO("FDSP", fdsp);
|
||||||
|
|
||||||
|
const projectId = await schema2pid(prev._schema);
|
||||||
|
|
||||||
|
if (projectId) {
|
||||||
|
await event.post(projectId, ldsp);
|
||||||
|
await event.post(projectId, fdsp);
|
||||||
|
} else {
|
||||||
|
ERROR("projectId not found for", prev._schema);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
WARNING("False positive on these timestamps", prev.time, cur.time);
|
||||||
|
WARNING("No events were created");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Processing of this shot has already been completed.
|
||||||
|
// The queue can now move forward.
|
||||||
|
} catch (err) {
|
||||||
|
ERROR(err);
|
||||||
|
} finally {
|
||||||
|
cur.isPending = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async run (data) {
|
||||||
|
if (!data || data.channel !== "realtime") {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const meta = data.payload.new.meta;
|
||||||
|
|
||||||
|
if (this.queue.length < DetectFDSP.MAX_QUEUE_SIZE) {
|
||||||
|
|
||||||
|
const event = {
|
||||||
|
isPending: this.queue.length,
|
||||||
|
_schema: meta._schema,
|
||||||
|
time: meta.time,
|
||||||
|
lineStatus: meta.lineStatus,
|
||||||
|
_sequence: meta._sequence,
|
||||||
|
_point: meta._point,
|
||||||
|
lineName: meta.lineName
|
||||||
|
};
|
||||||
|
this.queue.push(event);
|
||||||
|
// DEBUG("EVENT", event);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ALERT("Queue full at", this.queue.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.processQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = DetectFDSP;
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
const Handlers = [
|
const Handlers = [
|
||||||
require('./detect-soleol')
|
require('./detect-soleol'),
|
||||||
|
require('./detect-fdsp')
|
||||||
];
|
];
|
||||||
|
|
||||||
function init () {
|
function init () {
|
||||||
|
|||||||
Reference in New Issue
Block a user