diff --git a/lib/www/server/queues/asaqc/db.js b/lib/www/server/queues/asaqc/db.js new file mode 100644 index 0000000..9bc7c31 --- /dev/null +++ b/lib/www/server/queues/asaqc/db.js @@ -0,0 +1,51 @@ +const { queue } = require('../../lib/db'); + + +/** + * Fetch up to limit items from the queue having + * status = status. Take oldest first. + */ +async function fetchItems ({status, limit}) { + status ||= "queued"; + limit ||= 10; + return await queue.get(null, {status, limit, order: "created_on", dir: "+"}) +} + +/** + * Set items to status=failed in the database, save + * the results *and* modify the item itself, so that + * it can be picked up by rescheduleFailed(). + */ +async function markFailed (item, results) { + item.status = "failed"; + item.results = results; + return await queue.put(item.item_id, {status: 'failed', results}); +} + + +/** + * Set items to status=sent in the database and save + * the results. + */ +async function markSent (item, results) { + return await queue.put(item.item_id, {status: 'sent', results}); +} + + +/** + * Reschedule any item in `items` that has been marked + * as failed. The newly created item will take its parent's + * parent_id if present, or else its item_id. + */ +async function rescheduleFailed (items) { + const failed = items.filter(i => i.status == 'failed'); + + if (failed.length) + console.warn(failed.length, "failed items"); + + for (const item of failed) { + await queue.post('asaqc', item.payload, item.parent_id || item.item_id); + } +} + +module.exports = {fetchItems, markFailed, markSent, rescheduleFailed}; diff --git a/lib/www/server/queues/asaqc/despatch.js b/lib/www/server/queues/asaqc/despatch.js new file mode 100644 index 0000000..c318184 --- /dev/null +++ b/lib/www/server/queues/asaqc/despatch.js @@ -0,0 +1,55 @@ +const fs = require('fs'); +const path = require('path'); +const https = require('https'); +const fetch = require('node-fetch'); +const DOUGAL_ROOT = require('../../lib/config').DOUGAL_ROOT; +const cfg = require('../../lib/config').global.queues.asaqc.request; + +/** + * Return a suitably configured httpsAgent with the client's TLS + * credentials if given. + */ +function httpsAgent () { + // References: + // https://github.com/node-fetch/node-fetch/issues/904 + // https://nodejs.org/api/https.html#https_https_request_options_callback + + if (!cfg.httpsAgent) { + return; + } + + const options = { + key: fs.readFileSync(path.resolve(DOUGAL_ROOT, cfg.httpsAgent.key)), + cert: fs.readFileSync(path.resolve(DOUGAL_ROOT, cfg.httpsAgent.cert)) + } + + return https.Agent(options); +} + + +/** + * Send a payload to the ASAQC `upload-file-encoded` endpoint. + * https://api.equinor.com/docs/services/vessel-track/operations/FileUploadEncoded + */ +async function despatchPayload(payload) { + + try { + const res = await fetch(cfg.url, { + ...cfg.args, + body: JSON.stringify(payload), + agent: httpsAgent() + }); + + if (res) { + return await res.json(); + } else { + console.error("NO RESPONSE FROM ASAQC ENDPOINT"); + } + } catch (err) { + console.error(err); + return {error: err}; + } +} + + +module.exports = despatchPayload; diff --git a/lib/www/server/queues/asaqc/index.js b/lib/www/server/queues/asaqc/index.js new file mode 100755 index 0000000..ac365c7 --- /dev/null +++ b/lib/www/server/queues/asaqc/index.js @@ -0,0 +1,29 @@ +#!/usr/bin/node + +/* + * Can be required as a module or called directly. + * + * In the latter case, it will do a queue run. + * + * The following environment variables may come in + * useful: + * + * DOUGAL_ROOT Use it to specify the path to Dougal's + * top directory (`software/`). Most of the time this + * is not needed unless running in a development + * environment. + * + * NODE_TLS_REJECT_UNAUTHORIZED=0 Use this when running + * against the internal test server or any other endpoint + * that has self-signed certificates. WARNING: think carefully + * if you really want to do this, most of the time you don't. + */ + +module.exports = { + processQueue: require('./process') +} + + +if (!module.parent) { + module.exports.processQueue().then(() => process.exit()); +} diff --git a/lib/www/server/queues/asaqc/payloads.js b/lib/www/server/queues/asaqc/payloads.js new file mode 100644 index 0000000..0b879e5 --- /dev/null +++ b/lib/www/server/queues/asaqc/payloads.js @@ -0,0 +1,67 @@ +const { createHash } = require('crypto'); +const { seisjson, pdf } = require('../../lib/sse/present'); +const { configuration } = require('../../lib/db'); + +function digestOf(content) { + const hash = createHash('sha256'); + const data = (typeof content.data == "string" || Buffer.isBuffer(content.data)) + ? content.data + : JSON.stringify(content.data); + hash.update(data); + + return { + sha256: {hex: hash.digest('hex')} + }; +} + +/** + * Create the payloads to send to the ASAQC endpoint + * for a queue item. + * + * At present this consists of two files, which must be + * sent in two separate requests. One is the SeisJSON + * file and the other is its PDF representation. + * + * In principle, other options are possible, such as + * GeoJSON and CSV, and this could be made configurable. + * + * Likewise, it would be possible to upload P1 and P2 files, + * etc. + */ +async function getPayloads(item) { + + const asaqcConfig = await configuration.get(item.payload.project, '/asaqc'); + const surveyName = await configuration.get(item.payload.project, 'id'); + + const template = { + type: "acqlinelog", + imo: asaqcConfig.imo, + mmsi: asaqcConfig.mmsi, + surveyName: surveyName, + surveyId: asaqcConfig.id + }; + + const seisjsonData = await seisjson(item.payload); + const pdfData = await pdf(item.payload, seisjsonData); + + return [ + { + payload: { + ...template, + fileName: seisjsonData.name, + encodedData: Buffer.from(JSON.stringify(seisjsonData.data)).toString("base64") + }, + digest: digestOf(seisjsonData) + }, + { + payload: { + ...template, + fileName: pdfData.name, + encodedData: pdfData.data.toString("base64") + }, + digest: digestOf(pdfData) + } + ] +} + +module.exports = getPayloads; diff --git a/lib/www/server/queues/asaqc/process.js b/lib/www/server/queues/asaqc/process.js new file mode 100644 index 0000000..8b3ccd3 --- /dev/null +++ b/lib/www/server/queues/asaqc/process.js @@ -0,0 +1,44 @@ + +const getPayloads = require('./payloads'); +const despatchPayload = require('./despatch'); +const {fetchItems, markFailed, markSent, rescheduleFailed} = require('./db'); + + +function passed (result) { + return "id" in result; +} + +/** + * Process the queue. + * + * Try to send up to a certain number of + * items from the queue. + * Reschedule any failed items. + */ +async function processQueue () { + + const items = await fetchItems({status: "queued"}); + for (const item of items) { + const payloads = await getPayloads(item, (digestInfo) => {item.digest = digestInfo}); + const results = []; + + for (const {payload, digest} of payloads) { + const response = await despatchPayload(payload); + results.push({response, digest}); + } + + if (results.some(result => !passed(result.response))) { + await markFailed(item, results); + } else { + await markSent(item, results); + } + } + + await rescheduleFailed(items); +} + +module.exports = processQueue; + +if (!module.parent) { + processQueue().then(() => process.exit()); +}