Add notifier to DB library

This commit is contained in:
D. Berge
2023-09-01 12:22:54 +02:00
parent fb10e56487
commit ed14fd0ced
2 changed files with 44 additions and 1 deletions

View File

@@ -1,5 +1,5 @@
const { Pool, Client, types } = require('pg'); const { Pool, Client, types } = require('pg');
const createSubscriber = require('pg-listen');
const cfg = require("../config"); const cfg = require("../config");
const pool = new Pool(cfg.db); const pool = new Pool(cfg.db);
@@ -14,6 +14,12 @@ numericTypeOIDs.forEach(oid => {
// types.setTypeParser(oid, function (v) { return JSON.parse(v); }); // types.setTypeParser(oid, function (v) { return JSON.parse(v); });
// }) // })
function makeSubscriber (opts = cfg.db) {
const subscriber = createSubscriber(opts);
process.on("exit", () => subscriber.close());
return subscriber;
}
const transaction = { const transaction = {
async begin (client) { async begin (client) {
return await client.query("BEGIN;"); return await client.query("BEGIN;");
@@ -81,6 +87,7 @@ async function fetchRow (cursor) {
module.exports = { module.exports = {
pool, pool,
makeSubscriber,
transaction, transaction,
setSurvey, setSurvey,
schema2pid, schema2pid,

View File

@@ -0,0 +1,36 @@
const { makeSubscriber } = require('./connection');
async function listen (addChannels, callback) {
const client = makeSubscriber();
client.events.on("error", (err) => {
console.error("Postgres LISTEN subscriber error", err);
setTimeout(() => client.connect(), 5000);
});
client.connect();
if (!Array.isArray(addChannels)) {
addChannels = [addChannels];
}
for (const channel of addChannels) {
await client.listenTo(channel);
client.notifications.on(channel, (payload) => {
const data = {
channel,
_received: new Date(),
payload
};
callback(data);
});
}
return client;
}
module.exports = {
listen
};