diff --git a/sql/pre_install/tables.sql b/sql/pre_install/tables.sql index 6abb5cceee5..db99da68815 100644 --- a/sql/pre_install/tables.sql +++ b/sql/pre_install/tables.sql @@ -37,6 +37,15 @@ -- chunks. -- The unique constraint is table_name +schema_name. The ordering is -- important as we want index access when we filter by table_name +-- +-- NOTE: the 'hypertable' catalog table is marked as a user catalog table to +-- support logical replication. If it is re-created in an upgrade script, the +-- script should first call the function below: it aborts the upgrade when +-- a logical replication slot still has undecoded changes to this table. +-- +-- SELECT _timescaledb_functions.ensure_catalog_replication( +-- '_timescaledb_catalog.hypertable' +-- ); CREATE SEQUENCE _timescaledb_catalog.hypertable_id_seq MINVALUE 1; CREATE TABLE _timescaledb_catalog.hypertable ( @@ -138,6 +147,15 @@ SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_ -- the chunk's hypercube. Tuples that fall within the chunk's -- hypercube are stored in the chunk's data table, as given by -- 'schema_name' and 'table_name'. +-- +-- NOTE: the 'chunk' catalog table is marked as a user catalog table to support +-- logical replication. If it is re-created in an upgrade script, the script +-- should first call the function below: it aborts the upgrade when a logical +-- replication slot still has undecoded changes to this table. +-- +-- SELECT _timescaledb_functions.ensure_catalog_replication( +-- '_timescaledb_catalog.chunk' +-- ); CREATE SEQUENCE _timescaledb_catalog.chunk_id_seq MINVALUE 1; CREATE TABLE _timescaledb_catalog.chunk ( diff --git a/sql/util_internal_table_ddl.sql b/sql/util_internal_table_ddl.sql index c4f8ee73433..21c2803b5a2 100644 --- a/sql/util_internal_table_ddl.sql +++ b/sql/util_internal_table_ddl.sql @@ -70,3 +70,13 @@ BEGIN RETURN ret; END $BODY$ SET search_path TO pg_catalog, pg_temp; + +-- Raise an error if a logical replication slot still has pending changes +-- against the given relation. Intended to be called before recreating a +-- TimescaleDB user catalog table during an extension upgrade: once the old +-- relation is dropped, a slot that still needs to decode changes to it can no +-- longer be decoded ("could not find pg_class entry"), so the slot must be +-- advanced or dropped first. +CREATE OR REPLACE FUNCTION _timescaledb_functions.ensure_catalog_replication( + rel REGCLASS +) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_ensure_catalog_replication' LANGUAGE C VOLATILE STRICT; diff --git a/src/process_utility.c b/src/process_utility.c index fe721b06ded..d45ef5d40e7 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -5,7 +5,10 @@ */ #include +#include #include +#include +#include #include #include #include @@ -40,6 +43,8 @@ #include #include #include +#include +#include #include #include #include @@ -110,6 +115,7 @@ static ProcessUtility_hook_type prev_ProcessUtility_hook; static bool expect_chunk_modification = false; static ProcessUtilityContext last_process_utility_context = PROCESS_UTILITY_TOPLEVEL; static void check_no_timescale_options(AlterTableCmd *cmd, Oid reloid); +static void error_if_logical_slot_blocks_extension_drop(void); static DDLResult process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht); static DDLResult process_altertable_reset_options(AlterTableCmd *cmd, Hypertable *ht); static void ts_bgw_job_update_owner(Relation rel, HeapTuple tuple, TupleDesc tupledesc, @@ -615,6 +621,22 @@ process_drop_procedure_start(DropStmt *stmt) } } +static void +process_drop_extension_start(DropStmt *stmt) +{ + ListCell *lc; + + foreach (lc, stmt->objects) + { + char *extname = strVal(lfirst(lc)); + + if (strcmp(extname, EXTENSION_NAME) == 0) + { + error_if_logical_slot_blocks_extension_drop(); + } + } +} + static void replace_attr_if_changed(AttrNumber attno, const char *newvalue, Name name_buf, Datum *values, bool *replace) @@ -2302,6 +2324,82 @@ process_drop_role(ProcessUtilityArgs *args) return DDL_CONTINUE; } +/* + * Block DROP EXTENSION timescaledb when a local logical replication slot + * still has WAL to decode that references _timescaledb_catalog.hypertable or + * _timescaledb_catalog.chunk. + * + * Those tables are marked as user_catalog_tables so logical decoding plugins + * can read them under a historic snapshot. Dropping them while a slot still + * has pending changes against those relfilenodes later fails inside + * RelationInitPhysicalAddr: that code path forces a non-historic pg_class + * lookup to refresh the current relfilenode (it handles concurrent catalog + * rewrites that way), and once the extension is gone there is no current + * pg_class row anymore, producing "could not find pg_class entry for ...". + * The slot is unrecoverable at that point, so refuse the drop and let the + * operator drain or remove the slot first. + * + * A slot's catalog_xmin is the oldest transaction whose (user) catalog changes + * it may still decode. If the newest XID that modified either table is at or + * after a slot's catalog_xmin, that slot still has pending changes against + * them. + * + * Caveat: a pending TRUNCATE leaves no tuple carrying the truncating XID, so + * it is not detected here. Nothing in TimescaleDB truncates these internal + * catalog tables, so we accept that gap. + */ +static void +error_if_logical_slot_blocks_extension_drop(void) +{ + Oid catalog_ns; + NameData slot_name; + TransactionId catalog_xmin; + char *catalog[] = {HYPERTABLE_TABLE_NAME, CHUNK_TABLE_NAME}; + + if (max_replication_slots == 0 || ReplicationSlotCtl == NULL) + return; + + catalog_ns = get_namespace_oid(CATALOG_SCHEMA_NAME, true); + if (!OidIsValid(catalog_ns)) + return; + + for (int i = 0; i < 2; i++) + { + TransactionId max_xid; + Oid catalog_oid = get_relname_relid(catalog[i], catalog_ns); + + if (!OidIsValid(catalog_oid)) + { + continue; + } + + max_xid = ts_relation_max_modifying_xid(catalog_oid); + if (!ts_relation_xid_blocks_logical_slot(max_xid, &slot_name, &catalog_xmin)) + { + continue; + } + + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("cannot drop extension \"%s\" while logical replication slot " + "\"%s\" still has pending changes against TimescaleDB catalog " + "table \"%s\"", + EXTENSION_NAME, + NameStr(slot_name), + catalog[i]), + errdetail("Slot \"%s\" has catalog_xmin %u but TimescaleDB catalog " + "table \"%s\" was modified by transaction %u. Decoding " + "the pending WAL after the extension is dropped would " + "not be possible", + NameStr(slot_name), + catalog_xmin, + catalog[i], + max_xid), + errhint("Advance the slot past those changes or drop it before dropping " + "the extension."))); + } +} + static DDLResult process_drop_start(ProcessUtilityArgs *args) { @@ -2333,6 +2431,11 @@ process_drop_start(ProcessUtilityArgs *args) case OBJECT_SCHEMA: process_drop_schema_start(stmt); break; + case OBJECT_EXTENSION: + { + process_drop_extension_start(stmt); + break; + } default: break; } diff --git a/src/utils.c b/src/utils.c index f21c7fbd7d0..8a26cb3fad1 100644 --- a/src/utils.c +++ b/src/utils.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -23,11 +24,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -2210,3 +2213,171 @@ ts_get_attnotnull(Oid relid, AttrNumber attno) ReleaseSysCache(tp); return result; } + +/* + * Find the newest XID that inserted, updated, or deleted a tuple in a + * relation, scanning both live and dead-but-still-present tuples. + * + * We take AccessExclusiveLock and hold it until the end of the transaction. + * Its job is to fence concurrent writers: no new modification to these tables + * can commit between this check and the actual DROP, closing the TOCTOU + * window. (We take it directly rather than a weaker ShareLock so the + * subsequent drop need not upgrade the lock, which would risk deadlock.) + * + * The lock is NOT needed to protect against VACUUM. Because hypertable/chunk + * are user_catalog_tables, VACUUM's removal horizon is clamped to the oldest + * slot catalog_xmin (see procarray.c), so it can only remove tuples strictly + * older than every slot's catalog_xmin -- exactly the tuples that could never + * make us block. Any modification a slot has not yet decoded is therefore + * still present for this scan regardless of concurrent VACUUM. Per-page buffer + * locking during the scan itself is handled internally by heap_getnext. + * + * We scan with SnapshotAny so deleted tuples are seen too (a pending DELETE + * only shows up in xmax). Frozen xmins map to FrozenTransactionId and are + * ignored, as are lock-only xmax values, which produce no logical change. + */ +TransactionId +ts_relation_max_modifying_xid(Oid relid) +{ + Relation rel; + TableScanDesc scan; + HeapTuple tuple; + TransactionId maxxid = InvalidTransactionId; + + rel = table_open(relid, AccessExclusiveLock); + scan = heap_beginscan(rel, + SnapshotAny, + 0, + NULL, + NULL, + SO_TYPE_SEQSCAN | SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + HeapTupleHeader htup = tuple->t_data; + uint16 infomask = htup->t_infomask; + TransactionId xmin = HeapTupleHeaderGetXmin(htup); + + if (TransactionIdIsNormal(xmin) && + (!TransactionIdIsValid(maxxid) || TransactionIdFollows(xmin, maxxid))) + maxxid = xmin; + + /* + * A lock-only xmax (or a lock-only multixact) is not a data change and + * its raw value may even be a MultiXactId, so skip it; a real + * update/delete xid is resolved by HeapTupleHeaderGetUpdateXid. + */ + if (!(infomask & HEAP_XMAX_INVALID) && !(infomask & HEAP_XMAX_LOCK_ONLY)) + { + TransactionId xmax = HeapTupleHeaderGetUpdateXid(htup); + + if (TransactionIdIsNormal(xmax) && + (!TransactionIdIsValid(maxxid) || TransactionIdFollows(xmax, maxxid))) + maxxid = xmax; + } + } + heap_endscan(scan); + /* keep AccessExclusiveLock until end of transaction */ + table_close(rel, NoLock); + + return maxxid; +} + +/* + * Check whether any in-use logical replication slot may still need to decode a + * change identified by max_xid -- that is, whether its catalog_xmin is at or + * before max_xid. Returns true for the first such slot and, when the output + * pointers are non-NULL, reports that slot's name and catalog_xmin. Returns + * false if no logical slot is affected (including when no slots are + * configured). + */ +bool +ts_relation_xid_blocks_logical_slot(TransactionId max_xid, NameData *blocking_slot, + TransactionId *blocking_catalog_xmin) +{ + if (!TransactionIdIsValid(max_xid)) + return false; + + if (max_replication_slots == 0 || ReplicationSlotCtl == NULL) + return false; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; + NameData slot_name; + bool is_logical; + TransactionId catalog_xmin; + + if (!slot->in_use) + continue; + + SpinLockAcquire(&slot->mutex); + slot_name = slot->data.name; + is_logical = OidIsValid(slot->data.database); + catalog_xmin = slot->data.catalog_xmin; + SpinLockRelease(&slot->mutex); + + if (!is_logical) + continue; + /* A slot not retaining catalog rows cannot need these tables. */ + if (!TransactionIdIsValid(catalog_xmin)) + continue; + /* Slot has already decoded past every modification to the table(s). */ + if (TransactionIdPrecedes(max_xid, catalog_xmin)) + continue; + + if (blocking_slot != NULL) + *blocking_slot = slot_name; + if (blocking_catalog_xmin != NULL) + *blocking_catalog_xmin = catalog_xmin; + LWLockRelease(ReplicationSlotControlLock); + return true; + } + LWLockRelease(ReplicationSlotControlLock); + + return false; +} + +/* + * SQL-callable guard: raise an error if a logical replication slot still has + * pending changes against the given relation. Intended to be called before + * recreating a TimescaleDB catalog table during an extension upgrade -- once + * the old relation is dropped, a slot that still needs to decode changes to it + * becomes unrecoverable (see error_if_logical_slot_blocks_extension_drop for + * the underlying mechanism). + */ +TS_FUNCTION_INFO_V1(ts_ensure_catalog_replication); +Datum +ts_ensure_catalog_replication(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + TransactionId max_xid; + NameData slot_name; + TransactionId catalog_xmin; + + if (max_replication_slots == 0 || ReplicationSlotCtl == NULL) + PG_RETURN_VOID(); + + max_xid = ts_relation_max_modifying_xid(relid); + + if (!ts_relation_xid_blocks_logical_slot(max_xid, &slot_name, &catalog_xmin)) + PG_RETURN_VOID(); + + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("cannot modify relation \"%s\" while logical replication slot " + "\"%s\" still has pending changes against it", + get_rel_name(relid), + NameStr(slot_name)), + errdetail("Slot \"%s\" has catalog_xmin %u but the relation was modified by " + "transaction %u. Decoding the pending WAL after the relation is " + "recreated would fail with \"could not find pg_class entry\".", + NameStr(slot_name), + catalog_xmin, + max_xid), + errhint("Advance the slot past those changes (e.g. by consuming with " + "pg_logical_slot_get_changes()), or drop it with " + "pg_drop_replication_slot(), first."))); + + PG_RETURN_VOID(); +} diff --git a/src/utils.h b/src/utils.h index 642f44b1cdb..72e643d9a53 100644 --- a/src/utils.h +++ b/src/utils.h @@ -402,3 +402,7 @@ extern TSDLLEXPORT char *ts_list_to_string(List *list, append_cell_func append); extern TSDLLEXPORT List *ts_find_aggrefs(Node *node); extern TSDLLEXPORT bool ts_is_time_bucket_function(Expr *node); extern TSDLLEXPORT bool ts_get_attnotnull(Oid relid, AttrNumber attno); + +extern TransactionId ts_relation_max_modifying_xid(Oid relid); +extern bool ts_relation_xid_blocks_logical_slot(TransactionId max_xid, NameData *blocking_slot, + TransactionId *blocking_catalog_xmin); diff --git a/test/expected/drop_extension.out b/test/expected/drop_extension.out index bcb649f9500..615a0bb5b7f 100644 --- a/test/expected/drop_extension.out +++ b/test/expected/drop_extension.out @@ -120,3 +120,96 @@ SELECT extname FROM pg_extension WHERE extname = 'timescaledb'; ------------- timescaledb +-- A logical replication slot with pending (undecoded) changes to the +-- TimescaleDB catalog tables (hypertable, chunk) must block both DROP EXTENSION +-- (process utility hook) and the ensure_catalog_replication() check, +-- since dropping those user_catalog_tables would make the slot undecodable. +-- The error detail prints non-deterministic transaction ids, so only show the +-- primary error message. +\set VERBOSITY terse +-- A hypertable and chunk created before any logical replication slot +CREATE TABLE metrics(time timestamptz NOT NULL, value float8); +SELECT create_hypertable('metrics', 'time'); + create_hypertable +---------------------- + (1,public,metrics,t) + +INSERT INTO metrics VALUES ('2024-01-01', 1.0); +SELECT slot_name FROM pg_create_logical_replication_slot('test_slot', 'test_decoding'); + slot_name +----------- + test_slot + +-- No pending catalog changes yet in the replication slot, the check should pass +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.hypertable'); + ensure_catalog_replication +---------------------------- + + +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); + ensure_catalog_replication +---------------------------- + + +-- Modify the hypertable catalog table after the slot was created. +CREATE TABLE metrics2(time timestamptz NOT NULL, value float8); +SELECT create_hypertable('metrics2', 'time'); + create_hypertable +----------------------- + (2,public,metrics2,t) + +-- The slot now has pending changes against the hypertable table: both the SQL +-- function and DROP EXTENSION must error. The chunk table is untouched since +-- slot creation and still validates. +\set ON_ERROR_STOP 0 +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.hypertable'); +ERROR: cannot modify relation "hypertable" while logical replication slot "test_slot" still has pending changes against it +DROP EXTENSION timescaledb CASCADE; +ERROR: cannot drop extension "timescaledb" while logical replication slot "test_slot" still has pending changes against TimescaleDB catalog table "hypertable" +\set ON_ERROR_STOP 1 +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); + ensure_catalog_replication +---------------------------- + + +-- Insert a new chunk -- now the check should fail on the `chunk` catalog +\set ON_ERROR_STOP 0 +INSERT INTO metrics2 VALUES ('2024-01-01', 1.0); +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); +ERROR: cannot modify relation "chunk" while logical replication slot "test_slot" still has pending changes against it +\set ON_ERROR_STOP 1 +-- A trick to immediately advance the slot's `catalog_xmin`. `catalog_xmin` +-- advances when the slot sees an `xl_running_xacts` record which is normally +-- generated automatically, e.g. by the `bgwriter`; here we force it using +-- `pg_log_standby_snapshot()` +BEGIN; +SELECT txid_current() \gset +SELECT pg_log_standby_snapshot() \gset +COMMIT; +SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL) \gset +-- The block is now lifted: validation passes for both tables and the extension +-- can be dropped even though the slot still exists. +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.hypertable'); + ensure_catalog_replication +---------------------------- + + +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); + ensure_catalog_replication +---------------------------- + + +SET client_min_messages=error; +DROP EXTENSION timescaledb CASCADE; +RESET client_min_messages; +SELECT extname FROM pg_extension WHERE extname = 'timescaledb'; + extname +--------- + +-- Clean up the slot so the test database can be dropped. +SELECT pg_drop_replication_slot('test_slot'); + pg_drop_replication_slot +-------------------------- + + +\set VERBOSITY default diff --git a/test/sql/drop_extension.sql b/test/sql/drop_extension.sql index c0780d80b57..ee94e1d97ac 100644 --- a/test/sql/drop_extension.sql +++ b/test/sql/drop_extension.sql @@ -75,3 +75,64 @@ SET client_min_messages=error; CREATE EXTENSION timescaledb; RESET client_min_messages; SELECT extname FROM pg_extension WHERE extname = 'timescaledb'; + +-- A logical replication slot with pending (undecoded) changes to the +-- TimescaleDB catalog tables (hypertable, chunk) must block both DROP EXTENSION +-- (process utility hook) and the ensure_catalog_replication() check, +-- since dropping those user_catalog_tables would make the slot undecodable. +-- The error detail prints non-deterministic transaction ids, so only show the +-- primary error message. +\set VERBOSITY terse + +-- A hypertable and chunk created before any logical replication slot +CREATE TABLE metrics(time timestamptz NOT NULL, value float8); +SELECT create_hypertable('metrics', 'time'); +INSERT INTO metrics VALUES ('2024-01-01', 1.0); + +SELECT slot_name FROM pg_create_logical_replication_slot('test_slot', 'test_decoding'); + +-- No pending catalog changes yet in the replication slot, the check should pass +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.hypertable'); +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); + +-- Modify the hypertable catalog table after the slot was created. +CREATE TABLE metrics2(time timestamptz NOT NULL, value float8); +SELECT create_hypertable('metrics2', 'time'); + +-- The slot now has pending changes against the hypertable table: both the SQL +-- function and DROP EXTENSION must error. The chunk table is untouched since +-- slot creation and still validates. +\set ON_ERROR_STOP 0 +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.hypertable'); +DROP EXTENSION timescaledb CASCADE; +\set ON_ERROR_STOP 1 +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); + +-- Insert a new chunk -- now the check should fail on the `chunk` catalog +\set ON_ERROR_STOP 0 +INSERT INTO metrics2 VALUES ('2024-01-01', 1.0); +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); +\set ON_ERROR_STOP 1 + +-- A trick to immediately advance the slot's `catalog_xmin`. `catalog_xmin` +-- advances when the slot sees an `xl_running_xacts` record which is normally +-- generated automatically, e.g. by the `bgwriter`; here we force it using +-- `pg_log_standby_snapshot()` +BEGIN; +SELECT txid_current() \gset +SELECT pg_log_standby_snapshot() \gset +COMMIT; +SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL) \gset + +-- The block is now lifted: validation passes for both tables and the extension +-- can be dropped even though the slot still exists. +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.hypertable'); +SELECT _timescaledb_functions.ensure_catalog_replication('_timescaledb_catalog.chunk'); +SET client_min_messages=error; +DROP EXTENSION timescaledb CASCADE; +RESET client_min_messages; +SELECT extname FROM pg_extension WHERE extname = 'timescaledb'; + +-- Clean up the slot so the test database can be dropped. +SELECT pg_drop_replication_slot('test_slot'); +\set VERBOSITY default