Merge branch '18-implement-server-push' into 'devel'

Resolve "Implement server push"

Closes #18

See merge request wgp/dougal/software!1
This commit is contained in:
D. Berge
2020-09-06 12:30:33 +00:00
17 changed files with 339 additions and 55 deletions

View File

@@ -144,6 +144,39 @@ CREATE EXTENSION IF NOT EXISTS postgis_topology WITH SCHEMA topology;
COMMENT ON EXTENSION postgis_topology IS 'PostGIS topology spatial types and functions';
--
-- Name: notify(); Type: FUNCTION; Schema: public; Owner: postgres
--
CREATE FUNCTION public.notify() RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
channel text := TG_ARGV[0];
payload text;
BEGIN
payload := json_build_object(
'tstamp', CURRENT_TIMESTAMP,
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'old', row_to_json(OLD),
'new', row_to_json(NEW)
)::text;
IF octet_length(payload) < 8000 THEN
PERFORM pg_notify(channel, payload);
ELSE
-- We need to find another solution
RAISE INFO 'Payload over limit';
END IF;
RETURN NULL;
END;
$$;
ALTER FUNCTION public.notify() OWNER TO postgres;
--
-- Name: set_survey(text); Type: PROCEDURE; Schema: public; Owner: postgres
--
@@ -168,7 +201,8 @@ SET default_table_access_method = heap;
CREATE TABLE public.projects (
pid text NOT NULL,
name text NOT NULL,
schema text NOT NULL
schema text NOT NULL,
meta jsonb DEFAULT '{}'::jsonb NOT NULL
);
@@ -218,6 +252,20 @@ ALTER TABLE ONLY public.projects
CREATE INDEX tstamp_idx ON public.real_time_inputs USING btree (tstamp DESC);
--
-- Name: projects projects_tg; Type: TRIGGER; Schema: public; Owner: postgres
--
CREATE TRIGGER projects_tg AFTER INSERT OR DELETE OR UPDATE ON public.projects FOR EACH ROW EXECUTE FUNCTION public.notify('project');
--
-- Name: real_time_inputs real_time_inputs_tg; Type: TRIGGER; Schema: public; Owner: postgres
--
CREATE TRIGGER real_time_inputs_tg AFTER INSERT ON public.real_time_inputs FOR EACH ROW EXECUTE FUNCTION public.notify('realtime');
--
-- PostgreSQL database dump complete
--

View File

