Files
dougal-software/etc/db/upgrades/upgrade30-v0.4.3-large-notification-payloads.sql
2023-10-14 18:29:28 +02:00

165 lines
4.6 KiB
PL/PgSQL

-- Support notification payloads larger than Postgres' NOTIFY limit.
--
-- New schema version: 0.4.3
--
-- ATTENTION:
--
-- ENSURE YOU HAVE BACKED UP THE DATABASE BEFORE RUNNING THIS SCRIPT.
--
--
-- NOTE: This upgrade affects the public schema only.
-- NOTE: Each application starts a transaction, which must be committed
-- or rolled back.
--
-- This creates a new table where large notification payloads are stored
-- temporarily and from which they might be recalled by the notification
-- listeners. It also creates a purge_notifications() procedure used to
-- clean up old notifications from the notifications log and finally,
-- modifies notify() to support these changes. When a large payload is
-- encountered, the payload is stored in the notify_payloads table and
-- a trimmed down version containing a notification_id is sent to listeners
-- instead. Listeners can then query notify_payloads to retrieve the full
-- payloads. It is the application layer's responsibility to delete old
-- notifications.
--
-- To apply, run as the dougal user:
--
-- psql <<EOF
-- \i $THIS_FILE
-- COMMIT;
-- EOF
--
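-- NOTE: It can be applied multiple times without ill effect: once the
-- schema is at version 0.4.3 or later, a re-run aborts with
-- 'Patch already applied' and changes nothing.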
--
-- Open the transaction that wraps the whole upgrade. This script never
-- commits it: the caller must issue COMMIT (or ROLLBACK) afterwards.
BEGIN;
-- Session-local helper: emits the given text as a NOTICE-level message.
-- Lives in pg_temp, so it exists only for the current session.
--
--   notice  the message text to report
CREATE OR REPLACE PROCEDURE pg_temp.show_notice (notice text)
    LANGUAGE plpgsql
AS $show_notice$
BEGIN
    -- The '%' placeholder is substituted with the value of the argument.
    RAISE NOTICE '%', notice;
END;
$show_notice$;
-- Applies the 0.4.3 changes to the public schema:
--   * creates the notify_payloads table and an index on its tstamp column;
--   * replaces the notify() trigger function so that large payloads are
--     stored in notify_payloads and only a reference is sent to listeners;
--   * creates purge_notifications() to delete stored payloads by age.
-- Every statement here is re-runnable (IF NOT EXISTS / OR REPLACE).
CREATE OR REPLACE PROCEDURE pg_temp.upgrade_schema () AS $outer$
BEGIN

    RAISE NOTICE 'Updating public schema';
    -- We need to set the search path because some of the trigger
    -- functions reference other tables in survey schemas assuming
    -- they are in the search path.
    EXECUTE format('SET search_path TO public');

    -- Holding area for payloads too large to send through NOTIFY.
    CREATE TABLE IF NOT EXISTS public.notify_payloads (
        id SERIAL,
        tstamp timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
        payload text NOT NULL DEFAULT '',
        PRIMARY KEY (id)
    );

    -- Supports the age-based range delete in purge_notifications().
    CREATE INDEX IF NOT EXISTS notify_payload_tstamp ON public.notify_payloads (tstamp);

    -- Trigger function: publishes a JSON description of the row change on
    -- the channel named by the first trigger argument (TG_ARGV[0]).
    --
    -- Payloads under 1000 bytes are sent as-is. Larger ones are stored in
    -- public.notify_payloads and a trimmed notification carrying the stored
    -- row's id ('payload_id') is sent instead; listeners can use that id to
    -- fetch the full payload.
    --
    -- Always returns NULL.
    CREATE OR REPLACE FUNCTION public.notify() RETURNS trigger
        LANGUAGE plpgsql
        AS $$
    DECLARE
        channel text := TG_ARGV[0];
        project_pid text;     -- projects.pid of the project owning the schema, if any
        full_payload text;    -- complete JSON message, including old/new rows
        short_payload text;   -- trimmed JSON message sent when full_payload is too big
        payload_id integer;   -- id of the row stored in notify_payloads
    BEGIN

        SELECT projects.pid INTO project_pid FROM projects WHERE schema = TG_TABLE_SCHEMA;

        full_payload := json_build_object(
            'tstamp', CURRENT_TIMESTAMP,
            'operation', TG_OP,
            'schema', TG_TABLE_SCHEMA,
            'table', TG_TABLE_NAME,
            'old', row_to_json(OLD),
            'new', row_to_json(NEW),
            'pid', project_pid
        )::text;

        -- NOTIFY rejects payloads of 8000 bytes or more in the default
        -- configuration; this function switches to the stored-payload
        -- path well before that, at 1000 bytes.
        IF octet_length(full_payload) < 1000 THEN
            PERFORM pg_notify(channel, full_payload);
        ELSE
            -- Too large to send directly: store the full payload and send
            -- only a reference to it. Stored rows are not removed here;
            -- see purge_notifications().
            INSERT INTO public.notify_payloads (payload)
            VALUES (full_payload)
            RETURNING id INTO payload_id;

            short_payload := json_build_object(
                'tstamp', CURRENT_TIMESTAMP,
                'operation', TG_OP,
                'schema', TG_TABLE_SCHEMA,
                'table', TG_TABLE_NAME,
                'pid', project_pid,
                'payload_id', payload_id
            )::text;

            PERFORM pg_notify(channel, short_payload);
            RAISE INFO 'Payload over limit';
        END IF;

        RETURN NULL;
    END;
    $$;

    -- Deletes stored payloads older than age_seconds (default 120 seconds).
    --
    -- OR REPLACE keeps this statement re-runnable like the others above.
    -- The predicate compares the bare tstamp column against a computed
    -- cutoff so that the notify_payload_tstamp index can be used.
    CREATE OR REPLACE PROCEDURE public.purge_notifications (age_seconds numeric DEFAULT 120) AS $$
        DELETE FROM public.notify_payloads
        WHERE tstamp < CURRENT_TIMESTAMP - make_interval(secs => age_seconds::double precision);
    $$ LANGUAGE sql;

END;
$outer$ LANGUAGE plpgsql;
-- Entry point of the upgrade: checks the recorded schema version and, if it
-- is exactly 0.4.2, applies the schema changes via pg_temp.upgrade_schema().
--
-- Raises an exception (aborting the surrounding transaction) when:
--   * the recorded version is already 0.4.3 or later, or
--   * the recorded version is anything other than 0.4.2, including the
--     case where no version is recorded at all.
CREATE OR REPLACE PROCEDURE pg_temp.upgrade () AS $outer$
DECLARE
    current_db_version TEXT;
BEGIN

    -- Stays NULL when there is no 'version' row or no 'db_schema' key.
    SELECT value->>'db_schema'
    INTO current_db_version
    FROM public.info
    WHERE key = 'version';

    -- NOTE(review): this is a text comparison, so ordering is lexical
    -- (e.g. '0.4.10' sorts before '0.4.3'). Such versions are still
    -- rejected by the check below, only with a different message.
    IF current_db_version >= '0.4.3' THEN
        RAISE EXCEPTION
            USING MESSAGE='Patch already applied';
    END IF;

    -- IS DISTINCT FROM treats NULL as "different", so an unknown version
    -- is rejected instead of silently falling through both checks.
    -- COALESCE is needed because a RAISE option may not be NULL.
    IF current_db_version IS DISTINCT FROM '0.4.2' THEN
        RAISE EXCEPTION
            USING MESSAGE='Invalid database version: ' || COALESCE(current_db_version, '(unknown)'),
                  HINT='Ensure all previous patches have been applied.';
    END IF;

    -- This upgrade modifies the `public` schema only, not individual
    -- project schemas.
    CALL pg_temp.upgrade_schema();
END;
$outer$ LANGUAGE plpgsql;
-- Run the upgrade. If it raises, the transaction is aborted and none of the
-- statements below take effect.
CALL pg_temp.upgrade();

CALL pg_temp.show_notice('Cleaning up');
DROP PROCEDURE pg_temp.upgrade_schema ();
DROP PROCEDURE pg_temp.upgrade ();

CALL pg_temp.show_notice('Updating db_schema version');
-- Record the new schema version. If a 'version' row already exists, merge
-- the new db_schema value into it rather than replacing the whole value.
-- The target columns are named explicitly so the statement does not depend
-- on the column order of public.info.
INSERT INTO public.info (key, value)
VALUES ('version', '{"db_schema": "0.4.3"}')
ON CONFLICT (key) DO UPDATE
    SET value = public.info.value || '{"db_schema": "0.4.3"}'
    WHERE public.info.key = 'version';

CALL pg_temp.show_notice('All done. You may now run "COMMIT;" to persist the changes');
DROP PROCEDURE pg_temp.show_notice (notice text);
--
--NOTE Run `COMMIT;` now if all went well
--