Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@
-- chunks.
-- The unique constraint is table_name + schema_name. The ordering is
-- important as we want index access when we filter by table_name.
--
-- NOTE: the 'hypertable' catalog table is marked as a user catalog table to
-- support logical replication. If it is re-created in an upgrade script, the
-- script should first call the function below: it aborts the upgrade when
-- a logical replication slot still has undecoded changes to this table.
--
-- SELECT _timescaledb_functions.ensure_catalog_replication(
-- '_timescaledb_catalog.hypertable'
-- );
CREATE SEQUENCE _timescaledb_catalog.hypertable_id_seq MINVALUE 1;

CREATE TABLE _timescaledb_catalog.hypertable (
Expand Down Expand Up @@ -138,6 +147,15 @@ SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_
-- the chunk's hypercube. Tuples that fall within the chunk's
-- hypercube are stored in the chunk's data table, as given by
-- 'schema_name' and 'table_name'.
--
-- NOTE: the 'chunk' catalog table is marked as a user catalog table to support
-- logical replication. If it is re-created in an upgrade script, the script
-- should first call the function below: it aborts the upgrade when a logical
-- replication slot still has undecoded changes to this table.
--
-- SELECT _timescaledb_functions.ensure_catalog_replication(
-- '_timescaledb_catalog.chunk'
-- );
CREATE SEQUENCE _timescaledb_catalog.chunk_id_seq MINVALUE 1;

CREATE TABLE _timescaledb_catalog.chunk (
Expand Down
10 changes: 10 additions & 0 deletions sql/util_internal_table_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,13 @@ BEGIN
RETURN ret;
END
$BODY$ SET search_path TO pg_catalog, pg_temp;

-- Guard for extension upgrade scripts that re-create a TimescaleDB user
-- catalog table: errors out when a logical replication slot still has
-- pending changes against the given relation. Once the old relation has been
-- dropped, a slot that still needs to decode changes to it can no longer be
-- decoded ("could not find pg_class entry"), so the slot has to be advanced
-- or dropped before the table is re-created.
CREATE OR REPLACE FUNCTION _timescaledb_functions.ensure_catalog_replication(rel REGCLASS)
RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_ensure_catalog_replication'
LANGUAGE C VOLATILE STRICT;
103 changes: 103 additions & 0 deletions src/process_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
*/

#include <postgres.h>
#include <access/heapam.h>
#include <access/htup_details.h>
#include <access/table.h>
#include <access/transam.h>
#include <access/xact.h>
#include <catalog/heap.h>
#include <catalog/index.h>
Expand Down Expand Up @@ -40,6 +43,8 @@
#include <parser/parse_relation.h>
#include <parser/parse_type.h>
#include <parser/parse_utilcmd.h>
#include <replication/slot.h>
#include <storage/bufmgr.h>
#include <storage/lmgr.h>
#include <storage/lockdefs.h>
#include <tcop/utility.h>
Expand Down Expand Up @@ -110,6 +115,7 @@ static ProcessUtility_hook_type prev_ProcessUtility_hook;
static bool expect_chunk_modification = false;
static ProcessUtilityContext last_process_utility_context = PROCESS_UTILITY_TOPLEVEL;
static void check_no_timescale_options(AlterTableCmd *cmd, Oid reloid);
static void error_if_logical_slot_blocks_extension_drop(void);
static DDLResult process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht);
static DDLResult process_altertable_reset_options(AlterTableCmd *cmd, Hypertable *ht);
static void ts_bgw_job_update_owner(Relation rel, HeapTuple tuple, TupleDesc tupledesc,
Expand Down Expand Up @@ -615,6 +621,22 @@ process_drop_procedure_start(DropStmt *stmt)
}
}

/*
 * Pre-drop processing for DROP EXTENSION.
 *
 * Walks the names listed in the statement and, for each one that equals
 * EXTENSION_NAME, runs the logical replication slot check, which raises an
 * error when the drop has to be refused.
 */
static void
process_drop_extension_start(DropStmt *stmt)
{
	ListCell *cell;

	foreach (cell, stmt->objects)
	{
		const char *name = strVal(lfirst(cell));

		/* Other extensions are not our concern. */
		if (strcmp(name, EXTENSION_NAME) != 0)
			continue;

		error_if_logical_slot_blocks_extension_drop();
	}
}

