const { pool } = require('../lib/db/connection');

// The single pooled connection dedicated to LISTEN/NOTIFY traffic
// (null/undefined while disconnected).
let client;

// In-flight connection attempt, shared so that concurrent listen() calls do
// not each check out their own client from the pool.
let connecting = null;

// Map of channel name -> array of listener callbacks registered via listen().
const channels = {};

/**
 * Tells whether a channel has been registered, ignoring inherited keys such
 * as "constructor" that a plain `in` check would match.
 *
 * @param {string} name - Channel name.
 * @returns {boolean} True when `channels` has its own entry for `name`.
 */
function hasChannel (name) {
    return Object.prototype.hasOwnProperty.call(channels, name);
}

/**
 * Handles a 'notification' event from the database client and dispatches it,
 * one listener at a time, to every callback registered for its channel.
 *
 * The payload is parsed as JSON when possible; otherwise it is passed through
 * unchanged. Each listener receives its own JSON round-tripped copy of the
 * message, so `_received` arrives as an ISO date string rather than a Date.
 *
 * @param {object} data - Notification message; `channel` and `payload` are read.
 * @returns {Promise<void>}
 */
async function notify (data) {
    if (!hasChannel(data.channel)) {
        return;
    }

    data._received = new Date();

    try {
        data.payload = JSON.parse(data.payload);
    } catch {
        // Payload is not JSON; deliver it as the raw value.
    }

    for (const listener of channels[data.channel]) {
        try {
            await listener(JSON.parse(JSON.stringify(data)));
        } catch (err) {
            // One failing listener must not stop delivery to the others, nor
            // surface as an unhandled rejection from the event emitter.
            console.error("Error in listener for channel " + data.channel, err);
        }
    }
}

/**
 * Re-establishes the events connection after a disconnect.
 * Called from a timer, so failures are logged rather than thrown.
 */
function reconnect () {
    console.log("Reconnecting");
    // No need to provide parameters: the new client re-subscribes to every
    // channel already present in `channels` as part of connecting.
    listen().catch((err) => console.error("Error while reconnecting", err));
}

/**
 * Checks a client out of the pool, wires up its event handlers and re-issues
 * LISTEN for every channel that is already registered.
 *
 * @returns {Promise<object>} The connected client.
 */
async function openClient () {
    const connection = await pool.connect();

    connection.on('notification', notify);
    connection.on('error', (err) => console.error("Events client error: ", err));
    connection.on('end', () => {
        console.warn("Websocket events client disconnected. Will attempt to reconnect in five seconds");
        // Only forget the client if it is still the one that ended, so a
        // newer healthy connection is never discarded by a stale event.
        setImmediate(() => {
            if (client === connection) {
                client = null;
            }
        });
        setTimeout(reconnect, 5000);
    });

    client = connection;
    console.log("Websocket client connected", Object.keys(channels));

    // A fresh connection has no subscriptions; restore the existing ones.
    for (const channel of Object.keys(channels)) {
        try {
            // NOTE(review): the channel name is interpolated unescaped; it
            // must be a trusted SQL identifier supplied by application code.
            await connection.query("LISTEN " + channel);
            console.log("Listening to ", channel);
        } catch (err) {
            console.error("Error re-subscribing to channel " + channel, err);
        }
    }

    return connection;
}

/**
 * Returns the current client, connecting first if necessary. Concurrent
 * callers share the same in-flight connection attempt.
 *
 * @returns {Promise<object>} The connected client.
 */
function ensureClient () {
    if (client) {
        return Promise.resolve(client);
    }
    if (!connecting) {
        connecting = openClient().finally(() => {
            connecting = null;
        });
    }
    return connecting;
}

/**
 * Subscribes `callback` to one or more notification channels, connecting to
 * the database first if needed. When called without arguments it only makes
 * sure the connection exists.
 *
 * If the connection cannot be established the call logs the error, schedules
 * a retry with the same arguments in 15 seconds and resolves without throwing.
 *
 * @param {string|string[]} [addChannels] - Channel name(s) to listen on.
 * @param {Function} [callback] - Invoked with each notification message.
 * @returns {Promise<void>}
 */
async function listen (addChannels, callback) {
    let connection;
    try {
        connection = await ensureClient();
    } catch (err) {
        console.error("Error connecting to DB", err);
        console.log("Will try again in 15 seconds");
        setTimeout(() => {
            listen(addChannels, callback)
                .catch((retryErr) => console.error("Error retrying listen", retryErr));
        }, 15000);
        return;
    }

    if (!addChannels) {
        return;
    }

    const requested = Array.isArray(addChannels) ? addChannels : [addChannels];

    for (const channel of requested) {
        if (!hasChannel(channel)) {
            // NOTE(review): the channel name is interpolated unescaped; it
            // must be a trusted SQL identifier supplied by application code.
            await connection.query("LISTEN " + channel);
            // Another call may have registered the channel while the query
            // was in flight; do not wipe out its listeners.
            if (!hasChannel(channel)) {
                channels[channel] = [];
            }
            console.log("Listening to ", channel);
        }
        // Registering a non-function would fail on every notification.
        if (typeof callback === 'function') {
            channels[channel].push(callback);
        }
    }
}

module.exports = { listen };