Rewrite automatic event handling system

This commit is contained in:
D. Berge
2025-08-15 14:45:46 +02:00
parent 2fab06d340
commit 387d20a4f0
9 changed files with 173 additions and 87 deletions

View File

@@ -1,29 +1,101 @@
const nodeAsync = require('async'); // npm install async
const { listen } = require('../lib/db/notify');
const db = require('../lib/db'); // Adjust paths; include all needed DB utils
const { schema2pid } = require('../lib/db/connection');
const unique = require('../lib/utils/unique'); // If needed by handlers
const withinValidity = require('../lib/utils/ranges').withinValidity; // If needed
const { ALERT, ERROR, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
// List of handler classes (add more as needed)
const handlerClasses = require('./handlers').Handlers;
// Channels to listen to (hardcoded for simplicity; could scan handlers for mentions)
const channels = require('../lib/db/channels');
const handlers = require('./handlers');
const { ActionsQueue } = require('../lib/queue');
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
function start () {
// Queue config: Process one at a time for order; max retries=3
const eventQueue = nodeAsync.queue(async (task, callback) => {
const { data, ctx } = task;
DEBUG(`Processing event on channel ${data.channel} with timestamp ${data._received ?? 'unknown'}`);
const queue = new ActionsQueue();
const ctx = {}; // Context object
for (const handler of ctx.handlers) {
try {
await handler.run(data, ctx);
} catch (err) {
ERROR(`Error in handler ${handler.constructor.name}:`, err);
// Retry logic: Could add task.retries++, re-enqueue if < max
}
}
const { prepare, despatch } = handlers.init(ctx);
if (typeof callback === 'function') {
// async v3.2.6+ does not use callsbacks with AsyncFunctions, but anyway
callback();
}
}, 1); // Concurrency=1 for strict order
listen(channels, function (data) {
DEBUG("Incoming data", data);
eventQueue.error((err, task) => {
ALERT(`Queue error processing task:`, err, task);
});
// We don't bother awaiting
queue.enqueue(() => despatch(data, ctx));
DEBUG("Queue size", queue.length());
// Main setup function (call from server init)
async function setupEventHandlers(projectsConfig) {
// Shared context
const ctx = {
dryRun: Boolean(process.env.DOUGAL_HANDLERS_DRY_RUN) ?? false, // If true, don't commit changes
projects: { configuration: projectsConfig }, // From user config
handlers: handlerClasses.map(Cls => new Cls()), // Instances
// DB utils (add more as needed)
db,
schema2pid,
unique,
withinValidity
// Add other utils, e.g., ctx.logger = DEBUG;
};
// Optional: Replay recent events on startup to rebuild state
// await replayRecentEvents(ctx);
// Setup listener
const subscriber = await listen(channels, (rawData) => {
const data = {
...rawData,
enqueuedAt: new Date() // For monitoring
};
eventQueue.push({ data, ctx });
});
INFO("Events manager started");
DEBUG('Event handler system initialized with channels:', channels);
if (ctx.dryRun) {
DEBUG('DRY RUNNING');
}
// Return for cleanup if needed
return {
close: () => {
subscriber.events.removeAllListeners();
subscriber.close();
eventQueue.kill();
}
};
}
module.exports = { start }
// Optional: Replay last N events to rebuild handler state (e.g., this.prev)
// async function replayRecentEvents(ctx) {
// try {
// // Example: Fetch last 10 realtime events, sorted by tstamp
// const recentRealtime = await event.listAllProjects({ channel: 'realtime', limit: 10, sort: 'tstamp DESC' });
// // Assume event.listAllProjects is a custom DB method; implement if needed
//
// // Enqueue in original order (reverse sort)
// recentRealtime.reverse().forEach((evt) => {
// const data = { channel: 'realtime', payload: { new: evt } };
// eventQueue.push({ data, ctx });
// });
//
// // Similarly for 'event' channel if needed
// DEBUG('Replayed recent events for state rebuild');
// } catch (err) {
// ERROR('Error replaying events:', err);
// }
// }
if (require.main === module) {
start();
}
module.exports = { setupEventHandlers };