static void
replace_attr_if_changed(AttrNumber attno, const char *newvalue, Name name_buf, Datum *values,
bool *replace)
Expand Down Expand Up @@ -2302,6 +2324,82 @@ process_drop_role(ProcessUtilityArgs *args)
return DDL_CONTINUE;
}

/*
* Block DROP EXTENSION timescaledb when a local logical replication slot
* still has WAL to decode that references _timescaledb_catalog.hypertable or
* _timescaledb_catalog.chunk.
*
* Those tables are marked as user_catalog_tables so logical decoding plugins
* can read them under a historic snapshot. Dropping them while a slot still
* has pending changes against those relfilenodes later fails inside
* RelationInitPhysicalAddr: that code path forces a non-historic pg_class
* lookup to refresh the current relfilenode (it handles concurrent catalog
* rewrites that way), and once the extension is gone there is no current
* pg_class row anymore, producing "could not find pg_class entry for ...".
* The slot is unrecoverable at that point, so refuse the drop and let the
* operator drain or remove the slot first.
*
* A slot's catalog_xmin is the oldest transaction whose (user) catalog changes
* it may still decode. If the newest XID that modified either table is at or
* after a slot's catalog_xmin, that slot still has pending changes against
* them.
*
* Caveat: a pending TRUNCATE leaves no tuple carrying the truncating XID, so
* it is not detected here. Nothing in TimescaleDB truncates these internal
* catalog tables, so we accept that gap.
*/
static void
error_if_logical_slot_blocks_extension_drop(void)
{
Oid catalog_ns;
NameData slot_name;
TransactionId catalog_xmin;
char *catalog[] = {HYPERTABLE_TABLE_NAME, CHUNK_TABLE_NAME};

if (max_replication_slots == 0 || ReplicationSlotCtl == NULL)
return;

catalog_ns = get_namespace_oid(CATALOG_SCHEMA_NAME, true);
if (!OidIsValid(catalog_ns))
return;

for (int i = 0; i < 2; i++)
{
TransactionId max_xid;
Oid catalog_oid = get_relname_relid(catalog[i], catalog_ns);

if (!OidIsValid(catalog_oid))
{
continue;
}

max_xid = ts_relation_max_modifying_xid(catalog_oid);
if (!ts_relation_xid_blocks_logical_slot(max_xid, &slot_name, &catalog_xmin))
{
continue;
}

ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("cannot drop extension \"%s\" while logical replication slot "
"\"%s\" still has pending changes against TimescaleDB catalog "
"table \"%s\"",
EXTENSION_NAME,
NameStr(slot_name),
catalog[i]),
errdetail("Slot \"%s\" has catalog_xmin %u but TimescaleDB catalog "
"table \"%s\" was modified by transaction %u. Decoding "
"the pending WAL after the extension is dropped would "
"not be possible",
NameStr(slot_name),
catalog_xmin,
catalog[i],
max_xid),
errhint("Advance the slot past those changes or drop it before dropping "
"the extension.")));
}
}

static DDLResult
process_drop_start(ProcessUtilityArgs *args)
{
Expand Down Expand Up @@ -2333,6 +2431,11 @@ process_drop_start(ProcessUtilityArgs *args)
case OBJECT_SCHEMA:
process_drop_schema_start(stmt);
break;
case OBJECT_EXTENSION:
{
process_drop_extension_start(stmt);
break;
}
default:
break;
}
Expand Down
171 changes: 171 additions & 0 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <access/htup.h>
#include <access/htup_details.h>
#include <access/reloptions.h>
#include <access/transam.h>
#include <access/xact.h>
#include <catalog/indexing.h>
#include <catalog/namespace.h>
Expand All @@ -23,11 +24,13 @@
#include <commands/tablecmds.h>
#include <fmgr.h>
#include <funcapi.h>
#include <miscadmin.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <parser/parse_coerce.h>
#include <parser/parse_func.h>
#include <parser/scansup.h>
#include <replication/slot.h>
#include <storage/lockdefs.h>
#include <utils/acl.h>
#include <utils/builtins.h>
Expand Down Expand Up @@ -2210,3 +2213,171 @@ ts_get_attnotnull(Oid relid, AttrNumber attno)
ReleaseSysCache(tp);
return result;
}

