Implement pub-sub handler system for ws notifications.

This commit is contained in:
D. Berge
2025-08-06 10:59:17 +02:00
parent be7157b62c
commit 447003c3b5
7 changed files with 260 additions and 50 deletions

View File

@@ -23,6 +23,7 @@
"leaflet-arrowheads": "^1.2.2",
"leaflet-realtime": "^2.2.0",
"leaflet.markercluster": "^1.4.1",
"lodash.debounce": "^4.0.8",
"marked": "^9.1.4",
"path-browserify": "^1.0.1",
"plotly.js-dist": "^2.27.0",

View File

@@ -54,7 +54,6 @@ export default {
computed: {
snackText () { return this.$store.state.snack.snackText },
snackColour () { return this.$store.state.snack.snackColour },
...mapGetters(["serverEvent"])
},
watch: {
@@ -77,24 +76,45 @@ export default {
this.$store.commit('setSnackText', "");
}
},
async serverEvent (event) {
if (event.channel == "project" && event.payload?.schema == "public") {
// Projects changed in some way or another
await this.refreshProjects();
} else if (event.channel == ".jwt" && event.payload?.token) {
await this.setCredentials({token: event.payload?.token});
}
}
},
methods: {
handleJWT (context, {payload}) {
this.setCredentials({token: payload.token});
},
handleProject (context, {payload}) {
this.refreshProjects();
},
registerNotificationHandlers () {
this.$store.dispatch('registerHandler', {
table: '.jwt',
handler: (context, message) => {
this.handleJWT(context, message);
}
});
this.$store.dispatch('registerHandler', {
table: 'project',
handler: (context, message) => {
this.handleProject(context, message);
}
});
},
...mapActions(["setCredentials", "refreshProjects"])
},
async mounted () {
// Local Storage values are always strings
this.$vuetify.theme.dark = localStorage.getItem("darkTheme") == "true";
this.registerNotificationHandlers();
await this.setCredentials();
this.refreshProjects();
}

View File

@@ -0,0 +1,136 @@
import debounce from 'lodash/debounce';
function old_processServerEvent({state, getters, commit, dispatch}, [message]) {
//console.log("Processing server event", message);
function processPlan ({payload}) {
if (payload.operation == "INSERT") {
commit("setSequence", payload.new);
} else if (payload.operation == "UPDATE") {
commit("replaceSequence", [payload.old, payload.new]);
} else if (payload.operation == "DELETE") {
commit("deleteSequence", payload.old);
}
}
commit("setServerEvent", message);
if (!message) {
console.warn("processServerEvent called without arguments");
return;
}
if (!message.channel) {
console.warn("processServerEvent message missing channel");
return;
}
if (!message.payload) {
console.warn("processServerEvent message missing payload");
return;
}
if (message.payload.operation == "INSERT") {
if (message.payload.new == null) {
console.warn("Expected payload.new to be non-null");
return;
}
} else if (message.payload.operation == "UPDATE") {
if (message.payload.old == null || message.payload.new == null) {
console.warn("Expected payload.old and paylaod.new to be non-null");
return;
}
} else if (message.payload.operation == "DELETE") {
if (message.payload.old == null) {
console.warn("Expected payload.old to be non-null");
return;
}
} else {
console.warn(`Unrecognised operation: ${message.payload.operation}`);
}
if (message.channel == "planned_lines") {
// Process a change in the planner
processPlan(message);
}
}
function registerHandler({ commit }, { table, handler }) {
commit('REGISTER_HANDLER', { table, handler });
}
function processServerEvent({ commit, dispatch, state, rootState }, message) {
//console.log("processServerEvent", message);
// Error handling for invalid messages
if (!message) {
console.error("processServerEvent called without arguments");
return;
}
if (!message.channel) {
console.error("processServerEvent message missing channel");
return;
}
if (!message.payload) {
console.error("processServerEvent message missing payload");
return;
}
if (message.payload.operation == "INSERT") {
if (message.payload.new == null) {
console.error("Expected payload.new to be non-null");
return;
}
} else if (message.payload.operation == "UPDATE") {
if (message.payload.old == null || message.payload.new == null) {
console.error("Expected payload.old and paylaod.new to be non-null");
return;
}
} else if (message.payload.operation == "DELETE") {
if (message.payload.old == null) {
console.error("Expected payload.old to be non-null");
return;
}
} else {
console.warn(`Unrecognised operation: ${message.payload.operation}`);
}
const table = message.channel; // or message.payload?.table;
//console.log("table=", table);
if (!table || !state.handlers[table] || state.handlers[table].length === 0) {
return;
}
// Create a debounced runner per table if not exists
if (!state.debouncedRunners) {
state.debouncedRunners = {}; // Not reactive needed? Or use Vue.set
}
if (!state.debouncedRunners[table]) {
const config = {
wait: 300, // min silence in ms
maxWait: 1000, // max wait before force run, adjustable
trailing: true,
leading: false
};
state.debouncedRunners[table] = debounce((lastMessage) => {
const context = { commit, dispatch, state: rootState, rootState }; // Approximate action context
state.handlers[table].forEach(handler => {
try {
//console.log("Trying handler:", handler);
handler(context, lastMessage);
} catch (e) {
console.error(`Error in handler for table ${table}:`, e);
}
});
}, config.wait, { maxWait: config.maxWait });
}
// Call the debounced function with the current message
// Debounce will use the last call's argument if multiple
state.debouncedRunners[table](message);
}
export default { registerHandler, processServerEvent };