@@ -180,7 +180,8 @@ export default {
sequences: { type: Object, default: null },
defaultTimestamp: { type: [ Date, String, Number, Function ], default: null },
defaultSequence: { type: Number, default: null },
defaultShotNumber: { type: Number, default: null },
defaultShotpoint: { type: Number, default: null },
eventMode: { type: String, default: "timed" },
presetRemarks: { type: [ Object, Array ], default: null },
presetLabels: { type: [ Object, Array ], default: null }
},
@@ -210,18 +211,20 @@ export default {
computed: {
eventType () {
return this.timeInput
? "timed"
: this.shotInput
? "seq"
: this.eventMode;
},
formTitle () {
if (this.timeInput) {
return "New event at time";
} else if (this.shotInput) {
return "New event at shotpoint";
} else if (this.defaultTimestamp) {
return "New event at " +
this.defaultTimestampAsDate.toISOString().replace(/(.{10})T(.{8}).{4}Z$/, "$1 $2");
} else if (this.defaultShotNumber) {
return "New event on shotpoint " + this.defaultShotNumber;
if (this.eventType == "seq") {
return `New event at shotpoint ${this.shot.point}`;
} else {
return "New event at time "+this.tstamp.toISOString().replace(/(.{10})T(.{8}).{4}Z$/, "$1 $2");
}
return "New event";
},
defaultTimestampAsDate () {
@@ -239,13 +242,13 @@ export default {
tstamp () {
return this.timeInput
? new Date(this.tsDate+"T"+this.tsTime+"Z")
: this.defaultTimestampAsDate;
: this.defaultTimestampAsDate || new Date();
},
shot () {
return this.shotInput
? { sequence: this.sequence, point: Number(this.point) }
: { sequence: this.defaultSequence, point: this.defaultShotNumber };
: { sequence: this.defaultSequence, point: this.defaultShotpoint };
},
isTimedEvent () {
@@ -255,7 +258,7 @@ export default {
isShotEvent () {
return Boolean((this.shotInput && this.shot.sequence && this.shot.point) ||
(this.defaultSequence && this.defaultShotNumber && !this.timeInput));
(this.defaultSequence && this.defaultShotpoint && !this.timeInput));
},
isValid () {
@@ -307,7 +310,8 @@ export default {
this.updateTimeFields();
await this.updateSequences();
this.sequence = this.defaultSequence;
this.point = this.defaultShotNumber;
this.point = this.defaultShotpoint;
this.shotInput = this.eventMode == "seq";
}
},

View File

@@ -4,6 +4,8 @@ import router from './router'
import store from './store'
import vuetify from './plugins/vuetify'
import vueDebounce from 'vue-debounce'
import { mapMutations } from 'vuex';
Vue.config.productionTip = false
@@ -17,7 +19,10 @@ new Vue({
snackText: null,
snackColour: null,
user: null
user: null,
wsUrl: "/ws",
ws: null
}
},
@@ -34,8 +39,51 @@ new Vue({
this.snackColour = colour;
this.snackText = text;
this.snack = true;
}
},
initWs () {
if (this.ws && this.ws.readyState == 1) {
console.log("WebSocket already initialised");
return;
}
this.ws = new WebSocket(this.wsUrl);
this.ws.addEventListener("message", (ev) => {
const msg = JSON.parse(ev.data);
if (msg.payload) {
msg.payload = JSON.parse(msg.payload);
}
this.setServerEvent(msg);
});
this.ws.addEventListener("open", (ev) => {
console.log("WebSocket connection open", ev);
});
this.ws.addEventListener("close", (ev) => {
console.warn("WebSocket connection closed", ev);
delete this.ws;
setTimeout( this.initWs, 5000 );
});
this.ws.addEventListener("error", (ev) => {
console.error("WebSocket connection error", ev);
setTimeout( this.initWs, 60000 );
});
},
...mapMutations(['setServerEvent'])
},
created () {
this.wsUrl = location.protocol == "https:"
? "wss://"+location.host+this.wsUrl
: "ws://"+location.host+this.wsUrl;
this.$nextTick( () => this.initWs() );
},
router,

View File

@@ -4,6 +4,7 @@ import Vuex from 'vuex'
import api from './modules/api'
import snack from './modules/snack'
import project from './modules/project'
import notify from './modules/notify'
Vue.use(Vuex)
@@ -11,6 +12,7 @@ export default new Vuex.Store({
modules: {
api,
snack,
project
project,
notify
}
})

View File

@@ -0,0 +1,35 @@
const _ = (s,k) =>
k.split(".").reduce((a, b) => (a !== null && typeof a != "undefined") ? a[b] : a, s);
function serverEvent (state) {
return state.serverEvent;
}
function online (state) {
return !!_(state, "serverEvent.payload.new.meta._online");
}
function lineStatus (state) {
return (_(state, "serverEvent.payload.new.meta.lineStatus")||"").trim();
}
function lineName (state) {
return (_(state, "serverEvent.payload.new.meta.lineName")||"").trim();
}
function sequence (state) {
const v = _(state, "serverEvent.payload.new.meta._sequence");
return Number(v) || v;
}
function line (state) {
const v = _(state, "serverEvent.payload.new.meta._line");
return Number(v) || v;
}
function point (state) {
const v = _(state, "serverEvent.payload.new.meta._point");
return Number(v) || v;
}
export default { serverEvent, online, lineName, sequence, line, point };

View File

@@ -0,0 +1,6 @@
import state from './state'
import getters from './getters'
import actions from './actions'
import mutations from './mutations'
export default { state, getters, actions, mutations };

View File

@@ -0,0 +1,10 @@
function setServerEvent (state, serverEvent) {
state.serverEvent = serverEvent;
}
function clearServerEvent (state) {
state.serverEvent = null;
}
export default { setServerEvent, clearServerEvent };

View File

@@ -0,0 +1,5 @@
const state = () => ({
serverEvent: null
});
export default state;

View File

@@ -23,7 +23,9 @@
:allowed-labels="userLabels"
:preset-remarks="presetRemarks"
:default-timestamp="defaultEventTimestamp"
:default-sequence="$route.params.sequence && (Number($route.params.sequence) || Number($route.params.sequence.split(';').sort().pop()))"
:default-sequence="defaultSequence"
:default-shotpoint="point"
:event-mode="online?'seq':'timed'"
@save="saveEvent"
></dougal-event-edit-dialog>
@@ -340,7 +342,15 @@ export default {
return filtered;
},
...mapGetters(['loading'])
defaultSequence () {
if (this.$route.params.sequence) {
return Number(this.$route.params.sequence.split(";").pop());
} else {
return this.sequence;
}
},
...mapGetters(['loading', 'online', 'sequence', 'line', 'point', 'lineName'])
},

View File

@@ -141,7 +141,6 @@ const layers = {
type: 'json',
}, {
start: false,
interval: 6 * 1000,
getFeatureId (feature) {
return feature.properties.vesselName || feature.properties.vesselId;
},
@@ -161,7 +160,7 @@ const layers = {
},
onEachFeature (feature, layer) {
layer.bindPopup(function () {
return makeRealTimePopup(feature.properties);
return makeRealTimePopup(feature);
});
}
}),
@@ -185,7 +184,7 @@ const layers = {
layers["Real-time"].on('update', function (e) {
Object.keys(e.features).forEach( (id) => {
const feature = e.features[id];
this.getLayer(id).bindPopup(makeRealTimePopup(feature.properties));
this.getLayer(id).bindPopup(makeRealTimePopup(feature));
});
}, this);
@@ -205,7 +204,8 @@ layers["Real-time (trail)"].on('remove', function (e) {
this.stop();
});
function makeRealTimePopup(p) {
function makeRealTimePopup(feature) {
const p = feature.properties;
const online = p._online
? `
<table>
@@ -215,9 +215,12 @@ function makeRealTimePopup(p) {
<tr><td><b>Shot:</b></td><td>${p._point}</td></tr>
<tr><td><b>Crossline:</b></td><td>${p.crossline || "???"} m</td></tr>
<tr><td><b>Inline:</b></td><td>${p.inline || "???"} m</td></tr>
<tr><td><b>Source fired:</b></td><td>${p.src_number|| "???"}</td></tr>
<tr><td><b>Manifold press.:</b></td><td>${p.manifold|| "???"} psi</td></tr>
</table>
`
: "";
const wgs84 = `${feature.geometry.coordinates[1].toFixed(6)}, ${feature.geometry.coordinates[0].toFixed(6)}`
const popup = `
Position as of ${p.tstamp}<br/><hr/>
${online}
@@ -225,6 +228,8 @@ function makeRealTimePopup(p) {
<tr><td><b>Speed:</b></td><td>${p.speed ? p.speed*3.6/1.852 : "???"} kt</td></tr>
<tr><td><b>CMG:</b></td><td>${p.cmg || "???"}°</td></tr>
<tr><td><b>Water depth:</b></td><td>${p.waterDepth || "???"} m</td></tr>
<tr><td><b>WGS84:</b></td><td>${wgs84}</td></tr>
<tr><td><b>Local grid:</b></td><td>${p.easting.toFixed(1)}, ${p.northing.toFixed(1)}</td></tr>
</table>
`
return popup;
@@ -267,7 +272,7 @@ export default {
},
computed: {
...mapGetters(['loading'])
...mapGetters(['loading', 'serverEvent', 'lineName'])
},
watch: {
@@ -280,6 +285,20 @@ export default {
el.classList.add("d-none");
}
}
},
serverEvent (event) {
if (event.channel == "realtime") {
const rtLayer = layers["Real-time"];
if (rtLayer.isRunning()) {
const geojson = {
type: "Feature",
geometry: event.payload.new.geometry,
properties: event.payload.new.meta
};
rtLayer.update(geojson);
}
}
}
},
@@ -378,6 +397,7 @@ export default {
}
const layerControl = L.control.layers(tileMaps, layers).addTo(map);
const scaleControl = L.control.scale().addTo(map);
if (init.position) {
map.setView(init.position.slice(1), init.position[0]);

View File

@@ -1,5 +1,6 @@
const api = require('./api');
const ws = require('./ws');
api.start(process.env.HTTP_PORT || 3000, process.env.HTTP_PATH);
const server = api.start(process.env.HTTP_PORT || 3000, process.env.HTTP_PATH);
ws.start(server);

View File

@@ -31,23 +31,23 @@ function parse (buffer) {
throw new NavHeaderError("Expected SmartSource marker not found at position " + s, buf);
}
},
blk_siz: (buf, ctx) => {
return Number(ascii(4));
},
line: (buf, ctx) => {
return ascii(30).trim();
},
shot: (buf, ctx) => {
return Number(ascii(10));
},
mask: (buf, ctx) => {
return Number(ascii(2));
},
trg_mode: (buf, ctx) => {
const trg_mode = ascii(1);
switch (trg_mode) {
@@ -59,81 +59,81 @@ function parse (buffer) {
throw new NavHeaderError("Unknown SmartSource trigger mode: " + trg_mode, buf);
}
},
time: (buf, ctx) => {
const time = ascii(17);
'20/08/30:05:45:58'
return new Date(time.replace(/(\d{2})\/(\d{2})\/(\d{2}):(\d{2}):(\d{2}):(\d{2})/, "20$1-$2-$3T$4:$5:$6Z"));
},
src_number: (buf, ctx) => {
return Number(ascii(1));
},
num_subarray: (buf, ctx) => {
return Number(ascii(1));
},
num_guns: (buf, ctx) => {
return Number(ascii(2));
},
num_active: (buf, ctx) => {
return Number(ascii(2));
},
num_delta: (buf, ctx) => {
return Number(ascii(2));
},
num_auto: (buf, ctx) => {
return Number(ascii(2));
},
num_nofire: (buf, ctx) => {
return Number(ascii(2));
},
spread: (buf, ctx) => {
// Convert to ms
return Number(ascii(4))/10;
},
volume: (buf, ctx) => {
return Number(ascii(6));
},
avg_delta: (buf, ctx) => {
return Number(ascii(5));
},
std_delta: (buf, ctx) => {
return Number(ascii(5));
},
baroPress: (buf, ctx) => {
// Converted to millibars
return Number(ascii(6))/100;
},
manifold: (buf, ctx) => {
return Number(ascii(4)); // PSI
},
spare: (buf, ctx) => {
return ascii(88).trim();
},
};
const gun = {
string: (buf, ctx) => {
return Number(ascii(1));
},
gun: (buf, ctx) => {
return Number(ascii(2));
},
source: (buf, ctx) => {
return Number(ascii(1));
},
@@ -217,12 +217,12 @@ function parse (buffer) {
for (const key of Object.keys(header)) {
smartsource[key] = header[key](buffer, smartsource);
}
smartsource.guns = [];
for (let n=0; n<smartsource.num_guns; n++) {
const gunItem = {};
const gunItem = [];
for (const key of Object.keys(gun)) {
gunItem[key] = gun[key](buffer, gunItem);
gunItem.push(gun[key](buffer, gunItem));
}
smartsource.guns.push(gunItem);
}
@@ -230,7 +230,7 @@ function parse (buffer) {
return smartsource;
}
module.exports = {
name: "SmartSource",
detect,

View File

@@ -614,6 +614,11 @@
"resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz",
"integrity": "sha1-IpnwLG3tMNSllhsLn3RSShj2NPw="
},
"ws": {
"version": "7.3.1",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.3.1.tgz",
"integrity": "sha512-D3RuNkynyHmEJIpD2qrgVkc9DQ23OrN/moAwZX4L8DfvszsJxpjQuUq3LMx6HoYji9fbIOBY18XWBsAux1ZZUA=="
},
"xtend": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz",

View File

@@ -15,6 +15,7 @@
"jsonwebtoken": "^8.5.1",
"node-fetch": "^2.6.0",
"pg": "^8.3.0",
"ws": "^7.3.1",
"yaml": "^2.0.0-0"
}
}

40
lib/www/server/ws/db.js Normal file
View File

@@ -0,0 +1,40 @@
const { pool } = require('../lib/db/connection');
var client;
const channels = {};
async function notify (data) {
if (data.channel in channels) {
data._received = new Date();
for (const listener of channels[data.channel]) {
listener(data);
}
}
}
async function listen (addChannels, callback) {
if (!client) {
client = await pool.connect();
client.on('notification', notify);
console.log("Client connected");
}
if (!Array.isArray(addChannels)) {
addChannels = [addChannels];
}
for (const channel of addChannels) {
if (!(channel in channels)) {
await client.query("LISTEN "+channel);
channels[channel] = new Set();
console.log("Listening to ", channel);
}
channels[channel].add(callback);
}
}
module.exports = {
listen
}

View File

@@ -0,0 +1,49 @@
const ws = require('ws');
const URL = require('url');
const db = require('./db');
function start (server, pingInterval=30000) {
const wsServer = new ws.Server({ noServer: true });
wsServer.on('connection', socket => {
socket.alive = true;
socket.on('pong', function () { this.alive = true; })
socket.on('message', message => console.log(message));
});
server.on('upgrade', (request, socket, head) => {
console.log("Received upgrade request", request.url);
const url = URL.parse(request.url);
if (/^\/ws\/?$/.test(url.pathname)) {
wsServer.handleUpgrade(request, socket, head, socket => {
wsServer.emit('connection', socket, request);
});
}
});
db.listen(["realtime", "event", "project"], (data) => {
console.log("DB realtime", data);
console.log(wsServer.clients.length, "clients");
wsServer.clients.forEach( (socket) => {
socket.send(JSON.stringify(data));
})
});
const interval = setInterval( () => {
wsServer.clients.forEach( (socket) => {
if (!socket.alive) {
return socket.terminate();
}
socket.alive = false;
socket.ping();
})
}, pingInterval);
wsServer.on('close', () => clearInterval(interval));
return wsServer;
}
module.exports = {
start
}