mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 11:37:08 +00:00
Merge branch '136-add-line-change-time-log-pseudoevent' into 'devel'
Resolve "Add line change time log pseudoevent" Closes #136 See merge request wgp/dougal/software!45
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
const Handlers = [
|
const Handlers = [
|
||||||
require('./detect-soleol'),
|
require('./detect-soleol'),
|
||||||
require('./detect-soft-start'),
|
require('./detect-soft-start'),
|
||||||
|
require('./report-line-change-time'),
|
||||||
require('./detect-fdsp')
|
require('./detect-fdsp')
|
||||||
];
|
];
|
||||||
|
|
||||||
|
|||||||
304
lib/www/server/events/handlers/report-line-change-time.js
Normal file
304
lib/www/server/events/handlers/report-line-change-time.js
Normal file
@@ -0,0 +1,304 @@
|
|||||||
|
const { schema2pid } = require('../../lib/db/connection');
|
||||||
|
const { event, project } = require('../../lib/db');
|
||||||
|
const { withinValidity } = require('../../lib/utils/ranges');
|
||||||
|
const unique = require('../../lib/utils/unique');
|
||||||
|
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
|
||||||
|
|
||||||
|
class ReportLineChangeTime {
|
||||||
|
/* Data may come much faster than we can process it, so we put it
|
||||||
|
* in a queue and process it at our own pace.
|
||||||
|
*
|
||||||
|
* The run() method fills the queue with the necessary data and then
|
||||||
|
* calls processQueue().
|
||||||
|
*
|
||||||
|
* The processQueue() method looks takes the first two elements in
|
||||||
|
* the queue and processes them if they are not already being taken
|
||||||
|
* care of by a previous processQueue() call – this will happen when
|
||||||
|
* data is coming in faster than it can be processed.
|
||||||
|
*
|
||||||
|
* If the processQueue() call is the first to see the two bottommost
|
||||||
|
* two elements, it will process them and, when finished, it will set
|
||||||
|
* the `isPending` flag of the bottommost element to `false`, thus
|
||||||
|
* letting the next call know that it has work to do.
|
||||||
|
*
|
||||||
|
* If the queue was empty, run() will set the `isPending` flag of its
|
||||||
|
* first element to a falsy value, thus bootstrapping the process.
|
||||||
|
*/
|
||||||
|
static MAX_QUEUE_SIZE = 125000;
|
||||||
|
|
||||||
|
queue = [];
|
||||||
|
|
||||||
|
author = `*${this.constructor.name}*`;
|
||||||
|
|
||||||
|
async processQueue () {
|
||||||
|
DEBUG("Queue length", this.queue.length)
|
||||||
|
while (this.queue.length > 0) {
|
||||||
|
if (this.queue[0].isPending) {
|
||||||
|
DEBUG("Queue busy");
|
||||||
|
setTimeout(() => this.processQueue(), 1000); // We're not in a hurry
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const cur = this.queue.shift();
|
||||||
|
const next = this.queue[0];
|
||||||
|
const projectId = cur.pid;
|
||||||
|
// Are we being called because of a LGSP or because of a FGSP?
|
||||||
|
const forward = (cur.old?.labels?.includes("LGSP") || cur.new?.labels?.includes("LGSP"));
|
||||||
|
|
||||||
|
if (!projectId) {
|
||||||
|
throw {message: "No projectID found in event", cur};
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getConfiguration (projectId) {
|
||||||
|
return await project.configuration.get(projectId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async function getLineChangeTime (data, forward = false) {
|
||||||
|
if (forward) {
|
||||||
|
const ospEvents = await event.list(projectId, {label: "FGSP"});
|
||||||
|
// DEBUG("ospEvents", ospEvents);
|
||||||
|
const osp = ospEvents.filter(i => i.tstamp > data.tstamp).pop();
|
||||||
|
DEBUG("fsp", osp);
|
||||||
|
// DEBUG("data", data);
|
||||||
|
|
||||||
|
if (osp) {
|
||||||
|
DEBUG("lineChangeTime", osp.tstamp - data.tstamp);
|
||||||
|
return { lineChangeTime: osp.tstamp - data.tstamp, osp };
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
const ospEvents = await event.list(projectId, {label: "LGSP"});
|
||||||
|
// DEBUG("ospEvents", ospEvents);
|
||||||
|
const osp = ospEvents.filter(i => i.tstamp < data.tstamp).shift();
|
||||||
|
DEBUG("lsp", osp);
|
||||||
|
// DEBUG("data", data);
|
||||||
|
|
||||||
|
if (osp) {
|
||||||
|
DEBUG("lineChangeTime", data.tstamp - osp.tstamp);
|
||||||
|
return { lineChangeTime: data.tstamp - osp.tstamp, osp };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseInterval (dt) {
|
||||||
|
const daySeconds = (dt/1000) % 86400;
|
||||||
|
const d = Math.floor((dt/1000) / 86400);
|
||||||
|
const dateObject = new Date(null);
|
||||||
|
dateObject.setSeconds(daySeconds);
|
||||||
|
const [ h, m, s ] = dateObject.toISOString().slice(11, 19).split(":").map(Number);
|
||||||
|
return {d, h, m, s};
|
||||||
|
}
|
||||||
|
|
||||||
|
function formatInterval (i) {
|
||||||
|
let str = "";
|
||||||
|
for (let [k, v] of Object.entries(i)) {
|
||||||
|
if (v) {
|
||||||
|
str += " " + v + " " + k;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return str.trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
const deleteStaleEvents = async (seq) => {
|
||||||
|
if (seq) {
|
||||||
|
DEBUG("Will delete lct events related to sequence(s)", seq);
|
||||||
|
|
||||||
|
const jpq = `$."${this.author}"`;
|
||||||
|
|
||||||
|
const opts = {jpq};
|
||||||
|
|
||||||
|
if (Array.isArray(seq)) {
|
||||||
|
opts.sequences = unique(seq).filter(i => !!i);
|
||||||
|
} else {
|
||||||
|
opts.sequence = seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
const staleEvents = await event.list(projectId, opts);
|
||||||
|
DEBUG(staleEvents.length ?? 0, "events to delete");
|
||||||
|
for (let staleEvent of staleEvents) {
|
||||||
|
DEBUG(`Deleting event id ${staleEvent.id} (seq = ${staleEvent.sequence}, point = ${staleEvent.point})`);
|
||||||
|
await event.del(projectId, staleEvent.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const createLineChangeTimeEvents = async (lineChangeTime, data, osp) => {
|
||||||
|
|
||||||
|
const events = [];
|
||||||
|
const cfg = (await project.configuration.get(projectId));
|
||||||
|
const nlcd = cfg?.production?.nominalLineChangeDuration * 60*1000; // m → ms
|
||||||
|
DEBUG("nlcd", nlcd);
|
||||||
|
if (nlcd && lineChangeTime > nlcd) {
|
||||||
|
const excess = lineChangeTime-nlcd;
|
||||||
|
const excessString = formatInterval(parseInterval(excess));
|
||||||
|
DEBUG("excess", excess, excessString);
|
||||||
|
|
||||||
|
// ref: The later of the two events
|
||||||
|
const ref = forward ? osp : data;
|
||||||
|
const payload = {
|
||||||
|
// tstamp: new Date(ref.tstamp-1),
|
||||||
|
sequence: ref.sequence,
|
||||||
|
point: ref.point,
|
||||||
|
remarks: `_Nominal line change duration exceeded by ${excessString}_`,
|
||||||
|
labels: [ "Nav", "Prod" ],
|
||||||
|
meta: {
|
||||||
|
auto: true,
|
||||||
|
author: this.author,
|
||||||
|
[this.author]: {
|
||||||
|
parents: [
|
||||||
|
data.id,
|
||||||
|
osp.id
|
||||||
|
],
|
||||||
|
type: "excess",
|
||||||
|
value: excess
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
events.push(payload);
|
||||||
|
DEBUG("Created line change duration exceeded event", projectId, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
const lctString = formatInterval(parseInterval(lineChangeTime));
|
||||||
|
|
||||||
|
// ref: The later of the two events
|
||||||
|
const ref = forward ? osp : data;
|
||||||
|
const payload = {
|
||||||
|
// tstamp: new Date(ref.tstamp-1),
|
||||||
|
sequence: ref.sequence,
|
||||||
|
point: ref.point,
|
||||||
|
remarks: `Line change time: ${lctString}`,
|
||||||
|
labels: [ "Nav", "Prod" ],
|
||||||
|
meta: {
|
||||||
|
auto: true,
|
||||||
|
author: this.author,
|
||||||
|
[this.author]: {
|
||||||
|
parents: [
|
||||||
|
data.id,
|
||||||
|
osp.id
|
||||||
|
],
|
||||||
|
type: "lineChangeTime",
|
||||||
|
value: lineChangeTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
events.push(payload);
|
||||||
|
DEBUG("Created line change duration event", projectId, payload);
|
||||||
|
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
const maybePostEvent = async (projectId, payload) => {
|
||||||
|
DEBUG("Posting event", projectId, payload);
|
||||||
|
await event.post(projectId, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// DEBUG("Previous", prev);
|
||||||
|
DEBUG("Current", cur);
|
||||||
|
DEBUG("Forward search", forward);
|
||||||
|
|
||||||
|
// We have these scenarios to consider:
|
||||||
|
// INSERT:
|
||||||
|
// `old` will be NULL
|
||||||
|
// Add event with line change time:
|
||||||
|
// - match validity with `new`
|
||||||
|
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
|
||||||
|
// UPDATE:
|
||||||
|
// `old` is not NULL
|
||||||
|
// `new` is not NULL
|
||||||
|
// Delete previous event from event_log (not event_log_full)
|
||||||
|
// Add event with line change time:
|
||||||
|
// - match validity with `new`
|
||||||
|
// - meta.ReportLineChangeTime.link refers to new.uid (or new.id?)
|
||||||
|
// DELETE:
|
||||||
|
// `old` is not NULL
|
||||||
|
// `new` will be NULL
|
||||||
|
// Delete previous event from event_log (not event_log_full)
|
||||||
|
|
||||||
|
await deleteStaleEvents([cur.old?.sequence, cur.new?.sequence]);
|
||||||
|
|
||||||
|
if (cur.operation == "INSERT") {
|
||||||
|
// NOTE: UPDATE on the event_log view translates to one UPDATE plus one INSERT
|
||||||
|
// on event_log_full, so we don't need to worry about UPDATE here.
|
||||||
|
const data = cur.new;
|
||||||
|
DEBUG("INSERT seen: will add lct events related to ", data.id);
|
||||||
|
|
||||||
|
if (withinValidity(data.validity)) {
|
||||||
|
DEBUG("Event within validity period", data.validity, new Date());
|
||||||
|
|
||||||
|
data.tstamp = new Date(data.tstamp);
|
||||||
|
const { lineChangeTime, osp } = await getLineChangeTime(data, forward);
|
||||||
|
|
||||||
|
if (lineChangeTime) {
|
||||||
|
|
||||||
|
const events = await createLineChangeTimeEvents(lineChangeTime, data, osp);
|
||||||
|
|
||||||
|
if (events?.length) {
|
||||||
|
DEBUG("Deleting other events for sequence", events[0].sequence);
|
||||||
|
await deleteStaleEvents(events[0].sequence);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (let payload of events) {
|
||||||
|
await maybePostEvent(projectId, payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DEBUG("Event outside of validity range", data.validity, "lct events not inserted");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Processing of this shot has already been completed.
|
||||||
|
// The queue can now move forward.
|
||||||
|
} catch (err) {
|
||||||
|
ERROR("ReportLineChangeTime Error")
|
||||||
|
ERROR(err);
|
||||||
|
} finally {
|
||||||
|
if (next) {
|
||||||
|
next.isPending = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async run (data) {
|
||||||
|
DEBUG("Seen", data);
|
||||||
|
if (!data || data.channel !== "event") {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(data.payload?.new?.labels) && !(data.payload?.old?.labels)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const n = data.payload.new;
|
||||||
|
const o = data.payload.old;
|
||||||
|
|
||||||
|
if (!n?.labels?.includes("FGSP") && !o?.labels?.includes("FGSP") &&
|
||||||
|
!n?.labels?.includes("LGSP") && !o?.labels?.includes("LGSP")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.queue.length < ReportLineChangeTime.MAX_QUEUE_SIZE) {
|
||||||
|
|
||||||
|
const item = {
|
||||||
|
...structuredClone(data.payload),
|
||||||
|
isPending: this.queue.length,
|
||||||
|
};
|
||||||
|
DEBUG("Queueing", item);
|
||||||
|
this.queue.push(item);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ALERT("ReportLineChangeTime queue full at", this.queue.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.processQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = ReportLineChangeTime;
|
||||||
@@ -11,7 +11,9 @@ async function list (projectId, opts = {}) {
|
|||||||
const limit = Math.abs(Number(opts.itemsPerPage)) || null;
|
const limit = Math.abs(Number(opts.itemsPerPage)) || null;
|
||||||
|
|
||||||
const sequence = opts.sequence && Number(opts.sequence) || null;
|
const sequence = opts.sequence && Number(opts.sequence) || null;
|
||||||
const sequences = opts.sequences && opts.sequences.split(/[^0-9]+/).map(v => Number(v)) || null;
|
const sequences = opts.sequences && (Array.isArray(opts.sequences)
|
||||||
|
? opts.sequences.map(Number)
|
||||||
|
: opts.sequences.split(/[^0-9]+/).map(Number)) || null;
|
||||||
const date0 = opts.date0 ?? null;
|
const date0 = opts.date0 ?? null;
|
||||||
const date1 = opts.date1 ?? null;
|
const date1 = opts.date1 ?? null;
|
||||||
const jpq = opts.jpq || null;
|
const jpq = opts.jpq || null;
|
||||||
|
|||||||
@@ -7,5 +7,6 @@ module.exports = {
|
|||||||
deepMerge: require('./deepMerge'),
|
deepMerge: require('./deepMerge'),
|
||||||
removeNulls: require('./removeNulls'),
|
removeNulls: require('./removeNulls'),
|
||||||
logicalPath: require('./logicalPath'),
|
logicalPath: require('./logicalPath'),
|
||||||
ranges: require('./ranges')
|
ranges: require('./ranges'),
|
||||||
|
unique: require('./unique')
|
||||||
};
|
};
|
||||||
|
|||||||
6
lib/www/server/lib/utils/unique.js
Normal file
6
lib/www/server/lib/utils/unique.js
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
|
||||||
|
function unique(array) {
|
||||||
|
return [...new Set(array)];
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = unique;
|
||||||
Reference in New Issue
Block a user