/*
 * Return the newer of 'current' and 'candidate'. A candidate that is not a
 * normal XID (invalid, bootstrap or frozen) is ignored; an invalid 'current'
 * is replaced by any normal candidate.
 */
static inline TransactionId
newer_normal_xid(TransactionId current, TransactionId candidate)
{
	if (!TransactionIdIsNormal(candidate))
		return current;
	if (!TransactionIdIsValid(current) || TransactionIdFollows(candidate, current))
		return candidate;
	return current;
}

/*
 * Find the newest XID that inserted, updated, or deleted a tuple in a
 * relation, scanning both live and dead-but-still-present tuples.
 *
 * Returns InvalidTransactionId when no tuple carries a normal xmin or a
 * modifying xmax.
 *
 * We take AccessExclusiveLock and hold it until the end of the transaction.
 * Its job is to fence concurrent writers: no new modification to these tables
 * can commit between this check and the actual DROP, closing the TOCTOU
 * window. (We take it directly rather than a weaker ShareLock so the
 * subsequent drop need not upgrade the lock, which would risk deadlock.)
 *
 * The lock is NOT needed to protect against VACUUM. Because hypertable/chunk
 * are user_catalog_tables, VACUUM's removal horizon is clamped to the oldest
 * slot catalog_xmin (see procarray.c), so it can only remove tuples strictly
 * older than every slot's catalog_xmin -- exactly the tuples that could never
 * make us block. Any modification a slot has not yet decoded is therefore
 * still present for this scan regardless of concurrent VACUUM. Per-page buffer
 * locking during the scan itself is handled internally by heap_getnext.
 *
 * We scan with SnapshotAny so deleted tuples are seen too (a pending DELETE
 * only shows up in xmax). Frozen xmins map to FrozenTransactionId and are
 * ignored, as are lock-only xmax values, which produce no logical change.
 */
TransactionId
ts_relation_max_modifying_xid(Oid relid)
{
	Relation rel;
	TableScanDesc scan;
	HeapTuple tuple;
	TransactionId maxxid = InvalidTransactionId;

	rel = table_open(relid, AccessExclusiveLock);
	scan = heap_beginscan(rel,
						  SnapshotAny,
						  0,
						  NULL,
						  NULL,
						  SO_TYPE_SEQSCAN | SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE);
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		HeapTupleHeader htup = tuple->t_data;
		uint16 infomask = htup->t_infomask;

		maxxid = newer_normal_xid(maxxid, HeapTupleHeaderGetXmin(htup));

		/*
		 * A lock-only xmax is not a data change and its raw value may even be
		 * a MultiXactId, so skip it. HEAP_XMAX_IS_LOCKED_ONLY is used instead
		 * of testing HEAP_XMAX_LOCK_ONLY directly because it also recognizes
		 * the legacy exclusive-lock encoding that pg_upgrade can carry over,
		 * where the raw xmax is a locker rather than an updater.
		 */
		if ((infomask & HEAP_XMAX_INVALID) || HEAP_XMAX_IS_LOCKED_ONLY(infomask))
			continue;

		/* Resolves a multixact to its updating member, if any. */
		maxxid = newer_normal_xid(maxxid, HeapTupleHeaderGetUpdateXid(htup));
	}
	heap_endscan(scan);
	/* keep AccessExclusiveLock until end of transaction */
	table_close(rel, NoLock);

	return maxxid;
}

/*
 * Check whether an in-use logical replication slot of the current database
 * may still need to decode a change identified by max_xid -- that is, whether
 * its catalog_xmin is at or before max_xid.
 *
 * Only slots bound to the current database are considered: a logical slot
 * decodes changes of its own database only, so a slot that belongs to another
 * database never has to look up relations of this one, and it could not be
 * drained from a session connected here either.
 *
 * Returns true for the first such slot and, when the output pointers are
 * non-NULL, reports that slot's name and catalog_xmin. Returns false if no
 * logical slot is affected (including when max_xid is invalid or no slots are
 * configured); the output arguments are left untouched in that case.
 */
