diff --git a/lib/www/server/index.js b/lib/www/server/index.js index c6ca828..f2ae7f3 100644 --- a/lib/www/server/index.js +++ b/lib/www/server/index.js @@ -1,5 +1,6 @@ const api = require('./api'); +const ws = require('./ws'); - -api.start(process.env.HTTP_PORT || 3000, process.env.HTTP_PATH); +const server = api.start(process.env.HTTP_PORT || 3000, process.env.HTTP_PATH); +ws.start(server); diff --git a/lib/www/server/package-lock.json b/lib/www/server/package-lock.json index 045c7fc..75496bd 100644 --- a/lib/www/server/package-lock.json +++ b/lib/www/server/package-lock.json @@ -614,6 +614,11 @@ "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", "integrity": "sha1-IpnwLG3tMNSllhsLn3RSShj2NPw=" }, + "ws": { + "version": "7.3.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.3.1.tgz", + "integrity": "sha512-D3RuNkynyHmEJIpD2qrgVkc9DQ23OrN/moAwZX4L8DfvszsJxpjQuUq3LMx6HoYji9fbIOBY18XWBsAux1ZZUA==" + }, "xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/lib/www/server/package.json b/lib/www/server/package.json index 4f9bb8d..4d72035 100644 --- a/lib/www/server/package.json +++ b/lib/www/server/package.json @@ -15,6 +15,7 @@ "jsonwebtoken": "^8.5.1", "node-fetch": "^2.6.0", "pg": "^8.3.0", + "ws": "^7.3.1", "yaml": "^2.0.0-0" } } diff --git a/lib/www/server/ws/db.js b/lib/www/server/ws/db.js new file mode 100644 index 0000000..21162aa --- /dev/null +++ b/lib/www/server/ws/db.js @@ -0,0 +1,41 @@ +const { pool } = require('../lib/db/connection'); + +var client; + +const channels = {}; + +async function notify (data) { + console.log("NOTIFY", data); + if (data.channel in channels) { + data._received = new Date(); + for (const listener of channels[data.channel]) { + listener(data); + } + } +} + +async function listen (addChannels, callback) { + if (!client) { + client = await pool.connect(); + client.on('notification', notify); + console.log("Client connected"); + } + + if (!Array.isArray(addChannels)) { + addChannels = [addChannels]; + } + + for (const channel of addChannels) { + if (!(channel in channels)) { + await client.query("LISTEN "+channel); + channels[channel] = new Set(); + console.log("Listening to ", channel); + } + + channels[channel].add(callback); + } +} + +module.exports = { + listen +} diff --git a/lib/www/server/ws/index.js b/lib/www/server/ws/index.js new file mode 100644 index 0000000..64b43ae --- /dev/null +++ b/lib/www/server/ws/index.js @@ -0,0 +1,49 @@ +const ws = require('ws'); +const URL = require('url'); +const db = require('./db'); + +function start (server, pingInterval=30000) { + + const wsServer = new ws.Server({ noServer: true }); + wsServer.on('connection', socket => { + socket.alive = true; + socket.on('pong', function () { this.alive = true; }) + socket.on('message', message => console.log(message)); + }); + + server.on('upgrade', (request, socket, head) => { + console.log("Received upgrade request", request.url); + const url = URL.parse(request.url); + if (/^\/ws\/?$/.test(url.pathname)) { + wsServer.handleUpgrade(request, socket, head, socket => { + wsServer.emit('connection', socket, request); + }); + } + }); + + db.listen(["realtime", "event", "project"], (data) => { + console.log("DB realtime", data); + console.log(wsServer.clients.length, "clients"); + wsServer.clients.forEach( (socket) => { + socket.send(JSON.stringify(data)); + }) + }); + + const interval = setInterval( () => { + wsServer.clients.forEach( (socket) => { + if (!socket.alive) { + return socket.terminate(); + } + socket.alive = false; + socket.ping(); + }) + }, pingInterval); + + wsServer.on('close', () => clearInterval(interval)); + + return wsServer; +} + +module.exports = { + start +}