const { queue } = require('../../lib/db'); /** * Fetch up to limit items from the queue having * status = status. Take oldest first. */ async function fetchItems ({status, limit}) { status = status || "queued"; limit = limit || 10; return await queue.get(null, {status, limit, order: "created_on", dir: "+"}) } /** * Set items to status=failed in the database, save * the results *and* modify the item itself, so that * it can be picked up by rescheduleFailed(). */ async function markFailed (item, results) { item.status = "failed"; item.results = results; return await queue.put(item.item_id, {status: 'failed', results}); } /** * Set items to status=sent in the database and save * the results. */ async function markSent (item, results) { return await queue.put(item.item_id, {status: 'sent', results}); } /** * Reschedule any item in `items` that has been marked * as failed. The newly created item will take its parent's * parent_id if present, or else its item_id. */ async function rescheduleFailed (items) { const failed = items.filter(i => i.status == 'failed'); if (failed.length) console.warn(failed.length, "failed items"); for (const item of failed) { await queue.post('asaqc', item.payload, item.parent_id || item.item_id); } } module.exports = {fetchItems, markFailed, markSent, rescheduleFailed};