diff --git a/lib/www/server/api/index.js b/lib/www/server/api/index.js index 5553ba1..8fe659e 100644 --- a/lib/www/server/api/index.js +++ b/lib/www/server/api/index.js @@ -235,6 +235,15 @@ app.map({ patch: [ mw.auth.access.write, mw.event.patch ], delete: [mw.auth.access.write, mw.event.delete ] }, + 'import': { + put: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ], + post: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ], + '/:filename': { + put: [ mw.auth.access.read, mw.event.import.csv, mw.event.import.put ], + post: [ mw.auth.access.write, mw.event.import.csv, mw.event.import.put ], + delete: [ mw.auth.access.write, mw.event.import.delete ] + }, + }, }, /* diff --git a/lib/www/server/api/middleware/event/import/csv.js b/lib/www/server/api/middleware/event/import/csv.js new file mode 100644 index 0000000..d20f154 --- /dev/null +++ b/lib/www/server/api/middleware/event/import/csv.js @@ -0,0 +1,146 @@ +const Busboy = require('busboy'); +const { parse } = require('csv-parse/sync'); + +async function middleware(req, res, next) { + const contentType = req.headers['content-type'] || ''; + let csvText = null; + let filename = null; + + if (req.params.filename && contentType.startsWith('text/csv')) { + csvText = typeof req.body === 'string' ? req.body : req.body.toString('utf8'); + filename = req.params.filename; + processCsv(); + } else if (contentType.startsWith('multipart/form-data')) { + const busboy = Busboy({ headers: req.headers }); + let found = false; + busboy.on('file', (name, file, info) => { + if (found) { + file.resume(); + return; + } + if (info.mimeType === 'text/csv') { + found = true; + filename = info.filename || 'unnamed.csv'; + csvText = ''; + file.setEncoding('utf8'); + file.on('data', (data) => { csvText += data; }); + file.on('end', () => {}); + } else { + file.resume(); + } + }); + busboy.on('field', () => {}); // Ignore fields + busboy.on('finish', () => { + if (!found) { + return next(); + } + processCsv(); + }); + req.pipe(busboy); + return; + } else { + return next(); + } + + function processCsv() { + let records; + try { + records = parse(csvText, { + relax_quotes: true, + quote: '"', + escape: '"', + skip_empty_lines: true, + trim: true + }); + } catch (e) { + return res.status(400).json({ error: 'Invalid CSV' }); + } + if (!records.length) { + return res.status(400).json({ error: 'Empty CSV' }); + } + const headers = records[0].map(h => h.toLowerCase().trim()); + const rows = records.slice(1); + let lastDate = null; + let lastTime = null; + const currentDate = new Date().toISOString().slice(0, 10); + const currentTime = new Date().toISOString().slice(11, 19); + const events = []; + for (let row of rows) { + let object = { labels: [] }; + for (let k = 0; k < headers.length; k++) { + let key = headers[k]; + let val = row[k] ? row[k].trim() : ''; + if (!key) continue; + if (['remarks', 'event', 'comment', 'comments', 'text'].includes(key)) { + object.remarks = val; + } else if (key === 'label') { + if (val) object.labels.push(val); + } else if (key === 'labels') { + if (val) object.labels.push(...val.split(';').map(l => l.trim()).filter(l => l)); + } else if (key === 'sequence' || key === 'seq') { + if (val) object.sequence = Number(val); + } else if (['point', 'shot', 'shotpoint'].includes(key)) { + if (val) object.point = Number(val); + } else if (key === 'date') { + object.date = val; + } else if (key === 'time') { + object.time = val; + } else if (key === 'timestamp') { + object.timestamp = val; + } else if (key === 'latitude') { + object.latitude = parseFloat(val); + } else if (key === 'longitude') { + object.longitude = parseFloat(val); + } + } + if (!object.remarks) continue; + let useSeqPoint = Number.isFinite(object.sequence) && Number.isFinite(object.point); + let tstamp = null; + if (!useSeqPoint) { + if (object.timestamp) { + tstamp = new Date(object.timestamp); + } + if (!tstamp || isNaN(tstamp.getTime())) { + let dateStr = object.date || lastDate || currentDate; + let timeStr = object.time || lastTime || currentTime; + if (timeStr.length === 5) timeStr += ':00'; + let full = `${dateStr}T${timeStr}.000Z`; + tstamp = new Date(full); + if (isNaN(tstamp.getTime())) continue; + } + if (object.date) lastDate = object.date; + if (object.time) lastTime = object.time; + } + let event = { + remarks: object.remarks, + labels: object.labels, + meta: { + author: "*CSVImport*", + "*CSVImport*": { + filename, + tstamp: new Date().toISOString() + } + } + }; + if (!isNaN(object.latitude) && !isNaN(object.longitude)) { + event.meta.geometry = { + type: "Point", + coordinates: [object.longitude, object.latitude] + }; + } + if (useSeqPoint) { + event.sequence = object.sequence; + event.point = object.point; + } else if (tstamp) { + event.tstamp = tstamp.toISOString(); + } else { + continue; + } + events.push(event); + } + req.body = events; + next(); + } +} + +module.exports = middleware; diff --git a/lib/www/server/api/middleware/event/import/delete.js b/lib/www/server/api/middleware/event/import/delete.js new file mode 100644 index 0000000..77aa33e --- /dev/null +++ b/lib/www/server/api/middleware/event/import/delete.js @@ -0,0 +1,18 @@ + +const { event } = require('../../../../lib/db'); + +module.exports = async function (req, res, next) { + + try { + if (req.params.project && req.params.filename) { + await event.unimport(req.params.project, req.params.filename, req.query); + res.status(204).end(); + } else { + res.status(400).send({message: "Malformed request"}); + } + next(); + } catch (err) { + next(err); + } + +}; diff --git a/lib/www/server/api/middleware/event/import/index.js b/lib/www/server/api/middleware/event/import/index.js new file mode 100644 index 0000000..db29c69 --- /dev/null +++ b/lib/www/server/api/middleware/event/import/index.js @@ -0,0 +1,6 @@ + +module.exports = { + csv: require('./csv'), + put: require('./put'), + delete: require('./delete'), +} diff --git a/lib/www/server/api/middleware/event/import/put.js b/lib/www/server/api/middleware/event/import/put.js new file mode 100644 index 0000000..11ed05a --- /dev/null +++ b/lib/www/server/api/middleware/event/import/put.js @@ -0,0 +1,16 @@ + +const { event } = require('../../../../lib/db'); + +module.exports = async function (req, res, next) { + + try { + const payload = req.body; + + await event.import(req.params.project, payload, req.query); + res.status(200).send(payload); + next(); + } catch (err) { + next(err); + } + +}; diff --git a/lib/www/server/api/middleware/event/index.js b/lib/www/server/api/middleware/event/index.js index 3f76e69..48b2f3d 100644 --- a/lib/www/server/api/middleware/event/index.js +++ b/lib/www/server/api/middleware/event/index.js @@ -7,5 +7,6 @@ module.exports = { put: require('./put'), patch: require('./patch'), delete: require('./delete'), - changes: require('./changes') + changes: require('./changes'), + import: require('./import'), } diff --git a/lib/www/server/lib/db/event/import.js b/lib/www/server/lib/db/event/import.js new file mode 100644 index 0000000..b9ac730 --- /dev/null +++ b/lib/www/server/lib/db/event/import.js @@ -0,0 +1,105 @@ +const { DEBUG, ERROR } = require('DOUGAL_ROOT/debug')(__filename); +const { setSurvey, transaction } = require('../connection'); + + +/** Remove a previous import from the database. + * + * ATTENTION! + * + * This will not just mark the events as deleted but actually + * remove them. + */ +async function bulk_unimport (projectId, filename, opts = {}) { + + const client = opts.client ?? await setSurvey(projectId); + try { + + const text = ` + DELETE + FROM event_log + WHERE meta ? 'author' + AND meta->(meta->>'author')->>'filename' = $1; + `; + const values = [ filename ]; + + DEBUG("Removing all event data imported from filename '%s'", filename); + await client.query(text, values); + } catch (err) { + err.origin = __filename; + throw err; + } finally { + if (client !== opts.client) client.release(); + } + + return; +} + + +async function bulk_import (projectId, payload, opts = {}) { + + const client = opts.client ?? await setSurvey(projectId); + try { + + if (!payload.length) { + DEBUG("Called with no rows to be imported. Returning"); + return []; + } + + const filename = payload[0].meta[payload[0].meta.author].filename; + + // Delete previous data from this file + await transaction.begin(client); + await bulk_unimport(projectId, filename, {client}); + + + // Prepare arrays for each column + const tstamps = []; + const sequences = []; + const points = []; + const remarks = []; + const labels = []; + const metas = []; + + for (const event of payload) { + tstamps.push(event.tstamp ? new Date(event.tstamp) : null); + sequences.push(Number.isInteger(event.sequence) ? event.sequence : null); + points.push(Number.isInteger(event.point) ? event.point : null); + remarks.push(event.remarks || ''); + labels.push(Array.isArray(event.labels) && event.labels.length + ? `{${event.labels.map(l => `"${l.replace(/"/g, '""')}"`).join(',')}}` + : '{}' + ); + metas.push(event.meta ? JSON.stringify(event.meta) : '{}'); + } + + + const text = ` + INSERT INTO event_log (tstamp, sequence, point, remarks, labels, meta) + SELECT + UNNEST($1::TIMESTAMP[]) AS tstamp, + UNNEST($2::INTEGER[]) AS sequence, + UNNEST($3::INTEGER[]) AS point, + replace_placeholders(UNNEST($4::TEXT[]), UNNEST($1::TIMESTAMP[]), UNNEST($2::INTEGER[]), UNNEST($3::INTEGER[])) AS remarks, + UNNEST($5::TEXT[])::TEXT[] AS labels, + UNNEST($6::JSONB[]) AS meta + RETURNING id; + `; + const values = [ tstamps, sequences, points, remarks, labels, metas ]; + + DEBUG("Importing %d rows from filename '%s'", payload.length, filename); + const res = await client.query(text, values); + + transaction.commit(client); + + return res.rows.map(row => row.id); + } catch (err) { + err.origin = __filename; + throw err; + } finally { + if (client !== opts.client) client.release(); + } + + return; +} + +module.exports = { import: bulk_import, unimport: bulk_unimport }; diff --git a/lib/www/server/lib/db/event/index.js b/lib/www/server/lib/db/event/index.js index 266ab50..cd65f0a 100644 --- a/lib/www/server/lib/db/event/index.js +++ b/lib/www/server/lib/db/event/index.js @@ -6,5 +6,7 @@ module.exports = { put: require('./put'), patch: require('./patch'), del: require('./delete'), - changes: require('./changes') + changes: require('./changes'), + import: require('./import').import, + unimport: require('./import').unimport, } diff --git a/lib/www/server/lib/db/event/unimport.js b/lib/www/server/lib/db/event/unimport.js new file mode 100644 index 0000000..b6450ef --- /dev/null +++ b/lib/www/server/lib/db/event/unimport.js @@ -0,0 +1,37 @@ + +const { DEBUG, ERROR } = require('DOUGAL_ROOT/debug')(__filename); +const { setSurvey, transaction } = require('../connection'); + +/** Remove a previous import from the database. + * + * ATTENTION! + * + * This will not just mark the events as deleted but actually + * remove them. + */ +async function unimport (projectId, filename, opts = {}) { + + const client = await setSurvey(projectId); + try { + + const text = ` + DELETE + FROM event_log + WHERE meta ? 'author' + AND meta->(meta->'author')->>'filename' = $1; + `; + const values = [ filename ]; + + DEBUG("Removing all event data imported from filename '%s'", filename); + await client.query(text, values); + } catch (err) { + err.origin = __filename; + throw err; + } finally { + client.release(); + } + + return; +} + +module.exports = post; diff --git a/lib/www/server/package.json b/lib/www/server/package.json index be07722..f463bac 100644 --- a/lib/www/server/package.json +++ b/lib/www/server/package.json @@ -30,6 +30,7 @@ "@dougal/organisations": "file:../../modules/@dougal/organisations", "@dougal/user": "file:../../modules/@dougal/user", "body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser", + "busboy": "^1.6.0", "compression": "^1.8.1", "cookie-parser": "^1.4.5", "csv": "^6.3.3", diff --git a/package-lock.json b/package-lock.json index 3e5108f..150a128 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9360,6 +9360,7 @@ "@dougal/organisations": "file:../../modules/@dougal/organisations", "@dougal/user": "file:../../modules/@dougal/user", "body-parser": "gitlab:aaltronav/contrib/expressjs/body-parser", + "busboy": "^1.6.0", "compression": "^1.8.1", "cookie-parser": "^1.4.5", "csv": "^6.3.3", @@ -14274,6 +14275,17 @@ "node": ">=0.10.0" } }, + "node_modules/busboy": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz", + "integrity": "sha512-8SFQbg/0hQ9xy3UNTB0YEnsNBbWfhf7RtnzpL7TkBiTBRfrQ9Fxcnz7VJsleJpyp6rVLvXiuORqjlHi5q+PYuA==", + "dependencies": { + "streamsearch": "^1.1.0" + }, + "engines": { + "node": ">=10.16.0" + } + }, "node_modules/bytes": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz", @@ -16504,6 +16516,14 @@ "node": ">= 0.8" } }, + "node_modules/streamsearch": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/streamsearch/-/streamsearch-1.1.0.tgz", + "integrity": "sha512-Mcc5wHehp9aXz1ax6bZUyY5afg9u2rv5cqQI3mRrYkGC8rW2hM02jWuwjtL++LS5qinSyhj2QfLyNsuc+VsExg==", + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/string_decoder": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",