Add queue-related functions to the database interface.

These functions, in general following the same HTTP-verb
approach as the rest of the database interface, are for
use with both the HTTP API and the queue processor.
This commit is contained in:
D. Berge
2021-10-03 21:31:26 +02:00
parent db8efce346
commit 6debf5c355
6 changed files with 191 additions and 1 deletions

View File

@@ -10,5 +10,6 @@ module.exports = {
configuration: require('./configuration'),
info: require('./info'),
meta: require('./meta'),
navdata: require('./navdata')
navdata: require('./navdata'),
queue: require('./queue')
};

View File

@@ -0,0 +1,24 @@
const { transaction, pool } = require('../connection');
async function post(queueId/*unused*/, item_id) {
const client = await pool.connect();
const text = `
UPDATE queue_items
SET status = 'cancelled'
WHERE status = 'queued'
AND (item_id = $1
OR parent_id = $1);
`;
const res = await client.query(text, [item_id]);
client.release();
if (!res.rowCount) {
throw { status: 404, message: "No cancellable requests" };
}
}
module.exports = post;

View File

@@ -0,0 +1,72 @@
const { pool } = require('../connection');
async function get (queueId/*unused*/, opts = {}) {
const client = await pool.connect();
const validStatuses = [
'queued',
'cancelled',
'failed',
'sent'
];
const validOrders = {
created_on: "created_on",
updated_on: "updated_on",
not_before: "not_before",
item_id: "item_id",
status: "status",
parent_id: "parent_id",
project: "payload->'project'",
sequence: "payload->'sequence'"
};
const limit = Math.min(Math.abs(opts.limit) || 100, 1000);
const offset = Math.abs(opts.offset) || 0;
const order = validOrders[opts.order] || "updated_on";
const dir = (!opts.dir || opts.dir == "-")
? "DESC"
: "ASC";
const validStatus = validStatuses.includes(opts.status);
const status = validStatus ? opts.status : 1;
const hasProject = "project" in opts;
const hasSequence = "sequence" in opts;
const project = hasProject
? opts.project
: 1;
const sequence = hasSequence
? opts.sequence
: 1;
const restrict1 = validStatus
? "status = $3"
: "$3 = $3";
const restrict2 = hasProject
? "AND payload->>'project' = $4"
: "AND $4 = $4";
// Yes, technically a user could restrict by sequence
// only, without specifying a project. Let's call that
// a feature.
const restrict3 = hasSequence
? "AND payload->>'sequence' = $5"
: "AND $5 = $5";
const text = `
SELECT *
FROM queue_items
WHERE ${restrict1} ${restrict2} ${restrict3}
ORDER BY ${order} ${dir}
LIMIT $1
OFFSET $2;
`;
const res = await client.query(text, [limit, offset, status, project, sequence]);
client.release();
return res.rows;
}
module.exports = get;

View File

@@ -0,0 +1,6 @@
module.exports = {
get: require('./get'),
post: require('./post'),
put: require('./put'),
delete: require('./delete')
};

View File

@@ -0,0 +1,38 @@
const { transaction, pool } = require('../connection');
async function post(queueId/*unused*/, payload, parent_id) {
// Technically the API only supports an array payload,
// but we'll be permissive and accept a single request
// object.
if (!Array.isArray(payload)) {
payload = [payload];
}
const client = await pool.connect();
await transaction.begin(client);
for (const {project, sequence} of payload) {
if ([project, sequence].some(v => typeof v === "undefined")) {
throw { status: 400, message: "Malformed request" };
}
// If we got here, the request is probably OK.
// Most fields just take default values.
const text = `
INSERT INTO queue_items (payload, parent_id)
VALUES ($1, $2);
`;
await client.query(text, [{project, sequence}, parent_id]);
}
await transaction.commit(client);
client.release();
}
module.exports = post;

View File

@@ -0,0 +1,49 @@
const { pool } = require('../connection');
/**
* Stringify arrays.
*
* node-postgres has an issue in that it transforms
* JSON array objects into PostgreSQL arrays instead
* of JSON arrays, which causes a problem for json/jsonb
* fields.
*
* Note: this is a feature not a bug. See:
* https://github.com/brianc/node-postgres/issues/442
*/
function formatValue(value) {
if (Array.isArray(value)) {
return JSON.stringify(value);
} else {
return value;
}
}
async function put (item_id, values) {
const client = await pool.connect();
const updateable = [ "status", "results", "parent_id" ];
const fields = [];
const params = [item_id];
for (const field of updateable) {
if (field in values) {
fields.push(`${field} = $${params.length+1}`);
params.push(formatValue(values[field]));
}
}
const text = `
UPDATE queue_items
SET
${fields.join(",\n")}
WHERE item_id = $1
RETURNING *;
`;
const res = await client.query(text, params);
client.release();
return res.rows[0];
}
module.exports = put;