Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_10048
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #10048 Support concurrent refresh policies on hierarchical continuous aggregates
18 changes: 0 additions & 18 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -691,24 +691,6 @@ policy_refresh_cagg_check_for_overlaps(ContinuousAgg *cagg, Jsonb *policy_config
}
}

/* We cannot check if the CAgg is hierarchical first and abort early since
* we need to respect the if_not_exists parameter that is passed in.
* So we check for overlap first, and only if there is no exact match, we block multiple
* policies on hierarchical caggs */
if (ContinuousAggIsHierarchical(cagg))
{
/* if this is an existing job, it will also be in the list of jobs */
int max_concurrent = existing_job_id ? 1 : 0;

if (list_length(jobs) > max_concurrent)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("multiple refresh policies are not supported for hierarchical "
"continuous aggregates")));
}
}

return overlap_result;
}

Expand Down
35 changes: 25 additions & 10 deletions tsl/test/expected/cagg_policy_concurrent.out
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ DROP MATERIALIZED VIEW mat_m1;
NOTICE: drop cascades to 3 other objects
DROP MATERIALIZED VIEW mat_m2;
NOTICE: drop cascades to 3 other objects
/* Concurrent policies aren't allowed on hierarchical continuous aggs */
/* Multiple refresh policies on hierarchical continuous aggs */
CREATE MATERIALIZED VIEW mat_m1
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS
Expand Down Expand Up @@ -1301,10 +1301,21 @@ SELECT alter_job(:JOB_ID, next_start => '2000-01-01'::timestamptz);
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
(1105,"@ 12 hours","@ 0",-1,"@ 12 hours",t,"{""end_offset"": ""@ 30 days"", ""start_offset"": null, ""mat_hypertable_id"": 11}","Sat Jan 01 00:00:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,,"Refresh Continuous Aggregate Policy [1105]")

-- Multiple non-overlapping policies on the same hierarchical cagg are allowed
SELECT add_continuous_aggregate_policy('mat_m1_rollup', '29 days'::interval, NULL, '12 h'::interval) AS "JOB_ID_HIER2" \gset
SELECT config->>'start_offset' AS start_offset, config->>'end_offset' AS end_offset
FROM _timescaledb_config.bgw_job
WHERE id IN (:JOB_ID, :JOB_ID_HIER2)
ORDER BY id;
start_offset | end_offset
--------------+------------
| @ 30 days
@ 29 days |

\set ON_ERROR_STOP 0
-- Multiple policies on hierarchical cagg should not be allowed
SELECT add_continuous_aggregate_policy('mat_m1_rollup', '29 days'::interval, NULL, '12 h'::interval);
ERROR: multiple refresh policies are not supported for hierarchical continuous aggregates
-- Overlapping policies on a hierarchical cagg are still rejected
SELECT add_continuous_aggregate_policy('mat_m1_rollup', '40 days'::interval, NULL, '12 h'::interval);
ERROR: refresh interval overlaps with an existing continuous aggregate policy on "mat_m1_rollup"
\set ON_ERROR_STOP 1
-- Adding the exact same policy with if_not_exists should succeed (not error)
SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, '30 days'::interval, '12 h'::interval, if_not_exists => true);
Expand All @@ -1318,7 +1329,7 @@ SELECT add_continuous_aggregate_policy('mat_m1_rollup2', NULL, '30 days'::interv
SELECT alter_job(:JOB_ID2, next_start => '2000-01-01'::timestamptz);
alter_job
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
(1106,"@ 12 hours","@ 0",-1,"@ 12 hours",t,"{""end_offset"": ""@ 30 days"", ""start_offset"": null, ""mat_hypertable_id"": 12}","Sat Jan 01 00:00:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,,"Refresh Continuous Aggregate Policy [1106]")
(1107,"@ 12 hours","@ 0",-1,"@ 12 hours",t,"{""end_offset"": ""@ 30 days"", ""start_offset"": null, ""mat_hypertable_id"": 12}","Sat Jan 01 00:00:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,,"Refresh Continuous Aggregate Policy [1107]")

TRUNCATE mat_m1;
TRUNCATE mat_m1_rollup;
Expand All @@ -1335,11 +1346,14 @@ SELECT remove_continuous_aggregate_policy('mat_m1_rollup');
------------------------------------


SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, NULL, '12 h'::interval) AS m1_rollup_job \gset
/* Create two policies on the hierarchical cagg mat_m1_rollup as well */
SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, '30 days'::interval, '12 h'::interval, buckets_per_batch => 0) AS m1_rollup_job_1 \gset
SELECT add_continuous_aggregate_policy('mat_m1_rollup', '30 days'::interval, NULL, '12 h'::interval, buckets_per_batch => 0) AS m1_rollup_job_2 \gset
/* Refresh both continuous aggs */
CALL run_job(:agg_m1_job_1);
CALL run_job(:agg_m1_job_2);
CALL run_job(:m1_rollup_job);
CALL run_job(:m1_rollup_job_1);
CALL run_job(:m1_rollup_job_2);
/* Insert new data to generate invalidations */
INSERT INTO overlap_test_timestamptz
SELECT t, (i % 5), random() * 100
Expand Down Expand Up @@ -1373,8 +1387,9 @@ GROUP BY ht.schema_name, ht.table_name;
-----------------------------+-------+------------------------------+------------------------------
_materialized_hypertable_10 | 54 | Mon Jan 01 00:00:00 2024 UTC | Sun Jun 01 00:00:00 2025 UTC

/* Run L2 policy */
CALL run_job(:m1_rollup_job);
/* Run both L2 policies */
CALL run_job(:m1_rollup_job_1);
CALL run_job(:m1_rollup_job_2);
/* L2 must be consistent with L1 after both policies run */
SELECT r.bucket,
(r.counta = l.reagg_counta) AS counta_match,
Expand Down Expand Up @@ -1408,7 +1423,7 @@ ORDER BY 1;
Thu May 01 00:00:00 2025 UTC | t | t
Sun Jun 01 00:00:00 2025 UTC | t | t

/* Restore timezone */
/* Restore timezone and mock time */
SET timezone TO PST8PDT;
DROP MATERIALIZED VIEW mat_m1_rollup;
NOTICE: drop cascades to 18 other objects
Expand Down
83 changes: 83 additions & 0 deletions tsl/test/isolation/expected/cagg_concurrent_policy_register.out
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,86 @@ debug_waitpoint_release
step s1_run_pol7d_3d_refresh: <... completed>
step s12_run_pol3d_1d_refresh: <... completed>
step s13_run_pol1d_refresh: <... completed>

starting permutation: s3_lock_before_register s1_run_l2_hist s12_run_l2_recent s4_enable_before_process_cagg_invalidations s3_release_after_register s5_show_running_jobs s4_release_before_process_cagg_invalidations s5_l2_consistency
step s3_lock_before_register:
-- lock jobs_refresh_ranges table to serialize registration
BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_jobs_refresh_ranges;

step s1_run_l2_hist:
DO $$
DECLARE
jid integer;
BEGIN
SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_hist';
CALL run_job(jid);
END;
$$;
<waiting ...>
step s12_run_l2_recent:
DO $$
DECLARE
jid integer;
BEGIN
SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_recent';
CALL run_job(jid);
END;
$$;
<waiting ...>
step s4_enable_before_process_cagg_invalidations:
SELECT debug_waitpoint_enable('before_process_cagg_invalidations_for_refresh_lock');

debug_waitpoint_enable
----------------------


step s3_release_after_register:
-- release lock on jobs_refresh_ranges table
ROLLBACK;

step s5_show_running_jobs:
SELECT ca.user_view_name AS cagg_name, r.start_range, r.end_range,
to_timestamp(r.start_range / 1000000) AT TIME ZONE 'UTC' AS start_ts_utc,
to_timestamp(r.end_range / 1000000) AT TIME ZONE 'UTC' AS end_ts_utc
FROM _timescaledb_catalog.continuous_aggs_jobs_refresh_ranges r
JOIN _timescaledb_catalog.continuous_agg ca ON r.materialization_id = ca.mat_hypertable_id
ORDER BY ca.user_view_name, start_range;

cagg_name | start_range| end_range|start_ts_utc |end_ts_utc
-----------+-------------------+----------------+---------------------------+------------------------
mat_2pol_m2|-210866803200000000|1748131200000000|Mon Nov 24 00:00:00 4714 BC|Sun May 25 00:00:00 2025
mat_2pol_m2| 1748131200000000|1748304000000000|Sun May 25 00:00:00 2025 |Tue May 27 00:00:00 2025

step s4_release_before_process_cagg_invalidations:
SELECT debug_waitpoint_release('before_process_cagg_invalidations_for_refresh_lock');

debug_waitpoint_release
-----------------------


step s1_run_l2_hist: <... completed>
step s12_run_l2_recent: <... completed>
step s5_l2_consistency:
-- L2 must match re-aggregation from L1 for all materialized buckets
SELECT d.bucket AT TIME ZONE 'UTC' AS bucket,
(d.cnt = h.cnt) AS cnt_match,
(d.sumb = h.sumb) AS sumb_match
FROM mat_2pol_m2 d
JOIN (
SELECT time_bucket('1 day', bucket) AS bucket,
sum(count) AS cnt,
sum(sum) AS sumb
FROM mat_3pol_m1 GROUP BY 1
) h ON h.bucket = d.bucket
ORDER BY 1;

bucket |cnt_match|sumb_match
------------------------+---------+----------
Tue May 20 00:00:00 2025|t |t
Wed May 21 00:00:00 2025|t |t
Thu May 22 00:00:00 2025|t |t
Fri May 23 00:00:00 2025|t |t
Sat May 24 00:00:00 2025|t |t
Sun May 25 00:00:00 2025|t |t
Mon May 26 00:00:00 2025|t |t

158 changes: 116 additions & 42 deletions tsl/test/isolation/specs/cagg_concurrent_policy_register.spec
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,91 @@
# LICENSE-TIMESCALE for a copy of the license.

#
# Test concurrent CAgg refresh policies are executed
#
# Setup:
# hypertable (test_3pol_timestamptz)
# -> L1 CAgg (mat_3pol_m1) 3 adjacent refresh policies
# -> L2 CAgg (mat_2pol_m2) 2 adjacent refresh policies (hierarchical)
#
setup
{
-- create 3 adjacent policies ---
SELECT _timescaledb_functions.stop_background_workers();

CREATE TABLE test_3pol_timestamptz (
time timestamptz NOT NULL,
a INTEGER,
b INTEGER
);

SELECT create_hypertable('test_3pol_timestamptz', 'time', chunk_time_interval => '1 day'::interval);

INSERT INTO test_3pol_timestamptz
SELECT t, 1, (random() * 100)::int
FROM
generate_series('2025-05-20T11:05:00+00', '2025-05-27T12:05:00+00', INTERVAL '1 hour') t;

CREATE MATERIALIZED VIEW mat_3pol_m1
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS
SELECT
time_bucket('1 hour', time) AS bucket,
count(a),
sum(b)
FROM test_3pol_timestamptz
GROUP BY 1
WITH NO DATA;

/* Three adjacent policies on mat_3pol_m1 */
CREATE TABLE cagg_policy_jobs (job_id integer, job_name text);

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_3pol_m1', '7 days'::interval, '3 days'::interval, '12 h'::interval, buckets_per_batch => 0),
'job_7d_3d';

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_3pol_m1', '3 days'::interval, '1 day'::interval, '12 h'::interval, buckets_per_batch => 0),
'job_3d_1d';

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_3pol_m1', '1 day'::interval, '1 hour'::interval, '12 h'::interval, buckets_per_batch => 0),
'job_1d_1h';
}

# Materialize L1 so the hierarchical L2 CAgg has source data
setup
{
CALL refresh_continuous_aggregate('mat_3pol_m1', NULL, NULL);
}

# Create the hierarchical L2 CAgg with two adjacent refresh policies of its own
setup
{
-- create 3 adjacent policies ---
CREATE TABLE test_3pol_timestamptz (
time timestamptz NOT NULL,
a INTEGER,
b INTEGER
);

SELECT create_hypertable('test_3pol_timestamptz', 'time', chunk_time_interval => '1 day'::interval);

INSERT INTO test_3pol_timestamptz
SELECT t, 1, (random() * 100)::int
FROM
generate_series('2025-05-20T11:05:00+00', '2025-05-27T12:05:00+00', INTERVAL '1 hour') t;

CREATE MATERIALIZED VIEW mat_3pol_m1
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS
SELECT
time_bucket('1 hour', time) AS bucket,
count(a),
sum(b)
FROM test_3pol_timestamptz
GROUP BY 1
WITH NO DATA;

/* Three adjacent policies on mat_3pol_m1 */
CREATE TABLE cagg_policy_jobs (job_id integer, job_name text);

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_3pol_m1', '7 days'::interval, '3 days'::interval, '12 h'::interval, buckets_per_batch => 0),
'job_7d_3d';

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_3pol_m1', '3 days'::interval, '1 day'::interval, '12 h'::interval, buckets_per_batch => 0),
'job_3d_1d';

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_3pol_m1', '1 day'::interval, '1 hour'::interval, '12 h'::interval, buckets_per_batch => 0),
'job_1d_1h';
CREATE MATERIALIZED VIEW mat_2pol_m2
WITH (timescaledb.continuous, timescaledb.materialized_only=true)
AS
SELECT
time_bucket('1 day', bucket) AS bucket,
sum(count) AS cnt,
sum(sum) AS sumb
FROM mat_3pol_m1
GROUP BY 1
WITH NO DATA;

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_2pol_m2', NULL, '3 days'::interval, '12 h'::interval, buckets_per_batch => 0),
'l2_job_hist';

INSERT INTO cagg_policy_jobs
SELECT add_continuous_aggregate_policy('mat_2pol_m2', '3 days'::interval, '1 hour'::interval, '12 h'::interval, buckets_per_batch => 0),
'l2_job_recent';
}

teardown {
DROP TABLE test_3pol_timestamptz CASCADE;
DROP MATERIALIZED VIEW IF EXISTS mat_2pol_m2 CASCADE;
DROP MATERIALIZED VIEW IF EXISTS mat_3pol_m1 CASCADE;
DROP TABLE IF EXISTS test_3pol_timestamptz CASCADE;
DROP TABLE IF EXISTS cagg_policy_jobs;
}

session "S1"
Expand All @@ -72,6 +110,16 @@ step "s1_run_pol7d_3d_refresh" {
END;
$$;
}
step "s1_run_l2_hist" {
DO $$
DECLARE
jid integer;
BEGIN
SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_hist';
CALL run_job(jid);
END;
$$;
}

session "S12"
setup
Expand All @@ -89,6 +137,16 @@ step "s12_run_pol3d_1d_refresh" {
END;
$$;
}
step "s12_run_l2_recent" {
DO $$
DECLARE
jid integer;
BEGIN
SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_recent';
CALL run_job(jid);
END;
$$;
}

session "S13"
setup
Expand Down Expand Up @@ -136,10 +194,26 @@ step "s5_show_running_jobs" {
JOIN _timescaledb_catalog.continuous_agg ca ON r.materialization_id = ca.mat_hypertable_id
ORDER BY ca.user_view_name, start_range;
}
step "s5_l2_consistency" {
-- L2 must match re-aggregation from L1 for all materialized buckets
SELECT d.bucket AT TIME ZONE 'UTC' AS bucket,
(d.cnt = h.cnt) AS cnt_match,
(d.sumb = h.sumb) AS sumb_match
FROM mat_2pol_m2 d
JOIN (
SELECT time_bucket('1 day', bucket) AS bucket,
sum(count) AS cnt,
sum(sum) AS sumb
FROM mat_3pol_m1 GROUP BY 1
) h ON h.bucket = d.bucket
ORDER BY 1;
}

## TEST: when 3 concurrent refresh policies execute, they serialize on registration, then execute succesfully
## since these are adjacent policies 2 concurrent refresh processes, the extend last bucket behavior will apply
## observe the ranges recorded for each policy run
permutation "s1_select" "s3_lock_before_register" "s1_run_pol7d_3d_refresh" "s12_run_pol3d_1d_refresh"("s1_run_pol7d_3d_refresh") "s13_run_pol1d_refresh"("s12_run_pol3d_1d_refresh") "s4_enable_before_process_cagg_invalidations" "s3_release_after_register" "s5_show_running_jobs" "s4_release_before_process_cagg_invalidations"


## TEST: two concurrent refresh policies on the hierarchical L2 CAgg serialize on registration,
## then both execute succesfully. L2 stays consistent with L1.
permutation "s3_lock_before_register" "s1_run_l2_hist" "s12_run_l2_recent"("s1_run_l2_hist") "s4_enable_before_process_cagg_invalidations" "s3_release_after_register" "s5_show_running_jobs" "s4_release_before_process_cagg_invalidations" "s5_l2_consistency"
Loading
Loading