mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 11:57:08 +00:00
Merge branch '274-use-new-db-event-notifier-for-event-processing-handlers' into 'devel'
Resolve "Use new DB event notifier for event processing handlers" Closes #275, #230, and #274 See merge request wgp/dougal/software!42
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -12,3 +12,4 @@ etc/surveys/*.yaml
|
||||
!etc/surveys/_*.yaml
|
||||
etc/ssl/*
|
||||
etc/config.yaml
|
||||
var/*
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
const { listen } = require('../ws/db');
|
||||
const { listen } = require('../lib/db/notify');
|
||||
const channels = require('../lib/db/channels');
|
||||
const handlers = require('./handlers').init();
|
||||
const { ERROR, INFO, DEBUG } = require('DOUGAL_ROOT/debug')(__filename);
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
const { pool } = require('../lib/db/connection');
|
||||
|
||||
var client;
|
||||
|
||||
const channels = {};
|
||||
|
||||
async function notify (data) {
|
||||
|
||||
if (data.channel in channels) {
|
||||
data._received = new Date();
|
||||
try {
|
||||
const json = JSON.parse(data.payload);
|
||||
data.payload = json;
|
||||
} catch {
|
||||
// Ignore the error
|
||||
}
|
||||
for (const listener of channels[data.channel]) {
|
||||
await listener(JSON.parse(JSON.stringify(data)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function reconnect () {
|
||||
console.log("Reconnecting");
|
||||
// No need to provide parameters, channels should already be populated.
|
||||
listen();
|
||||
}
|
||||
|
||||
async function listen (addChannels, callback) {
|
||||
if (!client) {
|
||||
try {
|
||||
client = await pool.connect();
|
||||
} catch (err) {
|
||||
console.error("Error connecting to DB", err);
|
||||
console.log("Will try again in 15 seconds");
|
||||
setImmediate(() => client = null);
|
||||
setTimeout(() => {
|
||||
listen(addChannels, callback);
|
||||
}, 15000);
|
||||
return;
|
||||
}
|
||||
client.on('notification', notify);
|
||||
console.log("Websocket client connected", Object.keys(channels));
|
||||
client.on('error', (err) => console.error("Events client error: ", err));
|
||||
client.on('end', () => {
|
||||
console.warn("Websocket events client disconnected. Will attempt to reconnect in five seconds");
|
||||
setImmediate(() => client = null);
|
||||
setTimeout(reconnect, 5000);
|
||||
});
|
||||
}
|
||||
|
||||
if (addChannels) {
|
||||
if (!Array.isArray(addChannels)) {
|
||||
addChannels = [addChannels];
|
||||
}
|
||||
|
||||
for (const channel of addChannels) {
|
||||
if (!(channel in channels)) {
|
||||
await client.query("LISTEN "+channel);
|
||||
channels[channel] = [];
|
||||
console.log("Listening to ", channel);
|
||||
}
|
||||
|
||||
channels[channel].push(callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
listen
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
const ws = require('ws');
|
||||
const URL = require('url');
|
||||
const db = require('./db');
|
||||
const { listen } = require('../lib/db/notify');
|
||||
const channels = require('../lib/db/channels');
|
||||
|
||||
function start (server, pingInterval=30000) {
|
||||
@@ -22,7 +22,7 @@ function start (server, pingInterval=30000) {
|
||||
}
|
||||
});
|
||||
|
||||
db.listen(channels, (data) => {
|
||||
listen(channels, (data) => {
|
||||
wsServer.clients.forEach( (socket) => {
|
||||
socket.send(JSON.stringify(data));
|
||||
})
|
||||
|
||||
@@ -16,7 +16,12 @@ OUTPATH="$OUTDIR/$OUTNAME"
|
||||
# 30000/UDP: Navigation system headers
|
||||
# Not all inputs will be present in all systems.
|
||||
#
|
||||
EXPR="udp and (port 4461 or port 4462 or port 30000)"
|
||||
# NOTE: $INS_HOST must be defined and point to the
|
||||
# navigation server. The reason we don't use a port
|
||||
# filter for this data is because that doesn't work
|
||||
# with fragmented UDP packets.
|
||||
#
|
||||
EXPR="udp and (port 4461 or port 4462 or src host $INS_HOST)"
|
||||
|
||||
if [[ ! -d "$OUTDIR" ]]; then
|
||||
mkdir "$OUTDIR"
|
||||
|
||||
42
sbin/rewrite-captures.sh
Executable file
42
sbin/rewrite-captures.sh
Executable file
@@ -0,0 +1,42 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
# Rewrite packet captures in order to be able to replay them.
|
||||
#
|
||||
# SINET: Rewrite all packets with this source IP address
|
||||
# SETHER: Rewrite all packets with this MAC
|
||||
#
|
||||
# DINET: Rewrite all packets with this destination IP address
|
||||
# DETHER: Rewrite all packets with this destination MAC address
|
||||
#
|
||||
# The resulting files have the original name with "-rewritten.pcap"
|
||||
# appended as a suffix. Those packets may then be replayed from a
|
||||
# different computer or virtual container, for instance with:
|
||||
#
|
||||
# sudo bittwist -i 1 -v -m10 capture-rewritten.pcap
|
||||
#
|
||||
# Where -i n is the interface name (use bittwist -d to list available
|
||||
# interfaces), -v is the verbose flag and -m10 replays at 10× speed.
|
||||
#
|
||||
|
||||
SINET=${SINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
|
||||
SETHER=${SETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}
|
||||
|
||||
DINET=${DINET:-$(ip -o -4 addr |grep -v " lo " |head -n 1 |sed -r 's/^.*inet\s([0-9.]+).*$/\1/')}
|
||||
DETHER=${DETHER:-$(ip -o link |grep -v " lo" |head -n 1 |sed -r 's/^.*ether\s([0-9a-fA-F:]+).*$/\1/')}
|
||||
|
||||
for f in $*; do
|
||||
|
||||
OUTFNAME=$f-rewritten.pcap
|
||||
echo $f → $OUTFNAME
|
||||
if [[ -n "$SINET" && -n "$SETHER" ]]; then
|
||||
tcprewrite -S 0.0.0.0/0:$SINET --enet-smac=$SETHER \
|
||||
-D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
|
||||
--infile "$f" \
|
||||
--outfile "$OUTFNAME"
|
||||
else
|
||||
tcprewrite -D 0.0.0.0/0:$DINET --enet-dmac=$DETHER \
|
||||
--infile "$f" \
|
||||
--outfile "$OUTFNAME"
|
||||
fi
|
||||
|
||||
done
|
||||
Reference in New Issue
Block a user