mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 11:17:08 +00:00
Refactor SOL/EOL real-time detection handler.
This also implements a generic handler mechanism that can be reused for other purposes, such as sending email / XMPP notifications, doing real-time QC checks and so on. Fixes #113.
This commit is contained in:
146
lib/www/server/events/handlers/detect-soleol.js
Normal file
146
lib/www/server/events/handlers/detect-soleol.js
Normal file
@@ -0,0 +1,146 @@
|
||||
const { schema2pid } = require('../../lib/db/connection');
|
||||
const { event } = require('../../lib/db');
|
||||
|
||||
class DetectSOLEOL {
|
||||
/* Data may come much faster than we can process it, so we put it
|
||||
* in a queue and process it at our own pace.
|
||||
*
|
||||
* The run() method fills the queue with the necessary data and then
|
||||
* calls processQueue().
|
||||
*
|
||||
* The processQueue() method looks takes the first two elements in
|
||||
* the queue and processes them if they are not already being taken
|
||||
* care of by a previous processQueue() call – this will happen when
|
||||
* data is coming in faster than it can be processed.
|
||||
*
|
||||
* If the processQueue() call is the first to see the two bottommost
|
||||
* two elements, it will process them and, when finished, it will set
|
||||
* the `isPending` flag of the bottommost element to `false`, thus
|
||||
* letting the next call know that it has work to do.
|
||||
*
|
||||
* If the queue was empty, run() will set the `isPending` flag of its
|
||||
* first element to a falsy value, thus bootstrapping the process.
|
||||
*/
|
||||
static MAX_QUEUE_SIZE = 125000;
|
||||
|
||||
queue = [];
|
||||
|
||||
async processQueue () {
|
||||
while (this.queue.length > 1) {
|
||||
if (this.queue[0].isPending) {
|
||||
setImmediate(() => this.processQueue());
|
||||
return;
|
||||
}
|
||||
|
||||
const prev = this.queue.shift();
|
||||
const cur = this.queue[0];
|
||||
|
||||
const sequence = Number(cur._sequence);
|
||||
|
||||
try {
|
||||
|
||||
if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
|
||||
prev.lineStatus != "online" && cur.lineStatus == "online" && sequence) {
|
||||
// console.log("TRANSITION TO ONLINE", prev, cur);
|
||||
|
||||
// Check if there are already FSP, FGSP events for this sequence
|
||||
const projectId = await schema2pid(cur._schema);
|
||||
const sequenceEvents = await event.list(projectId, {sequence});
|
||||
|
||||
const labels = ["FSP", "FGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
|
||||
|
||||
if (labels.includes("FSP")) {
|
||||
// At this point labels contains either FSP only or FSP + FGSP,
|
||||
// depending on whether a FGSP event has already been entered.
|
||||
|
||||
const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
|
||||
const payload = {
|
||||
type: "sequence",
|
||||
sequence,
|
||||
point: cur._point,
|
||||
remarks,
|
||||
labels
|
||||
}
|
||||
|
||||
// console.log(projectId, payload);
|
||||
await event.post(projectId, payload);
|
||||
} else {
|
||||
// A first shot point has been already entered in the log,
|
||||
// so we have nothing to do here.
|
||||
}
|
||||
} else if (prev.lineStatus == "online" && cur.lineStatus != "online") {
|
||||
// console.log("TRANSITION TO OFFLINE", prev, cur);
|
||||
|
||||
// Check if there are already LSP, LGSP events for this sequence
|
||||
const projectId = await schema2pid(prev._schema);
|
||||
const sequenceEvents = await event.list(projectId, {sequence});
|
||||
|
||||
const labels = ["LSP", "LGSP"].filter(l => !sequenceEvents.find(i => i.labels.includes(l)));
|
||||
|
||||
if (labels.includes("LSP")) {
|
||||
// At this point labels contains either LSP only or LSP + LGSP,
|
||||
// depending on whether a LGSP event has already been entered.
|
||||
|
||||
const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`;
|
||||
const payload = {
|
||||
type: "sequence",
|
||||
sequence,
|
||||
point: prev._point,
|
||||
remarks,
|
||||
labels
|
||||
}
|
||||
|
||||
// console.log(projectId, payload);
|
||||
await event.post(projectId, payload);
|
||||
} else {
|
||||
// A first shot point has been already entered in the log,
|
||||
// so we have nothing to do here.
|
||||
}
|
||||
}
|
||||
// Processing of this shot has already been completed.
|
||||
// The queue can now move forward.
|
||||
} catch (err) {
|
||||
console.error("DetectSOLEOL Error")
|
||||
console.log(err);
|
||||
} finally {
|
||||
cur.isPending = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async run (data) {
|
||||
if (!data || !data.channel === "realtime") {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const meta = data.payload.new.meta;
|
||||
|
||||
if (this.queue.length < DetectSOLEOL.MAX_QUEUE_SIZE) {
|
||||
|
||||
this.queue.push({
|
||||
isPending: this.queue.length,
|
||||
_schema: meta._schema,
|
||||
time: meta.time,
|
||||
shot: meta.shot,
|
||||
lineStatus: meta.lineStatus,
|
||||
_sequence: meta._sequence,
|
||||
_point: meta._point,
|
||||
lineName: meta.lineName,
|
||||
speed: meta.speed,
|
||||
waterDepth: meta.waterDepth
|
||||
});
|
||||
|
||||
} else {
|
||||
// FIXME Change to alert
|
||||
console.error("DetectSOLEOL queue full at", this.queue.length);
|
||||
}
|
||||
|
||||
this.processQueue();
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = DetectSOLEOL;
|
||||
12
lib/www/server/events/handlers/index.js
Normal file
12
lib/www/server/events/handlers/index.js
Normal file
@@ -0,0 +1,12 @@
|
||||
const Handlers = [
|
||||
require('./detect-soleol')
|
||||
];
|
||||
|
||||
function init () {
|
||||
return Handlers.map(Handler => new Handler());
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
Handlers,
|
||||
init
|
||||
}
|
||||
@@ -1,56 +1,21 @@
|
||||
const { schema2pid } = require('../lib/db/connection');
|
||||
const { listen } = require('../ws/db');
|
||||
const { event } = require('../lib/db');
|
||||
const channels = require('../lib/db/channels');
|
||||
const handlers = require('./handlers').init();
|
||||
|
||||
function start () {
|
||||
let prevPos = null;
|
||||
|
||||
listen(["realtime"], function (data) {
|
||||
if (!(data.payload && data.payload.new && data.payload.new.meta)) {
|
||||
console.log("Wrong event", data);
|
||||
return;
|
||||
listen(channels, async function (data) {
|
||||
for (const handler of handlers) {
|
||||
// NOTE: We are intentionally passing the same instance
|
||||
// of the data to every handler. This means that earlier
|
||||
// handlers could, in principle, modify the data to be
|
||||
// consumed by latter ones, provided that they are
|
||||
// synchronous (as otherwise, the completion order is
|
||||
// undefined).
|
||||
await handler.run(data);
|
||||
}
|
||||
|
||||
const pos = data.payload.new.meta;
|
||||
|
||||
if (prevPos) {
|
||||
if (pos.lineStatus == "online") {
|
||||
if (prevPos.lineStatus != "online") {
|
||||
// FIXME TODO Check if there are already FSP, FGSP events for this sequence
|
||||
// Tag this as FSP/FGSP
|
||||
const remarks = `SEQ ${pos._sequence}, SOL ${pos.lineName}, BSP: ${(pos.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(pos.waterDepth).toFixed(0)} m.`;
|
||||
const payload = {
|
||||
type: "sequence",
|
||||
sequence: pos._sequence,
|
||||
point: pos._point,
|
||||
remarks,
|
||||
labels: [ "FSP", "FGSP" ]
|
||||
}
|
||||
schema2pid(pos._schema).then(projectId => event.post(projectId, payload));
|
||||
// console.log("post fsp", pos._schema);
|
||||
}
|
||||
} else {
|
||||
if (prevPos.lineStatus == "online") {
|
||||
// FIXME TODO Check if there are already LSP, LGSP events for this sequence
|
||||
// Tag this as LSP/LGSP
|
||||
const remarks = `SEQ ${prevPos._sequence}, EOL ${prevPos.lineName}, BSP: ${(prevPos.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prevPos.waterDepth).toFixed(0)} m.`;
|
||||
const payload = {
|
||||
type: "sequence",
|
||||
sequence: prevPos._sequence,
|
||||
point: prevPos._point,
|
||||
remarks,
|
||||
labels: [ "LSP", "LGSP" ]
|
||||
}
|
||||
schema2pid(prevPos._schema).then(projectId => event.post(projectId, payload));
|
||||
// console.log("post lsp", prevPos._schema);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
prevPos = JSON.parse(JSON.stringify(pos));
|
||||
});
|
||||
|
||||
console.log("Events manager started");
|
||||
console.log("Events manager started.", handlers.length, "active handlers");
|
||||
}
|
||||
|
||||
module.exports = { start }
|
||||
|
||||
Reference in New Issue
Block a user