View File

@@ -11,4 +11,12 @@ function setServerConnectionState (state, isConnected) {
state.serverConnected = !!isConnected;
}
export default { setServerEvent, clearServerEvent, setServerConnectionState };
function REGISTER_HANDLER(state, { table, handler }) {
if (!state.handlers[table]) {
state.handlers[table] = [];
}
state.handlers[table].push(handler);
}
export default { setServerEvent, clearServerEvent, setServerConnectionState, REGISTER_HANDLER };

View File

@@ -1,6 +1,7 @@
const state = () => ({
serverEvent: null,
serverConnected: false
serverConnected: false,
handlers: {}, // table: array of functions (each fn receives { commit, dispatch, state, rootState, message })
});
export default state;

View File

@@ -43,41 +43,81 @@ export default {
return this.loading || this.projectId;
},
...mapGetters(["loading", "projectId", "projectSchema", "serverEvent"])
},
watch: {
async serverEvent (event) {
if (event.channel == "project" && event.payload?.operation == "DELETE" && event.payload?.schema == "public") {
// Project potentially deleted
await this.getProject(this.$route.params.project);
} else if (event.payload?.schema == this.projectSchema) {
if (event.channel == "event") {
this.refreshEvents();
} else if (event.channel == "planned_lines") {
this.refreshPlan();
} else if (["raw_lines", "raw_shots", "final_lines", "final_shots"].includes(event.channel)) {
this.refreshSequences();
} else if (["preplot_lines", "preplot_points"].includes(event.channel)) {
this.refreshLines();
} else if (event.channel == "info") {
if ((event.payload?.new ?? event.payload?.old)?.key == "plan") {
this.refreshPlan();
}
} else if (event.channel == "project") {
this.getProject(this.$route.params.project);
}
}
}
...mapGetters(["loading", "projectId", "projectSchema"])
},
methods: {
handleLines (context, {payload}) {
if (payload.pid != this.projectId) {
console.warn(`${this.projectId} ignoring notification for ${payload.pid}`);
return;
}
this.refreshLines();
},
handlePlannedLines (context, {payload}) {
if (payload.pid != this.projectId) {
console.warn(`${this.projectId} ignoring notification for ${payload.pid}`);
return;
}
this.refreshPlan();
},
handleSequences (context, {payload}) {
if (payload.pid != this.projectId) {
console.warn(`${this.projectId} ignoring notification for ${payload.pid}`);
return;
}
console.log("handleSequences");
this.refreshSequences();
},
registerNotificationHandlers () {
["preplot_lines", "preplot_points"].forEach( table => {
this.$store.dispatch('registerHandler', {
table,
handler: (context, message) => {
this.handleLines(context, message);
}
})
});
this.$store.dispatch('registerHandler', {
table: 'planned_lines',
handler: (context, message) => {
this.handlePlannedLines(context, message);
}
});
["raw_lines", "raw_shots", "final_lines", "final_shots"].forEach( table => {
this.$store.dispatch('registerHandler', {
table,
handler: (context, message) => {
this.handleSequences(context, message);
}
})
});
},
...mapActions(["getProject", "refreshLines", "refreshSequences", "refreshEvents", "refreshLabels", "refreshPlan"])
},
async mounted () {
await this.getProject(this.$route.params.project);
if (this.projectFound) {
this.registerNotificationHandlers();
this.refreshLines();
this.refreshSequences();
this.refreshEvents();

View File

@@ -184,17 +184,7 @@ export default {
: this.items.filter(i => !i.archived);
},
...mapGetters(['loading', 'serverEvent', 'projects'])
},
watch: {
async serverEvent (event) {
if (event.channel == "project" && event.payload?.schema == "public") {
if (event.payload?.operation == "DELETE" || event.payload?.operation == "INSERT") {
await this.load();
}
}
}
...mapGetters(['loading', 'projects'])
},
methods: {
@@ -220,6 +210,7 @@ export default {
},
async load () {
await this.refreshProjects();
await this.list();
const promises = [];
for (const key in this.items) {
@@ -234,6 +225,18 @@ export default {
}
},
registerNotificationHandlers () {
this.$store.dispatch('registerHandler', {
table: 'project`',
handler: (context, message) => {
if (message.payload?.table == "public") {
this.load();
}
}
});
},
contextMenu (e, {item}) {
e.preventDefault();
this.contextMenuShow = false;
@@ -372,10 +375,11 @@ export default {
},
...mapActions(["api", "showSnack"])
...mapActions(["api", "showSnack", "refreshProjects"])
},
mounted () {
this.registerNotificationHandlers();
this.load();
}
}