mirror of
https://gitlab.com/wgp/dougal/software.git
synced 2025-12-06 10:17:07 +00:00
165 lines
4.6 KiB
MySQL
165 lines
4.6 KiB
MySQL
|
|
-- Support notification payloads larger than Postgres' NOTIFY limit.
|
||
|
|
--
|
||
|
|
-- New schema version: 0.4.3
|
||
|
|
--
|
||
|
|
-- ATTENTION:
|
||
|
|
--
|
||
|
|
-- ENSURE YOU HAVE BACKED UP THE DATABASE BEFORE RUNNING THIS SCRIPT.
|
||
|
|
--
|
||
|
|
--
|
||
|
|
-- NOTE: This upgrade affects the public schema only.
|
||
|
|
-- NOTE: Each application starts a transaction, which must be committed
|
||
|
|
-- or rolled back.
|
||
|
|
--
|
||
|
|
-- This creates a new table where large notification payloads are stored
|
||
|
|
-- temporarily and from which they might be recalled by the notification
|
||
|
|
-- listeners. It also creates a purge_notifications() procedure used to
|
||
|
|
-- clean up old notifications from the notifications log and finally,
|
||
|
|
-- modifies notify() to support these changes. When a large payload is
|
||
|
|
-- encountered, the payload is stored in the notify_payloads table and
|
||
|
|
-- a trimmed down version containing a notification_id is sent to listeners
|
||
|
|
-- instead. Listeners can then query notify_payloads to retrieve the full
|
||
|
|
-- payloads. It is the application layer's responsibility to delete old
|
||
|
|
-- notifications.
|
||
|
|
--
|
||
|
|
-- To apply, run as the dougal user:
|
||
|
|
--
|
||
|
|
-- psql <<EOF
|
||
|
|
-- \i $THIS_FILE
|
||
|
|
-- COMMIT;
|
||
|
|
-- EOF
|
||
|
|
--
|
||
|
|
-- NOTE: It can be applied multiple times without ill effect.
|
||
|
|
--
|
||
|
|
|
||
|
|
BEGIN;
|
||
|
|
|
||
|
|
CREATE OR REPLACE PROCEDURE pg_temp.show_notice (notice text) AS $$
|
||
|
|
BEGIN
|
||
|
|
RAISE NOTICE '%', notice;
|
||
|
|
END;
|
||
|
|
$$ LANGUAGE plpgsql;
|
||
|
|
|
||
|
|
CREATE OR REPLACE PROCEDURE pg_temp.upgrade_schema () AS $outer$
|
||
|
|
BEGIN
|
||
|
|
|
||
|
|
RAISE NOTICE 'Updating public schema';
|
||
|
|
-- We need to set the search path because some of the trigger
|
||
|
|
-- functions reference other tables in survey schemas assuming
|
||
|
|
-- they are in the search path.
|
||
|
|
EXECUTE format('SET search_path TO public');
|
||
|
|
|
||
|
|
CREATE TABLE IF NOT EXISTS public.notify_payloads (
|
||
|
|
id SERIAL,
|
||
|
|
tstamp timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||
|
|
payload text NOT NULL DEFAULT '',
|
||
|
|
PRIMARY KEY (id)
|
||
|
|
);
|
||
|
|
|
||
|
|
CREATE INDEX IF NOT EXISTS notify_payload_tstamp ON notify_payloads (tstamp);
|
||
|
|
|
||
|
|
CREATE OR REPLACE FUNCTION public.notify() RETURNS trigger
|
||
|
|
LANGUAGE plpgsql
|
||
|
|
AS $$
|
||
|
|
DECLARE
|
||
|
|
channel text := TG_ARGV[0];
|
||
|
|
pid text;
|
||
|
|
payload text;
|
||
|
|
notification text;
|
||
|
|
payload_id integer;
|
||
|
|
BEGIN
|
||
|
|
|
||
|
|
SELECT projects.pid INTO pid FROM projects WHERE schema = TG_TABLE_SCHEMA;
|
||
|
|
|
||
|
|
payload := json_build_object(
|
||
|
|
'tstamp', CURRENT_TIMESTAMP,
|
||
|
|
'operation', TG_OP,
|
||
|
|
'schema', TG_TABLE_SCHEMA,
|
||
|
|
'table', TG_TABLE_NAME,
|
||
|
|
'old', row_to_json(OLD),
|
||
|
|
'new', row_to_json(NEW),
|
||
|
|
'pid', pid
|
||
|
|
)::text;
|
||
|
|
|
||
|
|
IF octet_length(payload) < 1000 THEN
|
||
|
|
PERFORM pg_notify(channel, payload);
|
||
|
|
ELSE
|
||
|
|
-- We need to find another solution
|
||
|
|
-- FIXME Consider storing the payload in a temporary memory table,
|
||
|
|
-- referenced by some form of autogenerated ID. Then send the ID
|
||
|
|
-- as the payload and then it's up to the user to fetch the original
|
||
|
|
-- payload if interested. This needs a mechanism to expire older payloads
|
||
|
|
-- in the interest of conserving memory.
|
||
|
|
|
||
|
|
INSERT INTO notify_payloads (payload) VALUES (payload) RETURNING id INTO payload_id;
|
||
|
|
|
||
|
|
notification := json_build_object(
|
||
|
|
'tstamp', CURRENT_TIMESTAMP,
|
||
|
|
'operation', TG_OP,
|
||
|
|
'schema', TG_TABLE_SCHEMA,
|
||
|
|
'table', TG_TABLE_NAME,
|
||
|
|
'pid', pid,
|
||
|
|
'payload_id', payload_id
|
||
|
|
)::text;
|
||
|
|
|
||
|
|
PERFORM pg_notify(channel, notification);
|
||
|
|
RAISE INFO 'Payload over limit';
|
||
|
|
END IF;
|
||
|
|
RETURN NULL;
|
||
|
|
END;
|
||
|
|
$$;
|
||
|
|
|
||
|
|
CREATE PROCEDURE public.purge_notifications (age_seconds numeric DEFAULT 120) AS $$
|
||
|
|
DELETE FROM notify_payloads WHERE EXTRACT(epoch FROM CURRENT_TIMESTAMP - tstamp) > age_seconds;
|
||
|
|
$$ LANGUAGE sql;
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
END;
|
||
|
|
$outer$ LANGUAGE plpgsql;
|
||
|
|
|
||
|
|
CREATE OR REPLACE PROCEDURE pg_temp.upgrade () AS $outer$
|
||
|
|
DECLARE
|
||
|
|
row RECORD;
|
||
|
|
current_db_version TEXT;
|
||
|
|
BEGIN
|
||
|
|
|
||
|
|
SELECT value->>'db_schema' INTO current_db_version FROM public.info WHERE key = 'version';
|
||
|
|
|
||
|
|
IF current_db_version >= '0.4.3' THEN
|
||
|
|
RAISE EXCEPTION
|
||
|
|
USING MESSAGE='Patch already applied';
|
||
|
|
END IF;
|
||
|
|
|
||
|
|
IF current_db_version != '0.4.2' THEN
|
||
|
|
RAISE EXCEPTION
|
||
|
|
USING MESSAGE='Invalid database version: ' || current_db_version,
|
||
|
|
HINT='Ensure all previous patches have been applied.';
|
||
|
|
END IF;
|
||
|
|
|
||
|
|
-- This upgrade modified the `public` schema only, not individual
|
||
|
|
-- project schemas.
|
||
|
|
CALL pg_temp.upgrade_schema();
|
||
|
|
|
||
|
|
END;
|
||
|
|
$outer$ LANGUAGE plpgsql;
|
||
|
|
|
||
|
|
CALL pg_temp.upgrade();
|
||
|
|
|
||
|
|
CALL pg_temp.show_notice('Cleaning up');
|
||
|
|
DROP PROCEDURE pg_temp.upgrade_schema ();
|
||
|
|
DROP PROCEDURE pg_temp.upgrade ();
|
||
|
|
|
||
|
|
CALL pg_temp.show_notice('Updating db_schema version');
|
||
|
|
INSERT INTO public.info VALUES ('version', '{"db_schema": "0.4.3"}')
|
||
|
|
ON CONFLICT (key) DO UPDATE
|
||
|
|
SET value = public.info.value || '{"db_schema": "0.4.3"}' WHERE public.info.key = 'version';
|
||
|
|
|
||
|
|
|
||
|
|
CALL pg_temp.show_notice('All done. You may now run "COMMIT;" to persist the changes');
|
||
|
|
DROP PROCEDURE pg_temp.show_notice (notice text);
|
||
|
|
|
||
|
|
--
|
||
|
|
--NOTE Run `COMMIT;` now if all went well
|
||
|
|
--
|