bool
ts_relation_xid_blocks_logical_slot(TransactionId max_xid, NameData *blocking_slot,
									TransactionId *blocking_catalog_xmin)
{
	bool blocked = false;

	if (!TransactionIdIsValid(max_xid))
		return false;

	if (max_replication_slots == 0 || ReplicationSlotCtl == NULL)
		return false;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
		NameData slot_name;
		Oid slot_database;
		TransactionId catalog_xmin;

		if (!slot->in_use)
			continue;

		/* Copy what we need under the spinlock; inspect it afterwards. */
		SpinLockAcquire(&slot->mutex);
		slot_name = slot->data.name;
		slot_database = slot->data.database;
		catalog_xmin = slot->data.catalog_xmin;
		SpinLockRelease(&slot->mutex);

		/* Physical slots have no database and never decode anything. */
		if (!OidIsValid(slot_database))
			continue;
		/* Logical slots of other databases never decode our relations. */
		if (slot_database != MyDatabaseId)
			continue;
		/* A slot not retaining catalog rows cannot need these tables. */
		if (!TransactionIdIsValid(catalog_xmin))
			continue;
		/* Slot has already decoded past every modification to the table(s). */
		if (TransactionIdPrecedes(max_xid, catalog_xmin))
			continue;

		if (blocking_slot != NULL)
			*blocking_slot = slot_name;
		if (blocking_catalog_xmin != NULL)
			*blocking_catalog_xmin = catalog_xmin;
		blocked = true;
		break;
	}
	LWLockRelease(ReplicationSlotControlLock);

	return blocked;
}

/*
 * SQL-callable guard used before a TimescaleDB catalog table is re-created
 * during an extension upgrade.
 *
 * Raises an ERROR (ERRCODE_OBJECT_IN_USE) if a logical replication slot still
 * has pending changes against the given relation: once the old relation is
 * dropped, a slot that still needs to decode changes to it becomes
 * unrecoverable (see error_if_logical_slot_blocks_extension_drop for the
 * underlying mechanism). Returns void otherwise, including when replication
 * slots are not configured at all.
 */
TS_FUNCTION_INFO_V1(ts_ensure_catalog_replication);
Datum
ts_ensure_catalog_replication(PG_FUNCTION_ARGS)
{
	Oid target = PG_GETARG_OID(0);
	NameData blocker;
	TransactionId blocker_xmin;
	TransactionId newest_xid;

	/* Only inspect the relation when replication slots can exist. */
	if (max_replication_slots != 0 && ReplicationSlotCtl != NULL)
	{
		newest_xid = ts_relation_max_modifying_xid(target);

		if (ts_relation_xid_blocks_logical_slot(newest_xid, &blocker, &blocker_xmin))
		{
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("cannot modify relation \"%s\" while logical replication slot "
							"\"%s\" still has pending changes against it",
							get_rel_name(target),
							NameStr(blocker)),
					 errdetail("Slot \"%s\" has catalog_xmin %u but the relation was modified by "
							   "transaction %u. Decoding the pending WAL after the relation is "
							   "recreated would fail with \"could not find pg_class entry\".",
							   NameStr(blocker),
							   blocker_xmin,
							   newest_xid),
					 errhint("Advance the slot past those changes (e.g. by consuming with "
							 "pg_logical_slot_get_changes()), or drop it with "
							 "pg_drop_replication_slot(), first.")));
		}
	}

	PG_RETURN_VOID();
}
4 changes: 4 additions & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,7 @@ extern TSDLLEXPORT char *ts_list_to_string(List *list, append_cell_func append);
extern TSDLLEXPORT List *ts_find_aggrefs(Node *node);
extern TSDLLEXPORT bool ts_is_time_bucket_function(Expr *node);
extern TSDLLEXPORT bool ts_get_attnotnull(Oid relid, AttrNumber attno);

/*
 * Newest XID found in the xmin or modifying xmax of any tuple still present
 * in the relation, or InvalidTransactionId if there is none. Takes
 * AccessExclusiveLock on the relation and keeps it until end of transaction.
 */
extern TransactionId ts_relation_max_modifying_xid(Oid relid);
/*
 * Returns true if a logical replication slot may still need to decode a
 * change made by max_xid (its catalog_xmin does not follow max_xid). On true,
 * the slot's name and catalog_xmin are stored through the output pointers
 * that are non-NULL. Returns false for an invalid max_xid.
 */
extern bool ts_relation_xid_blocks_logical_slot(TransactionId max_xid, NameData *blocking_slot,
TransactionId *blocking_catalog_xmin);
Loading
Loading