From 7d5fb4bceb83b709988072680f94e5f136f08112 Mon Sep 17 00:00:00 2001 From: "D. Berge" Date: Fri, 1 Sep 2023 12:22:54 +0200 Subject: [PATCH] Add notifier to DB library --- lib/www/server/lib/db/connection.js | 9 +++++++- lib/www/server/lib/db/notify.js | 36 +++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 lib/www/server/lib/db/notify.js diff --git a/lib/www/server/lib/db/connection.js b/lib/www/server/lib/db/connection.js index 6accbfa..7b9a74a 100644 --- a/lib/www/server/lib/db/connection.js +++ b/lib/www/server/lib/db/connection.js @@ -1,5 +1,5 @@ const { Pool, Client, types } = require('pg'); - +const createSubscriber = require('pg-listen'); const cfg = require("../config"); const pool = new Pool(cfg.db); @@ -14,6 +14,12 @@ numericTypeOIDs.forEach(oid => { // types.setTypeParser(oid, function (v) { return JSON.parse(v); }); // }) +function makeSubscriber (opts = cfg.db) { + const subscriber = createSubscriber(opts); + process.on("exit", () => subscriber.close()); + return subscriber; +} + const transaction = { async begin (client) { return await client.query("BEGIN;"); @@ -72,6 +78,7 @@ async function fetchRow (cursor) { module.exports = { pool, + makeSubscriber, transaction, setSurvey, schema2pid, diff --git a/lib/www/server/lib/db/notify.js b/lib/www/server/lib/db/notify.js new file mode 100644 index 0000000..5ff9e9c --- /dev/null +++ b/lib/www/server/lib/db/notify.js @@ -0,0 +1,36 @@ +const { makeSubscriber } = require('./connection'); + + +async function listen (addChannels, callback) { + + const client = makeSubscriber(); + + client.events.on("error", (err) => { + console.error("Postgres LISTEN subscriber error", err); + setTimeout(() => client.connect(), 5000); + }); + + client.connect(); + + if (!Array.isArray(addChannels)) { + addChannels = [addChannels]; + } + + for (const channel of addChannels) { + await client.listenTo(channel); + client.notifications.on(channel, (payload) => { + const data = { + channel, + _received: new Date(), + payload + }; + callback(data); + }); + } + + return client; +} + +module.exports = { + listen +};