Mirror of https://gitlab.com/wgp/dougal/software.git
Synced 2025-12-06 11:47:09 +00:00

Compare commits: v2025.33.2 ... v2025.33.3 (13 commits)

12a762f44f
ebf13abc28
b3552db02f
cd882c0611
6fc9c020a4
75284322f1
e849c47f01
387d20a4f0
2fab06d340
7d2fb5558a
764e2cfb23
bf1af1f76c
09e4cd2467

@@ -693,7 +693,7 @@ class DougalBinaryChunkSequential extends ArrayBuffer {
   getRecord (index) {
     if (index < 0 || index >= this.jCount) throw new Error(`Invalid record index: ${index}`);
 
-    const arr = [thid.udv, this.i, this.j0 + index * this.Δj];
+    const arr = [this.udv, this.i, this.j0 + index * this.Δj];
 
     for (let m = 0; m < this.ΔelemCount; m++) {
       const values = this.Δelem(m);

@@ -3,8 +3,10 @@
     <v-card-title class="headline">
       Array inline / crossline error
       <v-spacer></v-spacer>
+      <!--
       <v-switch v-model="scatterplot" label="Scatterplot"></v-switch>
       <v-switch class="ml-4" v-model="histogram" label="Histogram"></v-switch>
+      -->
     </v-card-title>
 
     <v-container fluid fill-height>
@@ -57,8 +59,8 @@ export default {
       graph: [],
       busy: false,
       resizeObserver: null,
-      scatterplot: false,
-      histogram: false
+      scatterplot: true,
+      histogram: true
     };
   },

@@ -3,8 +3,10 @@
     <v-card-title class="headline">
       Gun depth
      <v-spacer></v-spacer>
+      <!--
       <v-switch v-model="shotpoint" label="Shotpoint"></v-switch>
       <v-switch class="ml-4" v-model="violinplot" label="Violin plot"></v-switch>
+      -->
     </v-card-title>
 
     <v-container fluid fill-height>
@@ -59,7 +61,7 @@ export default {
       busy: false,
       resizeObserver: null,
       shotpoint: true,
-      violinplot: false
+      violinplot: true
     };
   },

@@ -3,8 +3,10 @@
     <v-card-title class="headline">
       Gun pressures
       <v-spacer></v-spacer>
+      <!--
       <v-switch v-model="shotpoint" label="Shotpoint"></v-switch>
       <v-switch class="ml-4" v-model="violinplot" label="Violin plot"></v-switch>
+      -->
     </v-card-title>
 
     <v-container fluid fill-height>
@@ -59,7 +61,7 @@ export default {
       busy: false,
       resizeObserver: null,
       shotpoint: true,
-      violinplot: false
+      violinplot: true
     };
   },

@@ -3,8 +3,10 @@
     <v-card-title class="headline">
       Gun timing
       <v-spacer></v-spacer>
+      <!--
       <v-switch v-model="shotpoint" label="Shotpoint"></v-switch>
       <v-switch class="ml-4" v-model="violinplot" label="Violin plot"></v-switch>
+      -->
     </v-card-title>
 
     <v-container fluid fill-height>
@@ -59,7 +61,7 @@ export default {
       busy: false,
       resizeObserver: null,
       shotpoint: true,
-      violinplot: false
+      violinplot: true
     };
   },

@@ -737,6 +737,13 @@ export default {
       if (event.id) {
         const id = event.id;
         delete event.id;
+
+        // If this is an edit, ensure that it is *either*
+        // a timestamp event or a sequence + point one.
+        if (event.sequence && event.point && event.tstamp) {
+          delete event.tstamp;
+        }
+
         this.putEvent(id, event, callback); // No await
       } else {
         this.postEvent(event, callback); // No await

@@ -225,16 +225,28 @@ app.map({
   'changes/:since': {
     get: [ mw.auth.access.read, mw.event.changes ]
   },
-  // TODO Rename -/:sequence → sequence/:sequence
+  // NOTE: old alias for /sequence/:sequence
   '-/:sequence/': { // NOTE: We need to avoid conflict with the next endpoint ☹
     get: [ mw.auth.access.read, mw.event.sequence.get ],
   },
-  ':id/': {
+  'sequence/:sequence/': {
+    get: [ mw.auth.access.read, mw.event.sequence.get ],
+  },
+  ':id(\\d+)/': {
     get: [ mw.auth.access.read, mw.event.get ],
     put: [ mw.auth.access.write, mw.event.put ],
     patch: [ mw.auth.access.write, mw.event.patch ],
     delete: [mw.auth.access.write, mw.event.delete ]
   },
+  'import': {
+    put: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ],
+    post: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ],
+    '/:filename': {
+      put: [ mw.auth.access.read, mw.event.import.csv, mw.event.import.put ],
+      post: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ],
+      delete: [ mw.auth.access.write, mw.event.import.delete ]
+    },
+  },
 },
 
 /*

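Illustrative sketch (not part of the commit): one way a client might drive the new import endpoints. The mount point of this `app.map` block is not visible in the diff, so the base URL below is an assumption.

// Hypothetical client; `baseUrl` must point at wherever this route map is mounted.
const fs = require('fs');

async function importCsv (baseUrl, filename) {
  // The csv middleware accepts a raw text/csv body when :filename is in the path.
  const res = await fetch(`${baseUrl}/import/${encodeURIComponent(filename)}`, {
    method: 'PUT',
    headers: { 'Content-Type': 'text/csv' },
    body: fs.readFileSync(filename, 'utf8')
  });
  return res.json(); // put.js responds with the parsed event payload
}

async function removeImport (baseUrl, filename) {
  // delete.js hard-deletes every event previously imported from this file.
  await fetch(`${baseUrl}/import/${encodeURIComponent(filename)}`, { method: 'DELETE' });
}
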
@@ -66,8 +66,18 @@ const rels = [
 
 function invalidateCache (data, cache) {
   return new Promise((resolve, reject) => {
+    if (!data) {
+      ERROR("invalidateCache called with no data");
+      return;
+    }
+
+    if (!data.payload) {
+      ERROR("invalidateCache called without a payload; channel = %s", data.channel);
+      return;
+    }
+
     const channel = data.channel;
-    const project = data.payload.pid ?? data.payload?.new?.pid ?? data.payload?.old?.pid;
+    const project = data.payload?.pid ?? data.payload?.new?.pid ?? data.payload?.old?.pid;
     const operation = data.payload.operation;
     const table = data.payload.table;
     const fields = { channel, project, operation, table };

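For context (illustrative only, inferred from the fields read above rather than from a documented schema), the notification payloads this guard protects against look roughly like:

// Hypothetical shape of a NOTIFY message as consumed by invalidateCache.
const example = {
  channel: "event",
  payload: {
    pid: "my-project",        // or nested under payload.new / payload.old
    operation: "INSERT",
    table: "event_log",
    new: { /* row after the change */ },
    old: { /* row before the change */ }
  }
};
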
lib/www/server/api/middleware/event/import/csv.js (new file, 146 lines)
@@ -0,0 +1,146 @@
const Busboy = require('busboy');
const { parse } = require('csv-parse/sync');

async function middleware(req, res, next) {
  const contentType = req.headers['content-type'] || '';
  let csvText = null;
  let filename = null;

  if (req.params.filename && contentType.startsWith('text/csv')) {
    csvText = typeof req.body === 'string' ? req.body : req.body.toString('utf8');
    filename = req.params.filename;
    processCsv();
  } else if (contentType.startsWith('multipart/form-data')) {
    const busboy = Busboy({ headers: req.headers });
    let found = false;
    busboy.on('file', (name, file, info) => {
      if (found) {
        file.resume();
        return;
      }
      if (info.mimeType === 'text/csv') {
        found = true;
        filename = info.filename || 'unnamed.csv';
        csvText = '';
        file.setEncoding('utf8');
        file.on('data', (data) => { csvText += data; });
        file.on('end', () => {});
      } else {
        file.resume();
      }
    });
    busboy.on('field', () => {}); // Ignore fields
    busboy.on('finish', () => {
      if (!found) {
        return next();
      }
      processCsv();
    });
    req.pipe(busboy);
    return;
  } else {
    return next();
  }

  function processCsv() {
    let records;
    try {
      records = parse(csvText, {
        relax_quotes: true,
        quote: '"',
        escape: '"',
        skip_empty_lines: true,
        trim: true
      });
    } catch (e) {
      return res.status(400).json({ error: 'Invalid CSV' });
    }
    if (!records.length) {
      return res.status(400).json({ error: 'Empty CSV' });
    }
    const headers = records[0].map(h => h.toLowerCase().trim());
    const rows = records.slice(1);
    let lastDate = null;
    let lastTime = null;
    const currentDate = new Date().toISOString().slice(0, 10);
    const currentTime = new Date().toISOString().slice(11, 19);
    const events = [];
    for (let row of rows) {
      let object = { labels: [] };
      for (let k = 0; k < headers.length; k++) {
        let key = headers[k];
        let val = row[k] ? row[k].trim() : '';
        if (!key) continue;
        if (['remarks', 'event', 'comment', 'comments', 'text'].includes(key)) {
          object.remarks = val;
        } else if (key === 'label') {
          if (val) object.labels.push(val);
        } else if (key === 'labels') {
          if (val) object.labels.push(...val.split(';').map(l => l.trim()).filter(l => l));
        } else if (key === 'sequence' || key === 'seq') {
          if (val) object.sequence = Number(val);
        } else if (['point', 'shot', 'shotpoint'].includes(key)) {
          if (val) object.point = Number(val);
        } else if (key === 'date') {
          object.date = val;
        } else if (key === 'time') {
          object.time = val;
        } else if (key === 'timestamp') {
          object.timestamp = val;
        } else if (key === 'latitude') {
          object.latitude = parseFloat(val);
        } else if (key === 'longitude') {
          object.longitude = parseFloat(val);
        }
      }
      if (!object.remarks) continue;
      let useSeqPoint = Number.isFinite(object.sequence) && Number.isFinite(object.point);
      let tstamp = null;
      if (!useSeqPoint) {
        if (object.timestamp) {
          tstamp = new Date(object.timestamp);
        }
        if (!tstamp || isNaN(tstamp.getTime())) {
          let dateStr = object.date || lastDate || currentDate;
          let timeStr = object.time || lastTime || currentTime;
          if (timeStr.length === 5) timeStr += ':00';
          let full = `${dateStr}T${timeStr}.000Z`;
          tstamp = new Date(full);
          if (isNaN(tstamp.getTime())) continue;
        }
        if (object.date) lastDate = object.date;
        if (object.time) lastTime = object.time;
      }
      let event = {
        remarks: object.remarks,
        labels: object.labels,
        meta: {
          author: "*CSVImport*",
          "*CSVImport*": {
            filename,
            tstamp: new Date().toISOString()
          }
        }
      };
      if (!isNaN(object.latitude) && !isNaN(object.longitude)) {
        event.meta.geometry = {
          type: "Point",
          coordinates: [object.longitude, object.latitude]
        };
      }
      if (useSeqPoint) {
        event.sequence = object.sequence;
        event.point = object.point;
      } else if (tstamp) {
        event.tstamp = tstamp.toISOString();
      } else {
        continue;
      }
      events.push(event);
    }
    req.body = events;
    next();
  }
}

module.exports = middleware;

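A minimal sketch of the mapping this middleware performs, using a hypothetical two-row input file whose column names follow the header aliases recognised above:

// events.csv (hypothetical input):
//
//   date,time,sequence,point,label,remarks
//   2025-08-14,06:30,,,FGSP,First good shotpoint
//   ,,123,1042,QC,Gun 3 autofire
//
// Row 1 has no sequence/point, so it becomes a timestamp event
// (the 5-character time gets ":00" appended):
//   { remarks: "First good shotpoint", labels: ["FGSP"],
//     tstamp: "2025-08-14T06:30:00.000Z", meta: { author: "*CSVImport*", ... } }
//
// Row 2 has both sequence and point, so no tstamp is set:
//   { remarks: "Gun 3 autofire", labels: ["QC"],
//     sequence: 123, point: 1042, meta: { author: "*CSVImport*", ... } }
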
lib/www/server/api/middleware/event/import/delete.js (new file, 18 lines)
@@ -0,0 +1,18 @@

const { event } = require('../../../../lib/db');

module.exports = async function (req, res, next) {

  try {
    if (req.params.project && req.params.filename) {
      await event.unimport(req.params.project, req.params.filename, req.query);
      res.status(204).end();
    } else {
      res.status(400).send({message: "Malformed request"});
    }
    next();
  } catch (err) {
    next(err);
  }

};

lib/www/server/api/middleware/event/import/index.js (new file, 6 lines)
@@ -0,0 +1,6 @@

module.exports = {
  csv: require('./csv'),
  put: require('./put'),
  delete: require('./delete'),
}

lib/www/server/api/middleware/event/import/put.js (new file, 16 lines)
@@ -0,0 +1,16 @@

const { event } = require('../../../../lib/db');

module.exports = async function (req, res, next) {

  try {
    const payload = req.body;

    await event.import(req.params.project, payload, req.query);
    res.status(200).send(payload);
    next();
  } catch (err) {
    next(err);
  }

};

@@ -7,5 +7,6 @@ module.exports = {
   put: require('./put'),
   patch: require('./patch'),
   delete: require('./delete'),
-  changes: require('./changes')
+  changes: require('./changes'),
+  import: require('./import'),
 }

@@ -1,4 +1,3 @@
-const project = require('../../lib/db/project');
 const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
 
 class DetectProjectConfigurationChange {
@@ -10,7 +9,7 @@ class DetectProjectConfigurationChange {
 
     // Grab project configurations.
     // NOTE that this will run asynchronously
-    this.run({channel: "project"}, ctx);
+    //this.run({channel: "project"}, ctx);
   }
 
   async run (data, ctx) {
@@ -28,13 +27,13 @@ class DetectProjectConfigurationChange {
     try {
       DEBUG("Project configuration change detected")
 
-      const projects = await project.get();
-      project.organisations.setCache(projects);
+      const projects = await ctx.db.project.get();
+      ctx.db.project.organisations.setCache(projects);
 
       const _ctx_data = {};
       for (let pid of projects.map(i => i.pid)) {
         DEBUG("Retrieving configuration for", pid);
-        const cfg = await project.configuration.get(pid);
+        const cfg = await ctx.db.project.configuration.get(pid);
         if (cfg?.archived === true) {
           DEBUG(pid, "is archived. Ignoring");
           continue;

@@ -1,5 +1,3 @@
-const { schema2pid } = require('../../lib/db/connection');
-const { event } = require('../../lib/db');
 const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
 
 class DetectSoftStart {
@@ -33,14 +31,19 @@ class DetectSoftStart {
     const prev = this.prev?.payload?.new?.meta;
     // DEBUG("%j", prev);
     // DEBUG("%j", cur);
-    DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ntest passed: %j", cur.num_guns, cur.num_active, prev.num_active, cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns);
+    if (cur.lineStatus == "online" || prev.lineStatus == "online") {
+      DEBUG("lineStatus is online, assuming not in a soft start situation");
+      return;
+    }
+
+    DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ncur.num_nofire: %d\nprev.num_nofire: %d", cur.num_guns, cur.num_active, prev.num_active, cur.num_nofire, prev.num_nofire);
+
 
     if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
       INFO("Soft start detected @", cur.tstamp);
 
       // FIXME Shouldn't need to use schema2pid as pid already present in payload.
-      const projectId = await schema2pid(cur._schema ?? prev._schema);
+      const projectId = await ctx.schema2pid(cur._schema ?? prev._schema);
 
       // TODO: Try and grab the corresponding comment from the configuration?
       const payload = {
@@ -50,12 +53,16 @@ class DetectSoftStart {
         meta: {auto: true, author: `*${this.constructor.name}*`}
       };
       DEBUG("Posting event", projectId, payload);
-      await event.post(projectId, payload);
+      if (ctx.dryRun) {
+        DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
+      } else {
+        await ctx.db.event.post(projectId, payload);
+      }
 
-    } else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) {
+    } else if ((cur.num_active == cur.num_guns || (prev.num_nofire > 0 && cur.num_nofire == 0)) && prev.num_active < cur.num_active) {
       INFO("Full volume detected @", cur.tstamp);
 
-      const projectId = await schema2pid(cur._schema ?? prev._schema);
+      const projectId = await ctx.schema2pid(cur._schema ?? prev._schema);
 
       // TODO: Try and grab the corresponding comment from the configuration?
       const payload = {
@@ -65,7 +72,11 @@ class DetectSoftStart {
         meta: {auto: true, author: `*${this.constructor.name}*`}
       };
       DEBUG("Posting event", projectId, payload);
-      await event.post(projectId, payload);
+      if (ctx.dryRun) {
+        DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
+      } else {
+        await ctx.db.event.post(projectId, payload);
+      }
     }
 
   } catch (err) {

@@ -1,5 +1,3 @@
-const { schema2pid } = require('../../lib/db/connection');
-const { event } = require('../../lib/db');
 const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
 
 class DetectSOLEOL {
@@ -43,7 +41,7 @@ class DetectSOLEOL {
 
       // We must use schema2pid because the pid may not have been
       // populated for this event.
-      const projectId = await schema2pid(cur._schema ?? prev._schema);
+      const projectId = await ctx.schema2pid(cur._schema ?? prev._schema);
       const labels = ["FSP", "FGSP"];
       const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
       const payload = {
@@ -55,24 +53,32 @@ class DetectSOLEOL {
         meta: {auto: true, author: `*${this.constructor.name}*`}
       }
       INFO("Posting event", projectId, payload);
-      await event.post(projectId, payload);
+      if (ctx.dryRun) {
+        DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
+      } else {
+        await ctx.db.event.post(projectId, payload);
+      }
     } else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
                prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) {
       INFO("Transition to OFFLINE detected");
 
-      const projectId = await schema2pid(prev._schema ?? cur._schema);
+      const projectId = await ctx.schema2pid(prev._schema ?? cur._schema);
       const labels = ["LSP", "LGSP"];
-      const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
+      const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`;
       const payload = {
         type: "sequence",
         sequence,
-        point: cur._point,
+        point: prev._point,
         remarks,
         labels,
         meta: {auto: true, author: `*${this.constructor.name}*`}
       }
       INFO("Posting event", projectId, payload);
-      await event.post(projectId, payload);
+      if (ctx.dryRun) {
+        DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
+      } else {
+        await ctx.db.event.post(projectId, payload);
+      }
     }
 
   } catch (err) {

@@ -8,37 +8,6 @@ const Handlers = [
   require('./detect-fdsp')
 ];
 
-function init (ctx) {
-
-  const instances = Handlers.map(Handler => new Handler(ctx));
-
-  function prepare (data, ctx) {
-    const promises = [];
-    for (let instance of instances) {
-      const promise = new Promise(async (resolve, reject) => {
-        try {
-          DEBUG("Run", instance.author);
-          const result = await instance.run(data, ctx);
-          DEBUG("%s result: %O", instance.author, result);
-          resolve(result);
-        } catch (err) {
-          ERROR("%s error:\n%O", instance.author, err);
-          reject(err);
-        }
-      });
-      promises.push(promise);
-    }
-    return promises;
-  }
-
-  function despatch (data, ctx) {
-    return Promise.allSettled(prepare(data, ctx));
-  }
-
-  return { instances, prepare, despatch };
-}
-
 module.exports = {
   Handlers,
-  init
 };

@@ -1,6 +1,3 @@
-const { event, project } = require('../../lib/db');
-const { withinValidity } = require('../../lib/utils/ranges');
-const unique = require('../../lib/utils/unique');
 const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
 
 class ReportLineChangeTime {
@@ -44,7 +41,7 @@ class ReportLineChangeTime {
 
     async function getLineChangeTime (data, forward = false) {
       if (forward) {
-        const ospEvents = await event.list(projectId, {label: "FGSP"});
+        const ospEvents = await ctx.db.event.list(projectId, {label: "FGSP"});
         // DEBUG("ospEvents", ospEvents);
         const osp = ospEvents.filter(i => i.tstamp > data.tstamp).pop();
         DEBUG("fsp", osp);
@@ -55,7 +52,7 @@ class ReportLineChangeTime {
           return { lineChangeTime: osp.tstamp - data.tstamp, osp };
         }
       } else {
-        const ospEvents = await event.list(projectId, {label: "LGSP"});
+        const ospEvents = await ctx.db.event.list(projectId, {label: "LGSP"});
         // DEBUG("ospEvents", ospEvents);
         const osp = ospEvents.filter(i => i.tstamp < data.tstamp).shift();
         DEBUG("lsp", osp);
@@ -96,16 +93,20 @@ class ReportLineChangeTime {
       const opts = {jpq};
 
       if (Array.isArray(seq)) {
-        opts.sequences = unique(seq).filter(i => !!i);
+        opts.sequences = ctx.unique(seq).filter(i => !!i);
       } else {
         opts.sequence = seq;
       }
 
-      const staleEvents = await event.list(projectId, opts);
+      const staleEvents = await ctx.db.event.list(projectId, opts);
       DEBUG(staleEvents.length ?? 0, "events to delete");
       for (let staleEvent of staleEvents) {
         DEBUG(`Deleting event id ${staleEvent.id} (seq = ${staleEvent.sequence}, point = ${staleEvent.point})`);
-        await event.del(projectId, staleEvent.id);
+        if (ctx.dryRun) {
+          DEBUG(`await ctx.db.event.del(${projectId}, ${staleEvent.id});`);
+        } else {
+          await ctx.db.event.del(projectId, staleEvent.id);
+        }
       }
     }
   }
@@ -180,7 +181,11 @@ class ReportLineChangeTime {
 
     const maybePostEvent = async (projectId, payload) => {
       DEBUG("Posting event", projectId, payload);
-      await event.post(projectId, payload);
+      if (ctx.dryRun) {
+        DEBUG(`await ctx.db.event.post(${projectId}, ${payload});`);
+      } else {
+        await ctx.db.event.post(projectId, payload);
+      }
     }
 
 
@@ -192,7 +197,7 @@ class ReportLineChangeTime {
       const data = n;
       DEBUG("INSERT seen: will add lct events related to ", data.id);
 
-      if (withinValidity(data.validity)) {
+      if (ctx.withinValidity(data.validity)) {
         DEBUG("Event within validity period", data.validity, new Date());
 
         data.tstamp = new Date(data.tstamp);

@@ -1,29 +1,101 @@
+const nodeAsync = require('async'); // npm install async
 const { listen } = require('../lib/db/notify');
+const db = require('../lib/db'); // Adjust paths; include all needed DB utils
+const { schema2pid } = require('../lib/db/connection');
+const unique = require('../lib/utils/unique'); // If needed by handlers
+const withinValidity = require('../lib/utils/ranges').withinValidity; // If needed
+const { ALERT, ERROR, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
+
+// List of handler classes (add more as needed)
+const handlerClasses = require('./handlers').Handlers;
+
+// Channels to listen to (hardcoded for simplicity; could scan handlers for mentions)
 const channels = require('../lib/db/channels');
-const handlers = require('./handlers');
-const { ActionsQueue } = require('../lib/queue');
-const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
 
-function start () {
+// Queue config: Process one at a time for order; max retries=3
+const eventQueue = nodeAsync.queue(async (task, callback) => {
+  const { data, ctx } = task;
+  DEBUG(`Processing event on channel ${data.channel} with timestamp ${data._received ?? 'unknown'}`);
 
-  const queue = new ActionsQueue();
-  const ctx = {}; // Context object
+  for (const handler of ctx.handlers) {
+    try {
+      await handler.run(data, ctx);
+    } catch (err) {
+      ERROR(`Error in handler ${handler.constructor.name}:`, err);
+      // Retry logic: Could add task.retries++, re-enqueue if < max
+    }
+  }
 
-  const { prepare, despatch } = handlers.init(ctx);
+  if (typeof callback === 'function') {
+    // async v3.2.6+ does not use callsbacks with AsyncFunctions, but anyway
+    callback();
+  }
+}, 1); // Concurrency=1 for strict order
 
-  listen(channels, function (data) {
-    DEBUG("Incoming data", data);
+eventQueue.error((err, task) => {
+  ALERT(`Queue error processing task:`, err, task);
+});
 
-    // We don't bother awaiting
-    queue.enqueue(() => despatch(data, ctx));
-    DEBUG("Queue size", queue.length());
+// Main setup function (call from server init)
+async function setupEventHandlers(projectsConfig) {
+  // Shared context
+  const ctx = {
+    dryRun: Boolean(process.env.DOUGAL_HANDLERS_DRY_RUN) ?? false, // If true, don't commit changes
+    projects: { configuration: projectsConfig }, // From user config
+    handlers: handlerClasses.map(Cls => new Cls()), // Instances
+    // DB utils (add more as needed)
+    db,
+    schema2pid,
+    unique,
+    withinValidity
+    // Add other utils, e.g., ctx.logger = DEBUG;
+  };
+
+  // Optional: Replay recent events on startup to rebuild state
+  // await replayRecentEvents(ctx);
+
+  // Setup listener
+  const subscriber = await listen(channels, (rawData) => {
+    const data = {
+      ...rawData,
+      enqueuedAt: new Date() // For monitoring
+    };
+    eventQueue.push({ data, ctx });
   });
 
-  INFO("Events manager started");
+  DEBUG('Event handler system initialized with channels:', channels);
+  if (ctx.dryRun) {
+    DEBUG('DRY RUNNING');
+  }
+
+  // Return for cleanup if needed
+  return {
+    close: () => {
+      subscriber.events.removeAllListeners();
+      subscriber.close();
+      eventQueue.kill();
+    }
+  };
 }
 
-module.exports = { start }
+// Optional: Replay last N events to rebuild handler state (e.g., this.prev)
+// async function replayRecentEvents(ctx) {
+//   try {
+//     // Example: Fetch last 10 realtime events, sorted by tstamp
+//     const recentRealtime = await event.listAllProjects({ channel: 'realtime', limit: 10, sort: 'tstamp DESC' });
+//     // Assume event.listAllProjects is a custom DB method; implement if needed
+//
+//     // Enqueue in original order (reverse sort)
+//     recentRealtime.reverse().forEach((evt) => {
+//       const data = { channel: 'realtime', payload: { new: evt } };
+//       eventQueue.push({ data, ctx });
+//     });
+//
+//     // Similarly for 'event' channel if needed
+//     DEBUG('Replayed recent events for state rebuild');
+//   } catch (err) {
+//     ERROR('Error replaying events:', err);
+//   }
+// }
 
-if (require.main === module) {
-  start();
-}
+module.exports = { setupEventHandlers };

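A minimal usage sketch (illustrative only; the real wiring appears in the server hunk below). Setting DOUGAL_HANDLERS_DRY_RUN makes every handler log its intended ctx.db.event.post/del calls instead of executing them:

// Illustrative smoke test; an empty project-configuration map is an assumption.
process.env.DOUGAL_HANDLERS_DRY_RUN = "1"; // optional: log writes instead of committing

const { setupEventHandlers } = require('./events');

(async () => {
  const system = await setupEventHandlers({});
  // ... later, during shutdown:
  system.close(); // detaches the LISTEN subscriber and kills the queue
})();
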
@@ -2,18 +2,37 @@
 
 const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
 
+async function getProjectConfigurations (opts = {}) {
+  const includeArchived = {includeArchived: false, ...opts};
+  let projectConfigurations = {};
+  try {
+    const db = require('./lib/db');
+    const pids = (await db.project.get())
+      .filter(i => includeArchived || !i.archived)
+      .map(i => i.pid);
+    for (const pid of pids) {
+      DEBUG(`Reading project configuration for ${pid}`);
+      const cfg = await db.project.configuration.get(pid);
+      projectConfigurations[pid] = cfg;
+    }
+  } catch (err) {
+    ERROR("Failed to get project configurations");
+    ERROR(err);
+  }
+  return projectConfigurations;
+}
+
 async function main () {
   // Check that we're running against the correct database version
   const version = require('./lib/version');
   INFO("Running version", await version.describe());
   version.compatible()
-    .then( (versions) => {
+    .then( async (versions) => {
       try {
         const api = require('./api');
         const ws = require('./ws');
         const periodicTasks = require('./periodic-tasks').init();
 
-        const { fork } = require('child_process');
+        const { setupEventHandlers } = require('./events');
 
         const port = process.env.HTTP_PORT || 3000;
         const host = process.env.HTTP_HOST || "127.0.0.1";
@@ -25,33 +44,31 @@ async function main () {
 
         periodicTasks.start();
 
-        const eventManagerPath = [__dirname, "events"].join("/");
-        const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/);
+        const projectConfigurations = await getProjectConfigurations();
+        const handlerSystem = await setupEventHandlers(projectConfigurations);
 
         process.on("SIGINT", async () => {
           DEBUG("Interrupted (SIGINT)");
-          eventManager.kill()
+          handlerSystem.close();
           await periodicTasks.cleanup();
           process.exit(0);
         })
 
         process.on("SIGHUP", async () => {
           DEBUG("Stopping (SIGHUP)");
-          eventManager.kill()
+          handlerSystem.close();
           await periodicTasks.cleanup();
           process.exit(0);
         })
 
         process.on('beforeExit', async () => {
           DEBUG("Preparing to exit");
-          eventManager.kill()
+          handlerSystem.close();
           await periodicTasks.cleanup();
         });
 
         process.on('exit', async () => {
           DEBUG("Exiting");
           // eventManager.kill()
           // periodicTasks.cleanup();
         });
       } catch (err) {
         ERROR(err);

lib/www/server/lib/db/event/import.js (new file, 105 lines)
@@ -0,0 +1,105 @@
const { DEBUG, ERROR } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction } = require('../connection');


/** Remove a previous import from the database.
 *
 * ATTENTION!
 *
 * This will not just mark the events as deleted but actually
 * remove them.
 */
async function bulk_unimport (projectId, filename, opts = {}) {

  const client = opts.client ?? await setSurvey(projectId);
  try {

    const text = `
      DELETE
      FROM event_log
      WHERE meta ? 'author'
        AND meta->(meta->>'author')->>'filename' = $1;
    `;
    const values = [ filename ];

    DEBUG("Removing all event data imported from filename '%s'", filename);
    await client.query(text, values);
  } catch (err) {
    err.origin = __filename;
    throw err;
  } finally {
    if (client !== opts.client) client.release();
  }

  return;
}


async function bulk_import (projectId, payload, opts = {}) {

  const client = opts.client ?? await setSurvey(projectId);
  try {

    if (!payload.length) {
      DEBUG("Called with no rows to be imported. Returning");
      return [];
    }

    const filename = payload[0].meta[payload[0].meta.author].filename;

    // Delete previous data from this file
    await transaction.begin(client);
    await bulk_unimport(projectId, filename, {client});


    // Prepare arrays for each column
    const tstamps = [];
    const sequences = [];
    const points = [];
    const remarks = [];
    const labels = [];
    const metas = [];

    for (const event of payload) {
      tstamps.push(event.tstamp ? new Date(event.tstamp) : null);
      sequences.push(Number.isInteger(event.sequence) ? event.sequence : null);
      points.push(Number.isInteger(event.point) ? event.point : null);
      remarks.push(event.remarks || '');
      labels.push(Array.isArray(event.labels) && event.labels.length
        ? `{${event.labels.map(l => `"${l.replace(/"/g, '""')}"`).join(',')}}`
        : '{}'
      );
      metas.push(event.meta ? JSON.stringify(event.meta) : '{}');
    }


    const text = `
      INSERT INTO event_log (tstamp, sequence, point, remarks, labels, meta)
      SELECT
        UNNEST($1::TIMESTAMP[]) AS tstamp,
        UNNEST($2::INTEGER[]) AS sequence,
        UNNEST($3::INTEGER[]) AS point,
        replace_placeholders(UNNEST($4::TEXT[]), UNNEST($1::TIMESTAMP[]), UNNEST($2::INTEGER[]), UNNEST($3::INTEGER[])) AS remarks,
        UNNEST($5::TEXT[])::TEXT[] AS labels,
        UNNEST($6::JSONB[]) AS meta
      RETURNING id;
    `;
    const values = [ tstamps, sequences, points, remarks, labels, metas ];

    DEBUG("Importing %d rows from filename '%s'", payload.length, filename);
    const res = await client.query(text, values);

    transaction.commit(client);

    return res.rows.map(row => row.id);
  } catch (err) {
    err.origin = __filename;
    throw err;
  } finally {
    if (client !== opts.client) client.release();
  }

  return;
}

module.exports = { import: bulk_import, unimport: bulk_unimport };

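A usage sketch of the bulk importer (illustrative only; event.import / event.unimport are the names this module is exported under in the following hunk, and the project id and require path are assumptions):

const { event } = require('../..'); // i.e. lib/www/server/lib/db

(async () => {
  const payload = [{
    remarks: "First good shotpoint",
    labels: ["FGSP"],
    tstamp: "2025-08-14T06:30:00.000Z",
    // meta[meta.author].filename ties rows to an import: re-importing the
    // same filename first deletes the earlier rows in one transaction.
    meta: { author: "*CSVImport*", "*CSVImport*": { filename: "events.csv", tstamp: new Date().toISOString() } }
  }];

  const ids = await event.import("my-project", payload); // resolves to inserted row ids
  await event.unimport("my-project", "events.csv");      // hard-delete, not soft-delete
})();
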
@@ -6,5 +6,7 @@ module.exports = {
   put: require('./put'),
   patch: require('./patch'),
   del: require('./delete'),
-  changes: require('./changes')
+  changes: require('./changes'),
+  import: require('./import').import,
+  unimport: require('./import').unimport,
 }

lib/www/server/lib/db/event/unimport.js (new file, 37 lines)
@@ -0,0 +1,37 @@

const { DEBUG, ERROR } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction } = require('../connection');

/** Remove a previous import from the database.
 *
 * ATTENTION!
 *
 * This will not just mark the events as deleted but actually
 * remove them.
 */
async function unimport (projectId, filename, opts = {}) {

  const client = await setSurvey(projectId);
  try {

    const text = `
      DELETE
      FROM event_log
      WHERE meta ? 'author'
        AND meta->(meta->'author')->>'filename' = $1;
    `;
    const values = [ filename ];

    DEBUG("Removing all event data imported from filename '%s'", filename);
    await client.query(text, values);
  } catch (err) {
    err.origin = __filename;
    throw err;
  } finally {
    client.release();
  }

  return;
}

module.exports = post;

@@ -1,52 +0,0 @@
const Queue = require('./queue');

// Inspired by:
// https://stackoverflow.com/questions/53540348/js-async-await-tasks-queue#53540586

class ActionsQueue extends Queue {

  constructor (items = []) {
    super(items);

    this.pending = false;
  }

  enqueue (action) {
    return new Promise ((resolve, reject) => {
      super.enqueue({ action, resolve, reject });
      this.dequeue();
    });
  }

  async dequeue () {

    if (this.pending) {
      return false;
    }

    const item = super.dequeue();

    if (!item) {
      return false;
    }

    try {

      this.pending = true;

      const result = await item.action(this);

      this.pending = false;
      item.resolve(result);
    } catch (err) {
      this.pending = false;
      item.reject(err);
    } finally {
      this.dequeue();
    }

  }

}

module.exports = ActionsQueue;

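The deleted ActionsQueue serialised promise-returning actions one at a time; the rewritten events.js obtains the same ordering guarantee from async.queue with concurrency 1. A minimal equivalence sketch (illustrative only):

const nodeAsync = require('async');

// One worker means tasks run strictly in push order, as ActionsQueue.dequeue did.
const queue = nodeAsync.queue(async (task) => task(), 1);

queue.error((err) => console.error('task failed:', err));
queue.push(async () => console.log('first'));
queue.push(async () => console.log('second')); // always runs after 'first'
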
@@ -1,6 +0,0 @@

module.exports = {
  Queue: require('./queue'),
  ActionsQueue: require('./actions-queue')
};

@@ -1,22 +0,0 @@

class Queue {

  constructor (items = []) {
    this.items = items;
  }

  enqueue (item) {
    this.items.push(item);
  }

  dequeue () {
    return this.items.shift();
  }

  length () {
    return this.items.length;
  }

}

module.exports = Queue;

@@ -29,7 +29,9 @@
     "@dougal/binary": "file:../../modules/@dougal/binary",
     "@dougal/organisations": "file:../../modules/@dougal/organisations",
     "@dougal/user": "file:../../modules/@dougal/user",
+    "async": "^3.2.6",
     "body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser",
+    "busboy": "^1.6.0",
     "compression": "^1.8.1",
     "cookie-parser": "^1.4.5",
     "csv": "^6.3.3",

package-lock.json (generated, 26 lines changed)
@@ -9359,7 +9359,9 @@
         "@dougal/binary": "file:../../modules/@dougal/binary",
         "@dougal/organisations": "file:../../modules/@dougal/organisations",
         "@dougal/user": "file:../../modules/@dougal/user",
+        "async": "^3.2.6",
         "body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser",
+        "busboy": "^1.6.0",
         "compression": "^1.8.1",
         "cookie-parser": "^1.4.5",
         "csv": "^6.3.3",

@@ -14170,6 +14172,11 @@
         "node": ">=0.8"
       }
     },
+    "node_modules/async": {
+      "version": "3.2.6",
+      "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz",
+      "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA=="
+    },
     "node_modules/asynckit": {
       "version": "0.4.0",
       "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",

@@ -14274,6 +14281,17 @@
         "node": ">=0.10.0"
       }
     },
+    "node_modules/busboy": {
+      "version": "1.6.0",
+      "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz",
+      "integrity": "sha512-8SFQbg/0hQ9xy3UNTB0YEnsNBbWfhf7RtnzpL7TkBiTBRfrQ9Fxcnz7VJsleJpyp6rVLvXiuORqjlHi5q+PYuA==",
+      "dependencies": {
+        "streamsearch": "^1.1.0"
+      },
+      "engines": {
+        "node": ">=10.16.0"
+      }
+    },
     "node_modules/bytes": {
       "version": "3.1.2",
       "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz",

@@ -16504,6 +16522,14 @@
         "node": ">= 0.8"
       }
     },
+    "node_modules/streamsearch": {
+      "version": "1.1.0",
+      "resolved": "https://registry.npmjs.org/streamsearch/-/streamsearch-1.1.0.tgz",
+      "integrity": "sha512-Mcc5wHehp9aXz1ax6bZUyY5afg9u2rv5cqQI3mRrYkGC8rW2hM02jWuwjtL++LS5qinSyhj2QfLyNsuc+VsExg==",
+      "engines": {
+        "node": ">=10.0.0"
+      }
+    },
     "node_modules/string_decoder": {
       "version": "1.3.0",
       "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",