Rewrite events listener and handlers.

The events listener now uses a proper self-consuming queue and
the event handlers have been rewritten accordingly.

The way this works is that running init() on the handlers
library instantiates the handlers and returns two higher-order
functions, prepare() and despatch(). A call to the latter of
these is appended to the queue with each new incoming event.

The handlers have access to a context object (ctx) which may be
used to persist data between calls and/or exchange data between
handlers. This is used notably to give the handlers access to
project configurations, which are themselves refreshed by a
project configuration change handler (DetectProjectConfigurationChange).
This commit is contained in:
D. Berge
2023-10-14 20:53:42 +02:00
parent c1e35b2459
commit 6a21ddd1cd
7 changed files with 351 additions and 503 deletions

View File

@@ -3,130 +3,15 @@ const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSOLEOL {
/* Data may come much faster than we can process it, so we put it
* in a queue and process it at our own pace.
*
* The run() method fills the queue with the necessary data and then
* calls processQueue().
*
* The processQueue() method looks takes the first two elements in
* the queue and processes them if they are not already being taken
* care of by a previous processQueue() call this will happen when
* data is coming in faster than it can be processed.
*
* If the processQueue() call is the first to see the two bottommost
* two elements, it will process them and, when finished, it will set
* the `isPending` flag of the bottommost element to `false`, thus
* letting the next call know that it has work to do.
*
* If the queue was empty, run() will set the `isPending` flag of its
* first element to a falsy value, thus bootstrapping the process.
*/
static MAX_QUEUE_SIZE = 125000;
queue = [];
author = `*${this.constructor.name}*`;
prev = null;
async processQueue () {
DEBUG("Queue length", this.queue.length)
while (this.queue.length > 1) {
if (this.queue[0].isPending) {
DEBUG("Queue busy");
setImmediate(() => this.processQueue());
return;
}
const prev = this.queue.shift();
const cur = this.queue[0];
const sequence = Number(cur._sequence);
try {
DEBUG("Sequence", sequence);
// DEBUG("Previous", prev);
// DEBUG("Current", cur);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
INFO("Transition to ONLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO ONLINE", prev, cur);
// Check if there are already FSP, FGSP events for this sequence
const projectId = await schema2pid(cur._schema);
const sequenceEvents = await event.list(projectId, {sequence});
const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
if (labels.includes("FSP")) {
// At this point labels contains either FSP only or FSP + FGSP,
// depending on whether a FGSP event has already been entered.
const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: cur._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has been already entered in the log,
// so we have nothing to do here.
INFO("FSP already in the log. Doing nothing");
}
} else if (prev.lineStatus == "online" && cur.lineStatus != "online") {
INFO("Transition to OFFLINE detected");
// DEBUG(cur);
// DEBUG(prev);
// console.log("TRANSITION TO OFFLINE", prev, cur);
// Check if there are already LSP, LGSP events for this sequence
const projectId = await schema2pid(prev._schema);
const sequenceEvents = await event.list(projectId, {sequence});
const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
if (labels.includes("LSP")) {
// At this point labels contains either LSP only or LSP + LGSP,
// depending on whether a LGSP event has already been entered.
const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: prev._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
// console.log(projectId, payload);
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else {
// A first shot point has been already entered in the log,
// so we have nothing to do here.
INFO("LSP already in the log. Doing nothing");
}
}
// Processing of this shot has already been completed.
// The queue can now move forward.
} catch (err) {
console.error("DetectSOLEOL Error")
console.log(err);
} finally {
cur.isPending = false;
}
}
constructor () {
DEBUG(`${this.author} instantiated`);
}
async run (data) {
async run (data, ctx) {
if (!data || data.channel !== "realtime") {
return;
}
@@ -135,30 +20,69 @@ class DetectSOLEOL {
return;
}
const meta = data.payload.new.meta;
if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) {
this.queue.push({
isPending: this.queue.length,
_schema: meta._schema,
time: meta.time,
shot: meta.shot,
lineStatus: meta.lineStatus,
_sequence: meta._sequence,
_point: meta._point,
lineName: meta.lineName,
speed: meta.speed,
waterDepth: meta.waterDepth
});
} else {
// FIXME Change to alert
console.error("DetectSOLEOL queue full at", this.queue.length);
if (!this.prev) {
DEBUG("Initialising `prev`");
this.prev = data;
return;
}
this.processQueue();
try {
DEBUG("Running");
// DEBUG("%j", data);
const cur = data?.payload?.new?.meta;
const prev = this.prev?.payload?.new?.meta;
const sequence = Number(cur._sequence);
// DEBUG("%j", prev);
// DEBUG("%j", cur);
DEBUG("prv.lineName: %s\ncur.lineName: %s\nprv._sequence: %s\ncur._sequence: %s\nprv.lineStatus: %s\ncur.lineStatus: %s", prev.lineName, cur.lineName, prev._sequence, cur._sequence, prev.lineStatus, cur.lineStatus);
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
INFO("Transition to ONLINE detected");
// We must use schema2pid because the pid may not have been
// populated for this event.
const projectId = await schema2pid(cur._schema ?? prev._schema);
const labels = ["FSP", "FGSP"];
const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: cur._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
} else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) {
INFO("Transition to OFFLINE detected");
const projectId = await schema2pid(prev._schema ?? cur._schema);
const labels = ["LSP", "LGSP"];
const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: cur._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
}
} catch (err) {
DEBUG(`${this.author} error`, err);
throw err;
} finally {
this.prev = data;
}
}
}
module.exports = DetectSOLEOL;