const nodeAsync = require('async'); // npm install async const { listen } = require('../lib/db/notify'); const db = require('../lib/db'); // Adjust paths; include all needed DB utils const { schema2pid } = require('../lib/db/connection'); const unique = require('../lib/utils/unique'); // If needed by handlers const withinValidity = require('../lib/utils/ranges').withinValidity; // If needed const { ALERT, ERROR, DEBUG } = require('DOUGAL_ROOT/debug')(__filename); // List of handler classes (add more as needed) const handlerClasses = require('./handlers').Handlers; // Channels to listen to (hardcoded for simplicity; could scan handlers for mentions) const channels = require('../lib/db/channels'); // Queue config: Process one at a time for order; max retries=3 const eventQueue = nodeAsync.queue(async (task, callback) => { const { data, ctx } = task; DEBUG(`Processing event on channel ${data.channel} with timestamp ${data._received ?? 'unknown'}`); for (const handler of ctx.handlers) { try { await handler.run(data, ctx); } catch (err) { ERROR(`Error in handler ${handler.constructor.name}:`, err); // Retry logic: Could add task.retries++, re-enqueue if < max } } if (typeof callback === 'function') { // async v3.2.6+ does not use callsbacks with AsyncFunctions, but anyway callback(); } }, 1); // Concurrency=1 for strict order eventQueue.error((err, task) => { ALERT(`Queue error processing task:`, err, task); }); // Main setup function (call from server init) async function setupEventHandlers(projectsConfig) { // Shared context const ctx = { dryRun: Boolean(process.env.DOUGAL_HANDLERS_DRY_RUN) ?? false, // If true, don't commit changes projects: { configuration: projectsConfig }, // From user config handlers: handlerClasses.map(Cls => new Cls()), // Instances // DB utils (add more as needed) db, schema2pid, unique, withinValidity // Add other utils, e.g., ctx.logger = DEBUG; }; // Optional: Replay recent events on startup to rebuild state // await replayRecentEvents(ctx); // Setup listener const subscriber = await listen(channels, (rawData) => { const data = { ...rawData, enqueuedAt: new Date() // For monitoring }; eventQueue.push({ data, ctx }); }); DEBUG('Event handler system initialized with channels:', channels); if (ctx.dryRun) { DEBUG('DRY RUNNING'); } // Return for cleanup if needed return { close: () => { subscriber.events.removeAllListeners(); subscriber.close(); eventQueue.kill(); } }; } // Optional: Replay last N events to rebuild handler state (e.g., this.prev) // async function replayRecentEvents(ctx) { // try { // // Example: Fetch last 10 realtime events, sorted by tstamp // const recentRealtime = await event.listAllProjects({ channel: 'realtime', limit: 10, sort: 'tstamp DESC' }); // // Assume event.listAllProjects is a custom DB method; implement if needed // // // Enqueue in original order (reverse sort) // recentRealtime.reverse().forEach((evt) => { // const data = { channel: 'realtime', payload: { new: evt } }; // eventQueue.push({ data, ctx }); // }); // // // Similarly for 'event' channel if needed // DEBUG('Replayed recent events for state rebuild'); // } catch (err) { // ERROR('Error replaying events:', err); // } // } module.exports = { setupEventHandlers };