Compare commits

...

28 Commits

Author SHA1 Message Date
D. Berge
12a762f44f Fix typo in @dougal/binary 2025-08-16 14:55:53 +02:00
D. Berge
ebf13abc28 Merge branch '337-fix-event-queue' into 'devel'
Resolve "Automatic event detection fault: soft start on every shot during line"

Closes #337

See merge request wgp/dougal/software!61
2025-08-16 12:55:15 +00:00
D. Berge
b3552db02f Add error checking to ETag logic 2025-08-16 11:36:43 +02:00
D. Berge
cd882c0611 Add debug info to soft start detection 2025-08-16 11:36:43 +02:00
D. Berge
6fc9c020a4 Fix off-by-one error in LGSP detection 2025-08-16 11:36:43 +02:00
D. Berge
75284322f1 Modify full volume detection on Smartsource
The Smartsource firmware seems to have changed, rendering the old
test invalid.
2025-08-16 11:36:43 +02:00
D. Berge
e849c47f01 Remove old queue implementation 2025-08-16 11:36:43 +02:00
D. Berge
387d20a4f0 Rewrite automatic event handling system 2025-08-16 11:36:43 +02:00
D. Berge
2fab06d340 Don't send timestamp when patching seq+point events.
Closes #339.
2025-08-16 11:35:35 +02:00
D. Berge
7d2fb5558a Hide switches to enable additional graphs.
All violin plots as well as position scatter plots and histograms
are shown by default. This is due to #338.

For some reason, having them enabled from the get-go does not
cause any problems.
2025-08-15 18:09:51 +02:00
D. Berge
764e2cfb23 Rename endpoint 2025-08-14 13:34:36 +02:00
D. Berge
bf1af1f76c Make it explicit that :id is numeric 2025-08-14 13:34:27 +02:00
D. Berge
09e4cd2467 Add CSV event import.
Closes #336
2025-08-14 13:33:30 +02:00
D. Berge
2009d73a2b Fix action registration and unregistration 2025-08-13 17:03:00 +02:00
D. Berge
083ee812de Use cookies for authentication as a last resort.
Fixes #335
2025-08-13 16:54:38 +02:00
D. Berge
84510e8dc9 Add proper logging 2025-08-13 15:42:49 +02:00
D. Berge
7205ec42a8 Fix handler registration.
The way it was being done meant that unregisterHandlers would not
have worked.
2025-08-13 15:42:49 +02:00
D. Berge
73d85ef81f Fix scheduling of token refresh via websocket 2025-08-13 12:58:36 +02:00
D. Berge
6c4dc35461 Fix bad status on preplot lines tab
If there were no raw / final sequences on a line, planned sequences
would not show either.
2025-08-13 12:45:50 +02:00
D. Berge
a5ebff077d Fix authentication middleware erroring on IPv6 2025-08-13 11:50:20 +02:00
D. Berge
2a894692ce Throttle snack notifications 2025-08-12 00:22:09 +02:00
D. Berge
25690eeb52 Fix showSnack in main.js 2025-08-11 23:48:08 +02:00
D. Berge
3f9776b61d Let the user know when we're getting gateway errors 2025-08-11 23:47:25 +02:00
D. Berge
8c81daefc0 Move the two /configuration endpoints next to each other 2025-08-11 22:20:46 +02:00
D. Berge
c173610e87 Simplify middleware 2025-08-11 22:19:51 +02:00
D. Berge
301e5c0731 Set headers only on 304 2025-08-11 22:06:51 +02:00
D. Berge
48d9f45fe0 Clean up debug messages 2025-08-11 22:06:20 +02:00
D. Berge
cd23a78592 Merge branch '190-refactor-map' into 'devel'
Resolve "Refactor map"

Closes #190, #322, #323, #324, #325, #326, and #321

See merge request wgp/dougal/software!25
2025-08-11 13:01:00 +00:00
43 changed files with 802 additions and 321 deletions

View File

@@ -693,7 +693,7 @@ class DougalBinaryChunkSequential extends ArrayBuffer {
getRecord (index) {
if (index < 0 || index >= this.jCount) throw new Error(`Invalid record index: ${index}`);
const arr = [thid.udv, this.i, this.j0 + index * this.Δj];
const arr = [this.udv, this.i, this.j0 + index * this.Δj];
for (let m = 0; m < this.ΔelemCount; m++) {
const values = this.Δelem(m);

View File

@@ -92,18 +92,12 @@ export default {
this.$store.dispatch('registerHandler', {
table: '.jwt',
handler: (context, message) => {
this.handleJWT(context, message);
}
handler: this.handleJWT
});
this.$store.dispatch('registerHandler', {
table: 'project',
handler: (context, message) => {
this.handleProject(context, message);
}
handler: this.handleProject
});
},

View File

@@ -10,7 +10,10 @@
<v-spacer></v-spacer>
<template v-if="isFrontendRemote">
<v-icon v-if="serverConnected" class="mr-6" title="Connected to server via gateway">mdi-cloud-outline</v-icon>
<template v-if="serverConnected">
<v-icon v-if="isGatewayReliable" class="mr-6" title="Connected to server via gateway">mdi-cloud-outline</v-icon>
<v-icon v-else class="mr-6" color="orange" title="Gateway connection is unreliable. Expect outages.">mdi-cloud-off</v-icon>
</template>
<v-icon v-else class="mr-6" color="red" :title="`Server connection lost: the gateway cannot reach the remote server.\nWe will reconnect automatically when the link with the remote server is restored.`">mdi-cloud-off</v-icon>
</template>
<template v-else>
@@ -57,6 +60,13 @@ export default {
DougalNotificationsControl
},
data () {
return {
lastGatewayErrorTimestamp: 0,
gatewayErrorSilencePeriod: 60000,
}
},
computed: {
year () {
const date = new Date();
@@ -65,8 +75,24 @@ export default {
...mapState({
serverConnected: state => state.notify.serverConnected,
isFrontendRemote: state => state.api.serverInfo?.["remote-frontend"] ?? false
isFrontendRemote: state => state.api.serverInfo?.["remote-frontend"] ?? false,
isGatewayReliable: state => state.api.isGatewayReliable
})
},
watch: {
isGatewayReliable (val) {
if (val === false) {
const elapsed = Date.now() - this.lastGatewayErrorTimestamp;
this.lastGatewayErrorTimestamp = Date.now();
if (elapsed > this.gatewayErrorSilencePeriod) {
this.$root.showSnack("Gateway error", "warning");
}
}
}
}
};
</script>

View File

@@ -3,8 +3,10 @@
<v-card-title class="headline">
Array inline / crossline error
<v-spacer></v-spacer>
<!--
<v-switch v-model="scatterplot" label="Scatterplot"></v-switch>
<v-switch class="ml-4" v-model="histogram" label="Histogram"></v-switch>
-->
</v-card-title>
<v-container fluid fill-height>
@@ -57,8 +59,8 @@ export default {
graph: [],
busy: false,
resizeObserver: null,
scatterplot: false,
histogram: false
scatterplot: true,
histogram: true
};
},

View File

@@ -3,8 +3,10 @@
<v-card-title class="headline">
Gun depth
<v-spacer></v-spacer>
<!--
<v-switch v-model="shotpoint" label="Shotpoint"></v-switch>
<v-switch class="ml-4" v-model="violinplot" label="Violin plot"></v-switch>
-->
</v-card-title>
<v-container fluid fill-height>
@@ -59,7 +61,7 @@ export default {
busy: false,
resizeObserver: null,
shotpoint: true,
violinplot: false
violinplot: true
};
},

View File

@@ -3,8 +3,10 @@
<v-card-title class="headline">
Gun pressures
<v-spacer></v-spacer>
<!--
<v-switch v-model="shotpoint" label="Shotpoint"></v-switch>
<v-switch class="ml-4" v-model="violinplot" label="Violin plot"></v-switch>
-->
</v-card-title>
<v-container fluid fill-height>
@@ -59,7 +61,7 @@ export default {
busy: false,
resizeObserver: null,
shotpoint: true,
violinplot: false
violinplot: true
};
},

View File

@@ -3,8 +3,10 @@
<v-card-title class="headline">
Gun timing
<v-spacer></v-spacer>
<!--
<v-switch v-model="shotpoint" label="Shotpoint"></v-switch>
<v-switch class="ml-4" v-model="violinplot" label="Violin plot"></v-switch>
-->
</v-card-title>
<v-container fluid fill-height>
@@ -59,7 +61,7 @@ export default {
busy: false,
resizeObserver: null,
shotpoint: true,
violinplot: false
violinplot: true
};
},

View File

@@ -1,8 +1,5 @@
<template>
<div class="line-status" v-if="sequences.length == 0">
<slot name="empty"></slot>
</div>
<div class="line-status" v-else-if="sequenceHref || plannedSequenceHref || pendingReshootHref">
<div class="line-status" v-if="sequenceHref || plannedSequenceHref || pendingReshootHref">
<router-link v-for="sequence in sequences" :key="sequence.sequence" v-if="sequenceHref"
class="sequence"
:class="sequence.status"
@@ -26,7 +23,7 @@
>
</router-link>
</div>
<div class="line-status" v-else>
<div class="line-status" v-else-if="sequences.length || plannedSequences.length || Object.keys(pendingReshoots).length">
<div v-for="sequence in sequences" :key="sequence.sequence"
class="sequence"
:class="sequence.status"
@@ -47,6 +44,9 @@
>
</div>
</div>
<div class="line-status" v-else>
<slot name="empty"></slot>
</div>
</template>
<style lang="stylus" scoped>

View File

@@ -62,9 +62,7 @@ new Vue({
showSnack(text, colour = "primary") {
console.log("showSnack", text, colour);
this.snackColour = colour;
this.snackText = text;
this.snack = true;
this.$store.dispatch("showSnack", [text, colour]);
},
sendJwt () {

View File

@@ -71,7 +71,7 @@ async function api ({state, getters, commit, dispatch}, [resource, init = {}, cb
res = await limiter.enqueue(async () => await fetch(url, init));
}
if (cache && !isCached) {
if (cache && !isCached && res.ok) { // Only cache successful responses
cache.put(url, res.clone());
}
@@ -95,6 +95,12 @@ async function api ({state, getters, commit, dispatch}, [resource, init = {}, cb
return [key, value];
});
state.serverInfo = entries.length ? Object.fromEntries(entries) : {};
if (state.serverInfo["remote-frontend"]) {
state.isGatewayReliable = ![ 502, 503, 504 ].includes(res.status);
} else {
state.isGatewayReliable = null;
}
}
if (res.ok) {

View File

@@ -2,7 +2,8 @@ const state = () => ({
apiUrl: "/api",
requestsCount: 0,
maxConcurrent: 15,
serverInfo: {} // Contents of the last received X-Dougal-Server HTTP header
serverInfo: {}, // Contents of the last received X-Dougal-Server HTTP header
isGatewayReliable: null, // False once we start seeing HTTP 502/503/504 responses
});
export default state;

View File

@@ -80,4 +80,4 @@ function processServerEvent({ commit, dispatch, state, rootState }, message) {
state.debouncedRunners[table](message);
}
export default { registerHandler, processServerEvent };
export default { registerHandler, unregisterHandler, processServerEvent };

View File

@@ -30,4 +30,10 @@ function UNREGISTER_HANDLER(state, { table, handler }) {
}
export default { setServerEvent, clearServerEvent, setServerConnectionState, REGISTER_HANDLER };
export default {
setServerEvent,
clearServerEvent,
setServerConnectionState,
REGISTER_HANDLER,
UNREGISTER_HANDLER
};

View File

@@ -29,21 +29,6 @@ async function logout ({ commit, dispatch }) {
commit('setPreferences', {});
}
function setCookie(context, {name, value, expiry, path}) {
if (!path) path = "/";
if (!value) value = "";
if (name) {
if (expiry) {
document.cookie = `${name}=${value}; expiry=${(new Date(expiry)).toUTCString()}; path=${path}`;
} else {
document.cookie = `${name}=${value}; path=${path}`;
}
} else {
console.warn(`seCookie: You must supply a name`);
}
}
function setCredentials({ state, commit, getters, dispatch, rootState }, { force, token, response } = {}) {
try {
let tokenValue = token;
@@ -61,6 +46,7 @@ function setCredentials({ state, commit, getters, dispatch, rootState }, { force
const decoded = jwt_decode(tokenValue);
commit('setToken', tokenValue);
commit('setUser', decoded ? new User(decoded, rootState.api.api) : null);
commit('setCookie', {name: "JWT", value: tokenValue, expires: (decoded.exp??0)*1000});
console.log('Credentials refreshed at', new Date().toISOString());
} else {
@@ -71,6 +57,7 @@ function setCredentials({ state, commit, getters, dispatch, rootState }, { force
if (err.name === 'InvalidTokenError') {
commit('setToken', null);
commit('setUser', null);
commit('clearCookie', "JWT")
}
}
dispatch('loadUserPreferences');
@@ -105,7 +92,6 @@ async function loadUserPreferences({ state, commit }) {
export default {
login,
logout,
setCookie,
setCredentials,
saveUserPreference,
loadUserPreferences

View File

@@ -16,4 +16,18 @@ function setPreferences (state, preferences) {
state.preferences = preferences;
}
export default { setToken, setUser, setPreferences };
function setCookie (state, opts = {}) {
const name = opts.name ?? "JWT";
const value = opts.value ?? "";
const expires = opts.expires ? (new Date(opts.expires)) : (new Date(0));
const path = opts.path ?? "/";
const sameSite = opts.sameSite ?? "Lax";
document.cookie = `${name}=${value};path=${path};SameSite=${sameSite};expires=${expires.toUTCString()}`;
}
function clearCookie (state, name) {
setCookie(state, {name});
}
export default { setToken, setUser, setPreferences, setCookie, clearCookie };

View File

@@ -737,6 +737,13 @@ export default {
if (event.id) {
const id = event.id;
delete event.id;
// If this is an edit, ensure that it is *either*
// a timestamp event or a sequence + point one.
if (event.sequence && event.point && event.tstamp) {
delete event.tstamp;
}
this.putEvent(id, event, callback); // No await
} else {
this.postEvent(event, callback); // No await

View File

@@ -121,10 +121,12 @@ app.map({
get: [ mw.auth.access.read, mw.project.summary.get ],
},
'/project/:project/configuration': {
get: [ mw.project.configuration.get ], // Get project configuration
patch: [ mw.auth.access.edit, mw.project.configuration.patch ], // Modify project configuration
put: [ mw.auth.access.edit, mw.project.configuration.put ], // Overwrite configuration
},
'/project/:project/configuration/:path(*)?': {
get: [ mw.auth.access.read, mw.configuration.get ],
},
/*
* GIS endpoints
@@ -223,16 +225,28 @@ app.map({
'changes/:since': {
get: [ mw.auth.access.read, mw.event.changes ]
},
// TODO Rename -/:sequence → sequence/:sequence
// NOTE: old alias for /sequence/:sequence
'-/:sequence/': { // NOTE: We need to avoid conflict with the next endpoint ☹
get: [ mw.auth.access.read, mw.event.sequence.get ],
},
':id/': {
'sequence/:sequence/': {
get: [ mw.auth.access.read, mw.event.sequence.get ],
},
':id(\\d+)/': {
get: [ mw.auth.access.read, mw.event.get ],
put: [ mw.auth.access.write, mw.event.put ],
patch: [ mw.auth.access.write, mw.event.patch ],
delete: [mw.auth.access.write, mw.event.delete ]
},
'import': {
put: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ],
post: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ],
'/:filename': {
put: [ mw.auth.access.read, mw.event.import.csv, mw.event.import.put ],
post: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ],
delete: [ mw.auth.access.write, mw.event.import.delete ]
},
},
},
/*
@@ -272,10 +286,6 @@ app.map({
'/project/:project/label/': {
get: [ mw.auth.access.read, mw.label.list ],
// post: [ mw.label.post ],
},
'/project/:project/configuration/:path(*)?': {
get: [ mw.auth.access.read, mw.configuration.get ],
// post: [ mw.auth.access.admin, mw.label.post ],
},
'/project/:project/info/:path(*)': {
get: [ mw.auth.operations, mw.auth.access.read, mw.info.get ],
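A note on the `:id(\d+)` change above: Express route parameters accept an inline regular expression, and without the `(\d+)` constraint a bare `:id/` segment would also match literal siblings such as `import/`, shadowing them depending on registration order. A minimal illustration in plain Express (simplified routes, not the app.map wrapper the server uses):

const express = require('express');
const app = express();

// Unconstrained, ':id' would also capture GET /event/import and shadow
// the literal route below; the (\d+) pattern restricts it to numeric ids.
app.get('/event/:id(\\d+)', (req, res) => res.json({ id: Number(req.params.id) }));

// Reachable, because "import" is not all digits.
app.post('/event/import', (req, res) => res.sendStatus(202));

app.listen(3000);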

View File

@@ -1,6 +1,7 @@
const { projectOrganisations, vesselOrganisations/*, orgAccess */} = require('../../../lib/db/project/organisations');
const ServerUser = require('../../../lib/db/user/User');
const { Organisations } = require('@dougal/organisations');
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
/** Second-order function.
* Returns a middleware that checks if the user has access to
@@ -14,11 +15,7 @@ function operation (operation) {
if (req.params.project) {
const projectOrgs = new Organisations(await projectOrganisations(req.params.project));
const availableOrgs = projectOrgs.accessToOperation(operation).filter(user.organisations);
console.log("Operation: ", operation);
console.log("User: ", user.name);
console.log("User orgs: ", user.organisations);
console.log("Project orgs: ", projectOrgs);
console.log("Available orgs: ", availableOrgs);
DEBUG(`operation = ${operation}, user = ${user?.name}, user orgs = %j, project orgs = %j, availableOrgs = %j`, user.organisations.toJSON(), projectOrgs.toJSON(), availableOrgs.toJSON());
if (availableOrgs.length > 0) {
next();
return;
@@ -26,16 +23,13 @@ function operation (operation) {
} else {
const vesselOrgs = new Organisations(await vesselOrganisations());
const availableOrgs = vesselOrgs.accessToOperation(operation).filter(user.organisations);
console.log("Operation: ", operation);
console.log("User: ", user.name);
console.log("User orgs: ", user.organisations);
console.log("Vessel orgs: ", vesselOrgs);
console.log("Available orgs: ", availableOrgs);
DEBUG(`operation = ${operation}, user = ${user?.name}, user orgs = %j, vessel orgs = %j, availableOrgs = %j`, user.organisations.toJSON(), vesselOrgs.toJSON(), availableOrgs.toJSON());
if (availableOrgs.length > 0) {
next();
return;
}
}
DEBUG(`Access denied to operation ${operation}.`);
next({status: 403, message: "Access denied"});
}
}

View File

@@ -1,41 +1,123 @@
const dns = require('dns');
const { Netmask } = require('netmask');
const ipaddr = require('ipaddr.js');
const { isIPv6, isIPv4 } = require('net');
const cfg = require('../../../lib/config');
const jwt = require('../../../lib/jwt');
const user = require('../../../lib/db/user');
const ServerUser = require('../../../lib/db/user/User');
const { ERROR, WARNING, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
function parseIP(ip) {
if (!ip || typeof ip !== 'string') {
WARNING('Invalid IP input:', ip);
return null;
}
// Handle comma-separated X-Forwarded-For (e.g., "87.90.254.127,")
const cleanIp = ip.split(',')[0].trim();
if (!cleanIp) {
WARNING('Empty IP after parsing:', ip);
return null;
}
// Convert IPv6-mapped IPv4 (e.g., ::ffff:127.0.0.1 -> 127.0.0.1)
if (cleanIp.startsWith('::ffff:') && isIPv4(cleanIp.split('::ffff:')[1])) {
return cleanIp.split('::ffff:')[1];
}
return cleanIp;
}
function normalizeCIDR(range) {
if (!range || typeof range !== 'string') {
WARNING('Invalid CIDR range:', range);
return null;
}
// If no /prefix, assume /32 for IPv4 or /128 for IPv6
if (!range.includes('/')) {
try {
const parsed = ipaddr.parse(range);
const prefix = parsed.kind() === 'ipv4' ? 32 : 128;
return `${range}/${prefix}`;
} catch (err) {
WARNING(`Failed to parse bare IP ${range}:`, err.message);
return null;
}
}
return range;
}
async function authorisedIP(req, res) {
const validIPs = await user.ip({active: true}); // Get all active IP logins
validIPs.forEach( i => i.$block = new Netmask(i.ip) );
validIPs.sort( (a, b) => b.$block.bitmask - a.$block.netmask ); // More specific IPs have precedence
for (const ip of validIPs) {
const block = ip.$block;
if (block.contains(req.ip)) {
const ip = parseIP(req.ip || req.headers['x-forwarded-for'] || req.headers['x-real-ip']);
DEBUG('authorisedIP:', { ip, headers: req.headers }); // Debug
if (!ip) {
WARNING('No valid IP provided:', { ip, headers: req.headers });
return false;
}
let addr;
try {
addr = ipaddr.parse(ip);
} catch (err) {
WARNING('Invalid IP:', ip, err.message);
return false;
}
const validIPs = await user.ip({ active: true }); // Get active IP logins
// Attach parsed CIDR to each IP entry
validIPs.forEach(i => {
const normalized = normalizeCIDR(i.ip);
if (!normalized) {
i.$range = null;
return;
}
try {
const [rangeAddr, prefix] = ipaddr.parseCIDR(normalized);
i.$range = { addr: rangeAddr, prefix };
} catch (err) {
WARNING(`Invalid CIDR range ${i.ip}:`, err.message);
i.$range = null; // Skip invalid ranges
}
});
// Filter out invalid ranges and sort by specificity (descending prefix length)
const validRanges = validIPs.filter(i => i.$range).sort((a, b) => b.$range.prefix - a.$range.prefix);
for (const ipEntry of validRanges) {
const { addr: rangeAddr, prefix } = ipEntry.$range;
try {
if (addr.match(rangeAddr, prefix)) {
const payload = {
...ip,
ip: req.ip,
...ipEntry,
ip,
autologin: true
};
delete payload.$block;
delete payload.$range;
delete payload.hash;
delete payload.active;
jwt.issue(payload, req, res);
return true;
}
} catch (err) {
WARNING(`Error checking range ${ipEntry.ip}:`, err.message);
continue;
}
}
return false;
}
async function authorisedHost(req, res) {
const validHosts = await user.host({active: true}); // Get all active host logins
const ip = parseIP(req.ip || req.headers['x-forwarded-for'] || req.headers['x-real-ip']);
DEBUG('authorisedHost:', { ip, headers: req.headers }); // Debug
if (!ip) {
WARNING('No valid IP for host check:', { ip, headers: req.headers });
return false;
}
const validHosts = await user.host({ active: true });
for (const key in validHosts) {
try {
const ip = await dns.promises.resolve(key);
if (ip == req.ip) {
const resolvedIPs = await dns.promises.resolve(key);
if (resolvedIPs.includes(ip)) {
const payload = {
...validHosts[key],
ip: req.ip,
ip,
autologin: true
};
delete payload.$block;
@@ -45,49 +127,28 @@ async function authorisedHost (req, res) {
return true;
}
} catch (err) {
if (err.code != "ENODATA") {
console.error(err);
if (err.code !== 'ENODATA') {
ERROR(`DNS error for host ${key}:`, err);
}
}
}
return false;
}
// TODO: Check client TLS certificates
// Probably will do this via Nginx with
// ssl_verify_client optional;
// and then putting either of the
// $ssl_client_s_dn or $ssl_client_escaped_cert
// variables into an HTTP header for Node
// to check (naturally, it must be ensured
// that a user cannot just insert the header
// in a request).
async function auth(req, res, next) {
if (res.headersSent) {
// Nothing to do, this request must have been
// handled already by another middleware.
return;
return; // Handled by another middleware
}
// Check for a valid JWT (already decoded by a previous
// middleware).
// Check for valid JWT
if (req.user) {
if (!req.user.autologin) {
// If this is not an automatic login, check if the token is in the
// second half of its lifetime. If so, reissue a new one, valid for
// another cfg.jwt.options.expiresIn seconds.
if (req.user.exp) {
if (!req.user.autologin && req.user.exp) {
const ttl = req.user.exp - Date.now() / 1000;
if (ttl < cfg.jwt.options.expiresIn / 2) {
const credentials = await ServerUser.fromSQL(null, req.user.id);
if (credentials) {
// Refresh token
payload = Object.assign({}, credentials.toJSON());
jwt.issue(Object.assign({}, credentials.toJSON()), req, res);
}
const payload = Object.assign({}, credentials.toJSON());
jwt.issue(payload, req, res);
}
}
}
@@ -95,19 +156,27 @@ async function auth (req, res, next) {
return;
}
// Check if the IP is known to us
// Check IP and host
if (await authorisedIP(req, res)) {
next();
return;
}
// Check if the hostname is known to us
if (await authorisedHost(req, res)) {
next();
return;
}
next({status: 401, message: "Not authorised"});
// If *all* else fails, check if the user came with a cookie
// (see https://gitlab.com/wgp/dougal/software/-/issues/335)
if (req.cookies.JWT) {
const token = req.cookies.JWT;
delete req.cookies.JWT;
DEBUG("falling back to cookie-based authentication");
req.user = await jwt.checkValidCredentials({jwt: token});
return await auth(req, res, next);
}
next({ status: 401, message: 'Not authorised' });
}
module.exports = auth;
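The TODO above sketches client-TLS checking via Nginx. For illustration only, the Node side might take the same shape as authorisedIP()/authorisedHost(); the header names and the user.cert() lookup are hypothetical, and the scheme is only safe if the proxy sets (and overwrites) these headers on every request, as the TODO warns:

// Hypothetical sketch of the client-certificate check described in the TODO.
// Assumes Nginx has `ssl_verify_client optional;` and always forwards
// $ssl_client_verify and $ssl_client_s_dn in headers of its own, overwriting
// any client-supplied values.
async function authorisedClientCert (req, res) {
  if (req.headers['x-ssl-client-verify'] !== 'SUCCESS') return false;
  const dn = req.headers['x-ssl-client-dn'];
  if (!dn) return false;
  const validCerts = await user.cert({ active: true }); // hypothetical lookup, mirroring user.ip() / user.host()
  const entry = validCerts[dn];
  if (!entry) return false;
  const payload = { ...entry, dn, autologin: true };
  delete payload.hash;
  delete payload.active;
  jwt.issue(payload, req, res);
  return true;
}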

View File

@@ -23,9 +23,9 @@ function ifNoneMatch (req, res, next) {
if (cached) {
DEBUG("ETag match. Returning cached response (ETag: %s, If-None-Match: %s) for %s %s",
cached.etag, req.get("If-None-Match"), req.method, req.url);
setHeaders(res, cached.headers);
if (req.method == "GET" || req.method == "HEAD") {
res.status(304).send();
setHeaders(res, cached.headers);
res.status(304).end();
// No next()
} else if (!isIdempotentMethod(req.method)) {
res.status(412).send();

View File

@@ -66,8 +66,18 @@ const rels = [
function invalidateCache (data, cache) {
return new Promise((resolve, reject) => {
if (!data) {
ERROR("invalidateCache called with no data");
return;
}
if (!data.payload) {
ERROR("invalidateCache called without a payload; channel = %s", data.channel);
return;
}
const channel = data.channel;
const project = data.payload.pid ?? data.payload?.new?.pid ?? data.payload?.old?.pid;
const project = data.payload?.pid ?? data.payload?.new?.pid ?? data.payload?.old?.pid;
const operation = data.payload.operation;
const table = data.payload.table;
const fields = { channel, project, operation, table };

View File

@@ -0,0 +1,146 @@
const Busboy = require('busboy');
const { parse } = require('csv-parse/sync');
async function middleware(req, res, next) {
const contentType = req.headers['content-type'] || '';
let csvText = null;
let filename = null;
if (req.params.filename && contentType.startsWith('text/csv')) {
csvText = typeof req.body === 'string' ? req.body : req.body.toString('utf8');
filename = req.params.filename;
processCsv();
} else if (contentType.startsWith('multipart/form-data')) {
const busboy = Busboy({ headers: req.headers });
let found = false;
busboy.on('file', (name, file, info) => {
if (found) {
file.resume();
return;
}
if (info.mimeType === 'text/csv') {
found = true;
filename = info.filename || 'unnamed.csv';
csvText = '';
file.setEncoding('utf8');
file.on('data', (data) => { csvText += data; });
file.on('end', () => {});
} else {
file.resume();
}
});
busboy.on('field', () => {}); // Ignore fields
busboy.on('finish', () => {
if (!found) {
return next();
}
processCsv();
});
req.pipe(busboy);
return;
} else {
return next();
}
function processCsv() {
let records;
try {
records = parse(csvText, {
relax_quotes: true,
quote: '"',
escape: '"',
skip_empty_lines: true,
trim: true
});
} catch (e) {
return res.status(400).json({ error: 'Invalid CSV' });
}
if (!records.length) {
return res.status(400).json({ error: 'Empty CSV' });
}
const headers = records[0].map(h => h.toLowerCase().trim());
const rows = records.slice(1);
let lastDate = null;
let lastTime = null;
const currentDate = new Date().toISOString().slice(0, 10);
const currentTime = new Date().toISOString().slice(11, 19);
const events = [];
for (let row of rows) {
let object = { labels: [] };
for (let k = 0; k < headers.length; k++) {
let key = headers[k];
let val = row[k] ? row[k].trim() : '';
if (!key) continue;
if (['remarks', 'event', 'comment', 'comments', 'text'].includes(key)) {
object.remarks = val;
} else if (key === 'label') {
if (val) object.labels.push(val);
} else if (key === 'labels') {
if (val) object.labels.push(...val.split(';').map(l => l.trim()).filter(l => l));
} else if (key === 'sequence' || key === 'seq') {
if (val) object.sequence = Number(val);
} else if (['point', 'shot', 'shotpoint'].includes(key)) {
if (val) object.point = Number(val);
} else if (key === 'date') {
object.date = val;
} else if (key === 'time') {
object.time = val;
} else if (key === 'timestamp') {
object.timestamp = val;
} else if (key === 'latitude') {
object.latitude = parseFloat(val);
} else if (key === 'longitude') {
object.longitude = parseFloat(val);
}
}
if (!object.remarks) continue;
let useSeqPoint = Number.isFinite(object.sequence) && Number.isFinite(object.point);
let tstamp = null;
if (!useSeqPoint) {
if (object.timestamp) {
tstamp = new Date(object.timestamp);
}
if (!tstamp || isNaN(tstamp.getTime())) {
let dateStr = object.date || lastDate || currentDate;
let timeStr = object.time || lastTime || currentTime;
if (timeStr.length === 5) timeStr += ':00';
let full = `${dateStr}T${timeStr}.000Z`;
tstamp = new Date(full);
if (isNaN(tstamp.getTime())) continue;
}
if (object.date) lastDate = object.date;
if (object.time) lastTime = object.time;
}
let event = {
remarks: object.remarks,
labels: object.labels,
meta: {
author: "*CSVImport*",
"*CSVImport*": {
filename,
tstamp: new Date().toISOString()
}
}
};
if (!isNaN(object.latitude) && !isNaN(object.longitude)) {
event.meta.geometry = {
type: "Point",
coordinates: [object.longitude, object.latitude]
};
}
if (useSeqPoint) {
event.sequence = object.sequence;
event.point = object.point;
} else if (tstamp) {
event.tstamp = tstamp.toISOString();
} else {
continue;
}
events.push(event);
}
req.body = events;
next();
}
}
module.exports = middleware;
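For reference, a sketch of how the importer above might be driven, based on the /event/import/:filename routes added earlier (the URL prefix and project id are assumptions). Column names follow the header handling in processCsv(): rows with both sequence and point ignore date/time, while other rows fall back to the previous row's date/time or to the current UTC date/time. Re-importing the same filename replaces the events from the previous import of that file.

const csv = [
  'date,time,sequence,point,label,remarks',
  '2025-08-14,10:15,12,1001,FSP,First shotpoint',    // sequence+point given: date/time ignored
  '2025-08-14,12:30,,,,Crew change on back deck',    // no seq/point: timestamped event
].join('\n');

// PUT as text/csv against the per-filename endpoint (inside an async context).
await fetch('/api/project/demo/event/import/obslog.csv', {
  method: 'PUT',
  headers: { 'Content-Type': 'text/csv' },
  body: csv,
});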

View File

@@ -0,0 +1,18 @@
const { event } = require('../../../../lib/db');
module.exports = async function (req, res, next) {
try {
if (req.params.project && req.params.filename) {
await event.unimport(req.params.project, req.params.filename, req.query);
res.status(204).end();
} else {
res.status(400).send({message: "Malformed request"});
}
next();
} catch (err) {
next(err);
}
};

View File

@@ -0,0 +1,6 @@
module.exports = {
csv: require('./csv'),
put: require('./put'),
delete: require('./delete'),
}

View File

@@ -0,0 +1,16 @@
const { event } = require('../../../../lib/db');
module.exports = async function (req, res, next) {
try {
const payload = req.body;
await event.import(req.params.project, payload, req.query);
res.status(200).send(payload);
next();
} catch (err) {
next(err);
}
};

View File

@@ -7,5 +7,6 @@ module.exports = {
put: require('./put'),
patch: require('./patch'),
delete: require('./delete'),
changes: require('./changes')
changes: require('./changes'),
import: require('./import'),
}

View File

@@ -10,7 +10,6 @@ function json (req, res, next) {
} else {
res.status(404).send({message: "Not found"});
}
next();
}
function yaml (req, res, next) {
@@ -19,7 +18,6 @@ function yaml (req, res, next) {
} else {
res.status(404).send({message: "Not found"});
}
next();
}
function csv (req, res, next) {
@@ -33,7 +31,6 @@ function csv (req, res, next) {
} else {
res.status(404).send({message: "Not found"});
}
next();
}
module.exports = async function (req, res, next) {
@@ -53,9 +50,10 @@ module.exports = async function (req, res, next) {
await handlers[mimetype](req, res, next);
} else {
res.status(406).send();
next();
}
next();
} catch (err) {
console.error(err);
next(err);
}
}

View File

@@ -1,4 +1,3 @@
const project = require('../../lib/db/project');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectProjectConfigurationChange {
@@ -10,7 +9,7 @@ class DetectProjectConfigurationChange {
// Grab project configurations.
// NOTE that this will run asynchronously
this.run({channel: "project"}, ctx);
//this.run({channel: "project"}, ctx);
}
async run (data, ctx) {
@@ -28,13 +27,13 @@ class DetectProjectConfigurationChange {
try {
DEBUG("Project configuration change detected")
const projects = await project.get();
project.organisations.setCache(projects);
const projects = await ctx.db.project.get();
ctx.db.project.organisations.setCache(projects);
const _ctx_data = {};
for (let pid of projects.map(i => i.pid)) {
DEBUG("Retrieving configuration for", pid);
const cfg = await project.configuration.get(pid);
const cfg = await ctx.db.project.configuration.get(pid);
if (cfg?.archived === true) {
DEBUG(pid, "is archived. Ignoring");
continue;

View File

@@ -1,5 +1,3 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSoftStart {
@@ -33,14 +31,19 @@ class DetectSoftStart {
const prev = this.prev?.payload?.new?.meta;
// DEBUG("%j", prev);
// DEBUG("%j", cur);
DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ntest passed: %j", cur.num_guns, cur.num_active, prev.num_active, cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns);
if (cur.lineStatus == "online" || prev.lineStatus == "online") {
DEBUG("lineStatus is online, assuming not in a soft start situation");
return;
}
DEBUG("cur.num_guns: %d\ncur.num_active: %d\nprv.num_active: %d\ncur.num_nofire: %d\nprev.num_nofire: %d", cur.num_guns, cur.num_active, prev.num_active, cur.num_nofire, prev.num_nofire);
if (cur.num_active >= 1 && !prev.num_active && cur.num_active < cur.num_guns) {
INFO("Soft start detected @", cur.tstamp);
// FIXME Shouldn't need to use schema2pid as pid already present in payload.
const projectId = await schema2pid(cur._schema ?? prev._schema);
const projectId = await ctx.schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
@@ -50,12 +53,16 @@ class DetectSoftStart {
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
if (ctx.dryRun) {
DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
} else {
await ctx.db.event.post(projectId, payload);
}
} else if (cur.num_active == cur.num_guns && prev.num_active < cur.num_active) {
} else if ((cur.num_active == cur.num_guns || (prev.num_nofire > 0 && cur.num_nofire == 0)) && prev.num_active < cur.num_active) {
INFO("Full volume detected @", cur.tstamp);
const projectId = await schema2pid(cur._schema ?? prev._schema);
const projectId = await ctx.schema2pid(cur._schema ?? prev._schema);
// TODO: Try and grab the corresponding comment from the configuration?
const payload = {
@@ -65,7 +72,11 @@ class DetectSoftStart {
meta: {auto: true, author: `*${this.constructor.name}*`}
};
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
if (ctx.dryRun) {
DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
} else {
await ctx.db.event.post(projectId, payload);
}
}
} catch (err) {

View File

@@ -1,5 +1,3 @@
const { schema2pid } = require('../../lib/db/connection');
const { event } = require('../../lib/db');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class DetectSOLEOL {
@@ -43,7 +41,7 @@ class DetectSOLEOL {
// We must use schema2pid because the pid may not have been
// populated for this event.
const projectId = await schema2pid(cur._schema ?? prev._schema);
const projectId = await ctx.schema2pid(cur._schema ?? prev._schema);
const labels = ["FSP", "FGSP"];
const remarks = `SEQ ${cur._sequence}, SOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const payload = {
@@ -55,24 +53,32 @@ class DetectSOLEOL {
meta: {auto: true, author: `*${this.constructor.name}*`}
}
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
if (ctx.dryRun) {
DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
} else {
await ctx.db.event.post(projectId, payload);
}
} else if (prev.lineName == cur.lineName && prev._sequence == cur._sequence &&
prev.lineStatus == "online" && cur.lineStatus != "online" && sequence) {
INFO("Transition to OFFLINE detected");
const projectId = await schema2pid(prev._schema ?? cur._schema);
const projectId = await ctx.schema2pid(prev._schema ?? cur._schema);
const labels = ["LSP", "LGSP"];
const remarks = `SEQ ${cur._sequence}, EOL ${cur.lineName}, BSP: ${(cur.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(cur.waterDepth).toFixed(0)} m.`;
const remarks = `SEQ ${prev._sequence}, EOL ${prev.lineName}, BSP: ${(prev.speed*3.6/1.852).toFixed(1)} kt, Water depth: ${Number(prev.waterDepth).toFixed(0)} m.`;
const payload = {
type: "sequence",
sequence,
point: cur._point,
point: prev._point,
remarks,
labels,
meta: {auto: true, author: `*${this.constructor.name}*`}
}
INFO("Posting event", projectId, payload);
await event.post(projectId, payload);
if (ctx.dryRun) {
DEBUG(`DRY RUN: await ctx.db.event.post(${projectId}, ${payload});`);
} else {
await ctx.db.event.post(projectId, payload);
}
}
} catch (err) {

View File

@@ -8,37 +8,6 @@ const Handlers = [
require('./detect-fdsp')
];
function init (ctx) {
const instances = Handlers.map(Handler => new Handler(ctx));
function prepare (data, ctx) {
const promises = [];
for (let instance of instances) {
const promise = new Promise(async (resolve, reject) => {
try {
DEBUG("Run", instance.author);
const result = await instance.run(data, ctx);
DEBUG("%s result: %O", instance.author, result);
resolve(result);
} catch (err) {
ERROR("%s error:\n%O", instance.author, err);
reject(err);
}
});
promises.push(promise);
}
return promises;
}
function despatch (data, ctx) {
return Promise.allSettled(prepare(data, ctx));
}
return { instances, prepare, despatch };
}
module.exports = {
Handlers,
init
};

View File

@@ -1,6 +1,3 @@
const { event, project } = require('../../lib/db');
const { withinValidity } = require('../../lib/utils/ranges');
const unique = require('../../lib/utils/unique');
const { ALERT, ERROR, WARNING, NOTICE, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
class ReportLineChangeTime {
@@ -44,7 +41,7 @@ class ReportLineChangeTime {
async function getLineChangeTime (data, forward = false) {
if (forward) {
const ospEvents = await event.list(projectId, {label: "FGSP"});
const ospEvents = await ctx.db.event.list(projectId, {label: "FGSP"});
// DEBUG("ospEvents", ospEvents);
const osp = ospEvents.filter(i => i.tstamp > data.tstamp).pop();
DEBUG("fsp", osp);
@@ -55,7 +52,7 @@ class ReportLineChangeTime {
return { lineChangeTime: osp.tstamp - data.tstamp, osp };
}
} else {
const ospEvents = await event.list(projectId, {label: "LGSP"});
const ospEvents = await ctx.db.event.list(projectId, {label: "LGSP"});
// DEBUG("ospEvents", ospEvents);
const osp = ospEvents.filter(i => i.tstamp < data.tstamp).shift();
DEBUG("lsp", osp);
@@ -96,16 +93,20 @@ class ReportLineChangeTime {
const opts = {jpq};
if (Array.isArray(seq)) {
opts.sequences = unique(seq).filter(i => !!i);
opts.sequences = ctx.unique(seq).filter(i => !!i);
} else {
opts.sequence = seq;
}
const staleEvents = await event.list(projectId, opts);
const staleEvents = await ctx.db.event.list(projectId, opts);
DEBUG(staleEvents.length ?? 0, "events to delete");
for (let staleEvent of staleEvents) {
DEBUG(`Deleting event id ${staleEvent.id} (seq = ${staleEvent.sequence}, point = ${staleEvent.point})`);
await event.del(projectId, staleEvent.id);
if (ctx.dryRun) {
DEBUG(`await ctx.db.event.del(${projectId}, ${staleEvent.id});`);
} else {
await ctx.db.event.del(projectId, staleEvent.id);
}
}
}
}
@@ -180,7 +181,11 @@ class ReportLineChangeTime {
const maybePostEvent = async (projectId, payload) => {
DEBUG("Posting event", projectId, payload);
await event.post(projectId, payload);
if (ctx.dryRun) {
DEBUG(`await ctx.db.event.post(${projectId}, ${payload});`);
} else {
await ctx.db.event.post(projectId, payload);
}
}
@@ -192,7 +197,7 @@ class ReportLineChangeTime {
const data = n;
DEBUG("INSERT seen: will add lct events related to ", data.id);
if (withinValidity(data.validity)) {
if (ctx.withinValidity(data.validity)) {
DEBUG("Event within validity period", data.validity, new Date());
data.tstamp = new Date(data.tstamp);

View File

@@ -1,29 +1,101 @@
const nodeAsync = require('async'); // npm install async
const { listen } = require('../lib/db/notify');
const db = require('../lib/db'); // Adjust paths; include all needed DB utils
const { schema2pid } = require('../lib/db/connection');
const unique = require('../lib/utils/unique'); // If needed by handlers
const withinValidity = require('../lib/utils/ranges').withinValidity; // If needed
const { ALERT, ERROR, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
// List of handler classes (add more as needed)
const handlerClasses = require('./handlers').Handlers;
// Channels to listen to (hardcoded for simplicity; could scan handlers for mentions)
const channels = require('../lib/db/channels');
const handlers = require('./handlers');
const { ActionsQueue } = require('../lib/queue');
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
function start () {
// Queue config: process one task at a time to preserve ordering (no built-in retries)
const eventQueue = nodeAsync.queue(async (task, callback) => {
const { data, ctx } = task;
DEBUG(`Processing event on channel ${data.channel} with timestamp ${data._received ?? 'unknown'}`);
const queue = new ActionsQueue();
const ctx = {}; // Context object
for (const handler of ctx.handlers) {
try {
await handler.run(data, ctx);
} catch (err) {
ERROR(`Error in handler ${handler.constructor.name}:`, err);
// Retry logic: Could add task.retries++, re-enqueue if < max
}
}
const { prepare, despatch } = handlers.init(ctx);
if (typeof callback === 'function') {
// async v3.2.6+ does not use callbacks with AsyncFunctions, but anyway
callback();
}
}, 1); // Concurrency=1 for strict order
listen(channels, function (data) {
DEBUG("Incoming data", data);
// We don't bother awaiting
queue.enqueue(() => despatch(data, ctx));
DEBUG("Queue size", queue.length());
eventQueue.error((err, task) => {
ALERT(`Queue error processing task:`, err, task);
});
INFO("Events manager started");
// Main setup function (call from server init)
async function setupEventHandlers(projectsConfig) {
// Shared context
const ctx = {
dryRun: Boolean(process.env.DOUGAL_HANDLERS_DRY_RUN) ?? false, // If true, don't commit changes
projects: { configuration: projectsConfig }, // From user config
handlers: handlerClasses.map(Cls => new Cls()), // Instances
// DB utils (add more as needed)
db,
schema2pid,
unique,
withinValidity
// Add other utils, e.g., ctx.logger = DEBUG;
};
// Optional: Replay recent events on startup to rebuild state
// await replayRecentEvents(ctx);
// Setup listener
const subscriber = await listen(channels, (rawData) => {
const data = {
...rawData,
enqueuedAt: new Date() // For monitoring
};
eventQueue.push({ data, ctx });
});
DEBUG('Event handler system initialized with channels:', channels);
if (ctx.dryRun) {
DEBUG('DRY RUNNING');
}
module.exports = { start }
if (require.main === module) {
start();
// Return for cleanup if needed
return {
close: () => {
subscriber.events.removeAllListeners();
subscriber.close();
eventQueue.kill();
}
};
}
// Optional: Replay last N events to rebuild handler state (e.g., this.prev)
// async function replayRecentEvents(ctx) {
// try {
// // Example: Fetch last 10 realtime events, sorted by tstamp
// const recentRealtime = await event.listAllProjects({ channel: 'realtime', limit: 10, sort: 'tstamp DESC' });
// // Assume event.listAllProjects is a custom DB method; implement if needed
//
// // Enqueue in original order (reverse sort)
// recentRealtime.reverse().forEach((evt) => {
// const data = { channel: 'realtime', payload: { new: evt } };
// eventQueue.push({ data, ctx });
// });
//
// // Similarly for 'event' channel if needed
// DEBUG('Replayed recent events for state rebuild');
// } catch (err) {
// ERROR('Error replaying events:', err);
// }
// }
module.exports = { setupEventHandlers };
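The rewritten system treats handlers as classes with a run(data, ctx) method, instantiated once by setupEventHandlers() and fed every notification through the queue. A minimal, purely illustrative handler in that shape (this class is not part of the codebase; the fields mirror the detectors above):

class LogGunStateChanges {
  async run (data, ctx) {
    if (data.channel !== 'realtime') return;             // only react to realtime rows
    const cur = data.payload?.new?.meta;
    if (!cur) return;
    const projectId = await ctx.schema2pid(cur._schema); // resolve project from schema
    const payload = {
      remarks: `Gun state change seen @ ${cur.tstamp}`,
      meta: { auto: true, author: `*${this.constructor.name}*` }
    };
    if (ctx.dryRun) {
      console.debug('DRY RUN: event.post', projectId, payload);
    } else {
      await ctx.db.event.post(projectId, payload);
    }
  }
}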

View File

@@ -2,18 +2,37 @@
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
async function getProjectConfigurations (opts = {}) {
const { includeArchived } = {includeArchived: false, ...opts};
let projectConfigurations = {};
try {
const db = require('./lib/db');
const pids = (await db.project.get())
.filter(i => includeArchived || !i.archived)
.map(i => i.pid);
for (const pid of pids) {
DEBUG(`Reading project configuration for ${pid}`);
const cfg = await db.project.configuration.get(pid);
projectConfigurations[pid] = cfg;
}
} catch (err) {
ERROR("Failed to get project configurations");
ERROR(err);
}
return projectConfigurations;
}
async function main () {
// Check that we're running against the correct database version
const version = require('./lib/version');
INFO("Running version", await version.describe());
version.compatible()
.then( (versions) => {
.then( async (versions) => {
try {
const api = require('./api');
const ws = require('./ws');
const periodicTasks = require('./periodic-tasks').init();
const { fork } = require('child_process');
const { setupEventHandlers } = require('./events');
const port = process.env.HTTP_PORT || 3000;
const host = process.env.HTTP_HOST || "127.0.0.1";
@@ -25,33 +44,31 @@ async function main () {
periodicTasks.start();
const eventManagerPath = [__dirname, "events"].join("/");
const eventManager = fork(eventManagerPath, /*{ stdio: 'ignore' }*/);
const projectConfigurations = await getProjectConfigurations();
const handlerSystem = await setupEventHandlers(projectConfigurations);
process.on("SIGINT", async () => {
DEBUG("Interrupted (SIGINT)");
eventManager.kill()
handlerSystem.close();
await periodicTasks.cleanup();
process.exit(0);
})
process.on("SIGHUP", async () => {
DEBUG("Stopping (SIGHUP)");
eventManager.kill()
handlerSystem.close();
await periodicTasks.cleanup();
process.exit(0);
})
process.on('beforeExit', async () => {
DEBUG("Preparing to exit");
eventManager.kill()
handlerSystem.close();
await periodicTasks.cleanup();
});
process.on('exit', async () => {
DEBUG("Exiting");
// eventManager.kill()
// periodicTasks.cleanup();
});
} catch (err) {
ERROR(err);

View File

@@ -0,0 +1,105 @@
const { DEBUG, ERROR } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction } = require('../connection');
/** Remove a previous import from the database.
*
* ATTENTION!
*
* This will not just mark the events as deleted but actually
* remove them.
*/
async function bulk_unimport (projectId, filename, opts = {}) {
const client = opts.client ?? await setSurvey(projectId);
try {
const text = `
DELETE
FROM event_log
WHERE meta ? 'author'
AND meta->(meta->>'author')->>'filename' = $1;
`;
const values = [ filename ];
DEBUG("Removing all event data imported from filename '%s'", filename);
await client.query(text, values);
} catch (err) {
err.origin = __filename;
throw err;
} finally {
if (client !== opts.client) client.release();
}
return;
}
async function bulk_import (projectId, payload, opts = {}) {
const client = opts.client ?? await setSurvey(projectId);
try {
if (!payload.length) {
DEBUG("Called with no rows to be imported. Returning");
return [];
}
const filename = payload[0].meta[payload[0].meta.author].filename;
// Delete previous data from this file
await transaction.begin(client);
await bulk_unimport(projectId, filename, {client});
// Prepare arrays for each column
const tstamps = [];
const sequences = [];
const points = [];
const remarks = [];
const labels = [];
const metas = [];
for (const event of payload) {
tstamps.push(event.tstamp ? new Date(event.tstamp) : null);
sequences.push(Number.isInteger(event.sequence) ? event.sequence : null);
points.push(Number.isInteger(event.point) ? event.point : null);
remarks.push(event.remarks || '');
labels.push(Array.isArray(event.labels) && event.labels.length
? `{${event.labels.map(l => `"${l.replace(/(["\\])/g, '\\$1')}"`).join(',')}}`
: '{}'
);
metas.push(event.meta ? JSON.stringify(event.meta) : '{}');
}
const text = `
INSERT INTO event_log (tstamp, sequence, point, remarks, labels, meta)
SELECT
UNNEST($1::TIMESTAMP[]) AS tstamp,
UNNEST($2::INTEGER[]) AS sequence,
UNNEST($3::INTEGER[]) AS point,
replace_placeholders(UNNEST($4::TEXT[]), UNNEST($1::TIMESTAMP[]), UNNEST($2::INTEGER[]), UNNEST($3::INTEGER[])) AS remarks,
UNNEST($5::TEXT[])::TEXT[] AS labels,
UNNEST($6::JSONB[]) AS meta
RETURNING id;
`;
const values = [ tstamps, sequences, points, remarks, labels, metas ];
DEBUG("Importing %d rows from filename '%s'", payload.length, filename);
const res = await client.query(text, values);
await transaction.commit(client);
return res.rows.map(row => row.id);
} catch (err) {
err.origin = __filename;
throw err;
} finally {
if (client !== opts.client) client.release();
}
return;
}
module.exports = { import: bulk_import, unimport: bulk_unimport };

View File

@@ -6,5 +6,7 @@ module.exports = {
put: require('./put'),
patch: require('./patch'),
del: require('./delete'),
changes: require('./changes')
changes: require('./changes'),
import: require('./import').import,
unimport: require('./import').unimport,
}

View File

@@ -0,0 +1,37 @@
const { DEBUG, ERROR } = require('DOUGAL_ROOT/debug')(__filename);
const { setSurvey, transaction } = require('../connection');
/** Remove a previous import from the database.
*
* ATTENTION!
*
* This will not just mark the events as deleted but actually
* remove them.
*/
async function unimport (projectId, filename, opts = {}) {
const client = await setSurvey(projectId);
try {
const text = `
DELETE
FROM event_log
WHERE meta ? 'author'
AND meta->(meta->>'author')->>'filename' = $1;
`;
const values = [ filename ];
DEBUG("Removing all event data imported from filename '%s'", filename);
await client.query(text, values);
} catch (err) {
err.origin = __filename;
throw err;
} finally {
client.release();
}
return;
}
module.exports = unimport;

View File

@@ -1,52 +0,0 @@
const Queue = require('./queue');
// Inspired by:
// https://stackoverflow.com/questions/53540348/js-async-await-tasks-queue#53540586
class ActionsQueue extends Queue {
constructor (items = []) {
super(items);
this.pending = false;
}
enqueue (action) {
return new Promise ((resolve, reject) => {
super.enqueue({ action, resolve, reject });
this.dequeue();
});
}
async dequeue () {
if (this.pending) {
return false;
}
const item = super.dequeue();
if (!item) {
return false;
}
try {
this.pending = true;
const result = await item.action(this);
this.pending = false;
item.resolve(result);
} catch (err) {
this.pending = false;
item.reject(err);
} finally {
this.dequeue();
}
}
}
module.exports = ActionsQueue;

View File

@@ -1,6 +0,0 @@
module.exports = {
Queue: require('./queue'),
ActionsQueue: require('./actions-queue')
};

View File

@@ -1,22 +0,0 @@
class Queue {
constructor (items = []) {
this.items = items;
}
enqueue (item) {
this.items.push(item);
}
dequeue () {
return this.items.shift();
}
length () {
return this.items.length;
}
}
module.exports = Queue;

View File

@@ -29,7 +29,9 @@
"@dougal/binary": "file:../../modules/@dougal/binary",
"@dougal/organisations": "file:../../modules/@dougal/organisations",
"@dougal/user": "file:../../modules/@dougal/user",
"async": "^3.2.6",
"body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser",
"busboy": "^1.6.0",
"compression": "^1.8.1",
"cookie-parser": "^1.4.5",
"csv": "^6.3.3",
@@ -37,11 +39,11 @@
"debug": "^4.3.4",
"express": "^4.17.1",
"express-jwt": "^8.4.1",
"ipaddr.js": "^1.9.1",
"json2csv": "^5.0.6",
"jsonwebtoken": "^9.0.2",
"leaflet-headless": "git+https://git@gitlab.com/aaltronav/contrib/leaflet-headless.git#devel",
"marked": "^4.0.12",
"netmask": "^2.0.2",
"node-fetch": "^2.6.1",
"nunjucks": "^3.2.3",
"path-to-regexp": "^6.2.1",

View File

@@ -20,8 +20,10 @@ function start (server, pingInterval=30000) {
const exp = decoded?.exp;
if (exp) {
const timeout = (exp*1000 - Date.now()) / 2;
if (!socket._jwtRefresh) {
socket._jwtRefresh = setTimeout(() => refreshJwt(token), timeout);
console.log(`Scheduled JWT refresh in ${timeout/1000} seconds at time ${(new Date(Date.now() + timeout)).toISOString()}`);
}
} else {
console.log("Token has no exp claim. Refresh not scheduled");
}
@@ -76,8 +78,8 @@ function start (server, pingInterval=30000) {
});
socket.on('close', () => {
if (socket._jwtTimeout) {
clearTimeout(socket._jwtTimeout);
if (socket._jwtRefresh) {
clearTimeout(socket._jwtRefresh);
}
});
});

package-lock.json (generated)
View File

@@ -5366,14 +5366,6 @@
"node": ">= 0.10"
}
},
"lib/www/client/source/node_modules/ipaddr.js": {
"version": "2.1.0",
"dev": true,
"license": "MIT",
"engines": {
"node": ">= 10"
}
},
"lib/www/client/source/node_modules/is-arrayish": {
"version": "0.2.1",
"dev": true,
@@ -9367,7 +9359,9 @@
"@dougal/binary": "file:../../modules/@dougal/binary",
"@dougal/organisations": "file:../../modules/@dougal/organisations",
"@dougal/user": "file:../../modules/@dougal/user",
"async": "^3.2.6",
"body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser",
"busboy": "^1.6.0",
"compression": "^1.8.1",
"cookie-parser": "^1.4.5",
"csv": "^6.3.3",
@@ -9375,11 +9369,11 @@
"debug": "^4.3.4",
"express": "^4.17.1",
"express-jwt": "^8.4.1",
"ipaddr.js": "^1.9.1",
"json2csv": "^5.0.6",
"jsonwebtoken": "^9.0.2",
"leaflet-headless": "git+https://git@gitlab.com/aaltronav/contrib/leaflet-headless.git#devel",
"marked": "^4.0.12",
"netmask": "^2.0.2",
"node-fetch": "^2.6.1",
"nunjucks": "^3.2.3",
"path-to-regexp": "^6.2.1",
@@ -10180,13 +10174,6 @@
"node": ">= 0.6"
}
},
"lib/www/server/node_modules/netmask": {
"version": "2.0.2",
"license": "MIT",
"engines": {
"node": ">= 0.4.0"
}
},
"lib/www/server/node_modules/nunjucks": {
"version": "3.2.4",
"license": "BSD-2-Clause",
@@ -14185,6 +14172,11 @@
"node": ">=0.8"
}
},
"node_modules/async": {
"version": "3.2.6",
"resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz",
"integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA=="
},
"node_modules/asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
@@ -14289,6 +14281,17 @@
"node": ">=0.10.0"
}
},
"node_modules/busboy": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz",
"integrity": "sha512-8SFQbg/0hQ9xy3UNTB0YEnsNBbWfhf7RtnzpL7TkBiTBRfrQ9Fxcnz7VJsleJpyp6rVLvXiuORqjlHi5q+PYuA==",
"dependencies": {
"streamsearch": "^1.1.0"
},
"engines": {
"node": ">=10.16.0"
}
},
"node_modules/bytes": {
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz",
@@ -15585,6 +15588,15 @@
"node": ">=12"
}
},
"node_modules/ipaddr.js": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz",
"integrity": "sha512-Ag3wB2o37wslZS19hZqorUnrnzSkpOVy+IiiDEiTqNubEYpYuHWIf6K4psgN2ZWKExS4xhVCrRVfb/wfW8fWJA==",
"dev": true,
"engines": {
"node": ">= 10"
}
},
"node_modules/is-buffer": {
"version": "1.1.6",
"resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-1.1.6.tgz",
@@ -16510,6 +16522,14 @@
"node": ">= 0.8"
}
},
"node_modules/streamsearch": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/streamsearch/-/streamsearch-1.1.0.tgz",
"integrity": "sha512-Mcc5wHehp9aXz1ax6bZUyY5afg9u2rv5cqQI3mRrYkGC8rW2hM02jWuwjtL++LS5qinSyhj2QfLyNsuc+VsExg==",
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/string_decoder": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",