Add ASAQC queue processor.

This code implements the backend processing side
of the ASAQC queue, i.e., the bit that communicates
with the remote API.

Its expected use it to have it running at regular
intervals, e.g., via cron. The entry point is:

lib/www/server/queues/asaqc/index.js

That file is executable and can be run directly
from the shell or within a script. Read the comments
in that file for further instructions.
This commit is contained in:
D. Berge
2021-10-04 02:15:10 +02:00
parent 0f447fc27d
commit 5c190e5554
5 changed files with 246 additions and 0 deletions

View File

@@ -0,0 +1,51 @@
const { queue } = require('../../lib/db');
/**
* Fetch up to limit items from the queue having
* status = status. Take oldest first.
*/
async function fetchItems ({status, limit}) {
status ||= "queued";
limit ||= 10;
return await queue.get(null, {status, limit, order: "created_on", dir: "+"})
}
/**
* Set items to status=failed in the database, save
* the results *and* modify the item itself, so that
* it can be picked up by rescheduleFailed().
*/
async function markFailed (item, results) {
item.status = "failed";
item.results = results;
return await queue.put(item.item_id, {status: 'failed', results});
}
/**
* Set items to status=sent in the database and save
* the results.
*/
async function markSent (item, results) {
return await queue.put(item.item_id, {status: 'sent', results});
}
/**
* Reschedule any item in `items` that has been marked
* as failed. The newly created item will take its parent's
* parent_id if present, or else its item_id.
*/
async function rescheduleFailed (items) {
const failed = items.filter(i => i.status == 'failed');
if (failed.length)
console.warn(failed.length, "failed items");
for (const item of failed) {
await queue.post('asaqc', item.payload, item.parent_id || item.item_id);
}
}
module.exports = {fetchItems, markFailed, markSent, rescheduleFailed};

View File

@@ -0,0 +1,55 @@
const fs = require('fs');
const path = require('path');
const https = require('https');
const fetch = require('node-fetch');
const DOUGAL_ROOT = require('../../lib/config').DOUGAL_ROOT;
const cfg = require('../../lib/config').global.queues.asaqc.request;
/**
* Return a suitably configured httpsAgent with the client's TLS
* credentials if given.
*/
function httpsAgent () {
// References:
// https://github.com/node-fetch/node-fetch/issues/904
// https://nodejs.org/api/https.html#https_https_request_options_callback
if (!cfg.httpsAgent) {
return;
}
const options = {
key: fs.readFileSync(path.resolve(DOUGAL_ROOT, cfg.httpsAgent.key)),
cert: fs.readFileSync(path.resolve(DOUGAL_ROOT, cfg.httpsAgent.cert))
}
return https.Agent(options);
}
/**
* Send a payload to the ASAQC `upload-file-encoded` endpoint.
* https://api.equinor.com/docs/services/vessel-track/operations/FileUploadEncoded
*/
async function despatchPayload(payload) {
try {
const res = await fetch(cfg.url, {
...cfg.args,
body: JSON.stringify(payload),
agent: httpsAgent()
});
if (res) {
return await res.json();
} else {
console.error("NO RESPONSE FROM ASAQC ENDPOINT");
}
} catch (err) {
console.error(err);
return {error: err};
}
}
module.exports = despatchPayload;

View File

@@ -0,0 +1,29 @@
#!/usr/bin/node
/*
* Can be required as a module or called directly.
*
* In the latter case, it will do a queue run.
*
* The following environment variables may come in
* useful:
*
* DOUGAL_ROOT Use it to specify the path to Dougal's
* top directory (`software/`). Most of the time this
* is not needed unless running in a development
* environment.
*
* NODE_TLS_REJECT_UNAUTHORIZED=0 Use this when running
* against the internal test server or any other endpoint
* that has self-signed certificates. WARNING: think carefully
* if you really want to do this, most of the time you don't.
*/
module.exports = {
processQueue: require('./process')
}
if (!module.parent) {
module.exports.processQueue().then(() => process.exit());
}

View File

@@ -0,0 +1,67 @@
const { createHash } = require('crypto');
const { seisjson, pdf } = require('../../lib/sse/present');
const { configuration } = require('../../lib/db');
function digestOf(content) {
const hash = createHash('sha256');
const data = (typeof content.data == "string" || Buffer.isBuffer(content.data))
? content.data
: JSON.stringify(content.data);
hash.update(data);
return {
sha256: {hex: hash.digest('hex')}
};
}
/**
* Create the payloads to send to the ASAQC endpoint
* for a queue item.
*
* At present this consists of two files, which must be
* sent in two separate requests. One is the SeisJSON
* file and the other is its PDF representation.
*
* In principle, other options are possible, such as
* GeoJSON and CSV, and this could be made configurable.
*
* Likewise, it would be possible to upload P1 and P2 files,
* etc.
*/
async function getPayloads(item) {
const asaqcConfig = await configuration.get(item.payload.project, '/asaqc');
const surveyName = await configuration.get(item.payload.project, 'id');
const template = {
type: "acqlinelog",
imo: asaqcConfig.imo,
mmsi: asaqcConfig.mmsi,
surveyName: surveyName,
surveyId: asaqcConfig.id
};
const seisjsonData = await seisjson(item.payload);
const pdfData = await pdf(item.payload, seisjsonData);
return [
{
payload: {
...template,
fileName: seisjsonData.name,
encodedData: Buffer.from(JSON.stringify(seisjsonData.data)).toString("base64")
},
digest: digestOf(seisjsonData)
},
{
payload: {
...template,
fileName: pdfData.name,
encodedData: pdfData.data.toString("base64")
},
digest: digestOf(pdfData)
}
]
}
module.exports = getPayloads;

View File

@@ -0,0 +1,44 @@
const getPayloads = require('./payloads');
const despatchPayload = require('./despatch');
const {fetchItems, markFailed, markSent, rescheduleFailed} = require('./db');
function passed (result) {
return "id" in result;
}
/**
* Process the queue.
*
* Try to send up to a certain number of
* items from the queue.
* Reschedule any failed items.
*/
async function processQueue () {
const items = await fetchItems({status: "queued"});
for (const item of items) {
const payloads = await getPayloads(item, (digestInfo) => {item.digest = digestInfo});
const results = [];
for (const {payload, digest} of payloads) {
const response = await despatchPayload(payload);
results.push({response, digest});
}
if (results.some(result => !passed(result.response))) {
await markFailed(item, results);
} else {
await markSent(item, results);
}
}
await rescheduleFailed(items);
}
module.exports = processQueue;
if (!module.parent) {
processQueue().then(() => process.exit());
}