diff --git a/.unreleased/pr_10048 b/.unreleased/pr_10048 new file mode 100644 index 00000000000..334faadf4b2 --- /dev/null +++ b/.unreleased/pr_10048 @@ -0,0 +1 @@ +Implements: #10048 Support concurrent refresh policies on hierarchical continuous aggregates diff --git a/tsl/src/bgw_policy/continuous_aggregate_api.c b/tsl/src/bgw_policy/continuous_aggregate_api.c index cc60a2ded5f..3442e3eea84 100644 --- a/tsl/src/bgw_policy/continuous_aggregate_api.c +++ b/tsl/src/bgw_policy/continuous_aggregate_api.c @@ -691,24 +691,6 @@ policy_refresh_cagg_check_for_overlaps(ContinuousAgg *cagg, Jsonb *policy_config } } - /* We cannot check if the CAgg is hierarchical first and abort early since - * we need to respect the if_not_exists parameter that is passed in. - * So we check for overlap first, and only if there is no exact match, we block multiple - * policies on hierarchical caggs */ - if (ContinuousAggIsHierarchical(cagg)) - { - /* if this is an existing job, it will also be in the list of jobs */ - int max_concurrent = existing_job_id ? 1 : 0; - - if (list_length(jobs) > max_concurrent) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("multiple refresh policies are not supported for hierarchical " - "continuous aggregates"))); - } - } - return overlap_result; } diff --git a/tsl/test/expected/cagg_policy_concurrent.out b/tsl/test/expected/cagg_policy_concurrent.out index 717364e574a..f6c62cfc301 100644 --- a/tsl/test/expected/cagg_policy_concurrent.out +++ b/tsl/test/expected/cagg_policy_concurrent.out @@ -1263,7 +1263,7 @@ DROP MATERIALIZED VIEW mat_m1; NOTICE: drop cascades to 3 other objects DROP MATERIALIZED VIEW mat_m2; NOTICE: drop cascades to 3 other objects -/* Concurrent policies aren't allowed on hierarchical continuous aggs */ +/* Multiple refresh policies on hierarchical continuous aggs */ CREATE MATERIALIZED VIEW mat_m1 WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS @@ -1301,10 +1301,21 @@ SELECT alter_job(:JOB_ID, next_start => '2000-01-01'::timestamptz); ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- (1105,"@ 12 hours","@ 0",-1,"@ 12 hours",t,"{""end_offset"": ""@ 30 days"", ""start_offset"": null, ""mat_hypertable_id"": 11}","Sat Jan 01 00:00:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,,"Refresh Continuous Aggregate Policy [1105]") +-- Multiple non-overlapping policies on the same hierarchical cagg are allowed +SELECT add_continuous_aggregate_policy('mat_m1_rollup', '29 days'::interval, NULL, '12 h'::interval) AS "JOB_ID_HIER2" \gset +SELECT config->>'start_offset' AS start_offset, config->>'end_offset' AS end_offset +FROM _timescaledb_config.bgw_job +WHERE id IN (:JOB_ID, :JOB_ID_HIER2) +ORDER BY id; + start_offset | end_offset +--------------+------------ + | @ 30 days + @ 29 days | + \set ON_ERROR_STOP 0 --- Multiple policies on hierarchical cagg should not be allowed -SELECT add_continuous_aggregate_policy('mat_m1_rollup', '29 days'::interval, NULL, '12 h'::interval); -ERROR: multiple refresh policies are not supported for hierarchical continuous aggregates +-- Overlapping policies on a hierarchical cagg are still rejected +SELECT add_continuous_aggregate_policy('mat_m1_rollup', '40 days'::interval, NULL, '12 h'::interval); +ERROR: refresh interval overlaps with an existing continuous aggregate policy on "mat_m1_rollup" \set ON_ERROR_STOP 1 -- Adding the exact same policy with if_not_exists should succeed (not error) SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, '30 days'::interval, '12 h'::interval, if_not_exists => true); @@ -1318,7 +1329,7 @@ SELECT add_continuous_aggregate_policy('mat_m1_rollup2', NULL, '30 days'::interv SELECT alter_job(:JOB_ID2, next_start => '2000-01-01'::timestamptz); alter_job ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - (1106,"@ 12 hours","@ 0",-1,"@ 12 hours",t,"{""end_offset"": ""@ 30 days"", ""start_offset"": null, ""mat_hypertable_id"": 12}","Sat Jan 01 00:00:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,,"Refresh Continuous Aggregate Policy [1106]") + (1107,"@ 12 hours","@ 0",-1,"@ 12 hours",t,"{""end_offset"": ""@ 30 days"", ""start_offset"": null, ""mat_hypertable_id"": 12}","Sat Jan 01 00:00:00 2000 PST",_timescaledb_functions.policy_refresh_continuous_aggregate_check,f,,,"Refresh Continuous Aggregate Policy [1107]") TRUNCATE mat_m1; TRUNCATE mat_m1_rollup; @@ -1335,11 +1346,14 @@ SELECT remove_continuous_aggregate_policy('mat_m1_rollup'); ------------------------------------ -SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, NULL, '12 h'::interval) AS m1_rollup_job \gset +/* Create two policies on the hierarchical cagg mat_m1_rollup as well */ +SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, '30 days'::interval, '12 h'::interval, buckets_per_batch => 0) AS m1_rollup_job_1 \gset +SELECT add_continuous_aggregate_policy('mat_m1_rollup', '30 days'::interval, NULL, '12 h'::interval, buckets_per_batch => 0) AS m1_rollup_job_2 \gset /* Refresh both continuous aggs */ CALL run_job(:agg_m1_job_1); CALL run_job(:agg_m1_job_2); -CALL run_job(:m1_rollup_job); +CALL run_job(:m1_rollup_job_1); +CALL run_job(:m1_rollup_job_2); /* Insert new data to generate invalidations */ INSERT INTO overlap_test_timestamptz SELECT t, (i % 5), random() * 100 @@ -1373,8 +1387,9 @@ GROUP BY ht.schema_name, ht.table_name; -----------------------------+-------+------------------------------+------------------------------ _materialized_hypertable_10 | 54 | Mon Jan 01 00:00:00 2024 UTC | Sun Jun 01 00:00:00 2025 UTC -/* Run L2 policy */ -CALL run_job(:m1_rollup_job); +/* Run both L2 policies */ +CALL run_job(:m1_rollup_job_1); +CALL run_job(:m1_rollup_job_2); /* L2 must be consistent with L1 after both policies run */ SELECT r.bucket, (r.counta = l.reagg_counta) AS counta_match, @@ -1408,7 +1423,7 @@ ORDER BY 1; Thu May 01 00:00:00 2025 UTC | t | t Sun Jun 01 00:00:00 2025 UTC | t | t -/* Restore timezone */ +/* Restore timezone and mock time */ SET timezone TO PST8PDT; DROP MATERIALIZED VIEW mat_m1_rollup; NOTICE: drop cascades to 18 other objects diff --git a/tsl/test/isolation/expected/cagg_concurrent_policy_register.out b/tsl/test/isolation/expected/cagg_concurrent_policy_register.out index f2e160ccae7..93dc1c0fbc4 100644 --- a/tsl/test/isolation/expected/cagg_concurrent_policy_register.out +++ b/tsl/test/isolation/expected/cagg_concurrent_policy_register.out @@ -78,3 +78,86 @@ debug_waitpoint_release step s1_run_pol7d_3d_refresh: <... completed> step s12_run_pol3d_1d_refresh: <... completed> step s13_run_pol1d_refresh: <... completed> + +starting permutation: s3_lock_before_register s1_run_l2_hist s12_run_l2_recent s4_enable_before_process_cagg_invalidations s3_release_after_register s5_show_running_jobs s4_release_before_process_cagg_invalidations s5_l2_consistency +step s3_lock_before_register: + -- lock jobs_refresh_ranges table to serialize registration + BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_jobs_refresh_ranges; + +step s1_run_l2_hist: + DO $$ + DECLARE + jid integer; + BEGIN + SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_hist'; + CALL run_job(jid); + END; + $$; + +step s12_run_l2_recent: + DO $$ + DECLARE + jid integer; + BEGIN + SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_recent'; + CALL run_job(jid); + END; + $$; + +step s4_enable_before_process_cagg_invalidations: + SELECT debug_waitpoint_enable('before_process_cagg_invalidations_for_refresh_lock'); + +debug_waitpoint_enable +---------------------- + + +step s3_release_after_register: + -- release lock on jobs_refresh_ranges table + ROLLBACK; + +step s5_show_running_jobs: + SELECT ca.user_view_name AS cagg_name, r.start_range, r.end_range, + to_timestamp(r.start_range / 1000000) AT TIME ZONE 'UTC' AS start_ts_utc, + to_timestamp(r.end_range / 1000000) AT TIME ZONE 'UTC' AS end_ts_utc + FROM _timescaledb_catalog.continuous_aggs_jobs_refresh_ranges r + JOIN _timescaledb_catalog.continuous_agg ca ON r.materialization_id = ca.mat_hypertable_id + ORDER BY ca.user_view_name, start_range; + +cagg_name | start_range| end_range|start_ts_utc |end_ts_utc +-----------+-------------------+----------------+---------------------------+------------------------ +mat_2pol_m2|-210866803200000000|1748131200000000|Mon Nov 24 00:00:00 4714 BC|Sun May 25 00:00:00 2025 +mat_2pol_m2| 1748131200000000|1748304000000000|Sun May 25 00:00:00 2025 |Tue May 27 00:00:00 2025 + +step s4_release_before_process_cagg_invalidations: + SELECT debug_waitpoint_release('before_process_cagg_invalidations_for_refresh_lock'); + +debug_waitpoint_release +----------------------- + + +step s1_run_l2_hist: <... completed> +step s12_run_l2_recent: <... completed> +step s5_l2_consistency: + -- L2 must match re-aggregation from L1 for all materialized buckets + SELECT d.bucket AT TIME ZONE 'UTC' AS bucket, + (d.cnt = h.cnt) AS cnt_match, + (d.sumb = h.sumb) AS sumb_match + FROM mat_2pol_m2 d + JOIN ( + SELECT time_bucket('1 day', bucket) AS bucket, + sum(count) AS cnt, + sum(sum) AS sumb + FROM mat_3pol_m1 GROUP BY 1 + ) h ON h.bucket = d.bucket + ORDER BY 1; + +bucket |cnt_match|sumb_match +------------------------+---------+---------- +Tue May 20 00:00:00 2025|t |t +Wed May 21 00:00:00 2025|t |t +Thu May 22 00:00:00 2025|t |t +Fri May 23 00:00:00 2025|t |t +Sat May 24 00:00:00 2025|t |t +Sun May 25 00:00:00 2025|t |t +Mon May 26 00:00:00 2025|t |t + diff --git a/tsl/test/isolation/specs/cagg_concurrent_policy_register.spec b/tsl/test/isolation/specs/cagg_concurrent_policy_register.spec index 0d7d7fe1892..7b39f54a67d 100644 --- a/tsl/test/isolation/specs/cagg_concurrent_policy_register.spec +++ b/tsl/test/isolation/specs/cagg_concurrent_policy_register.spec @@ -3,53 +3,91 @@ # LICENSE-TIMESCALE for a copy of the license. # -# Test concurrent CAgg refresh policies are executed # +# Setup: +# hypertable (test_3pol_timestamptz) +# -> L1 CAgg (mat_3pol_m1) 3 adjacent refresh policies +# -> L2 CAgg (mat_2pol_m2) 2 adjacent refresh policies (hierarchical) +# +setup +{ + -- create 3 adjacent policies --- + SELECT _timescaledb_functions.stop_background_workers(); + + CREATE TABLE test_3pol_timestamptz ( + time timestamptz NOT NULL, + a INTEGER, + b INTEGER + ); + + SELECT create_hypertable('test_3pol_timestamptz', 'time', chunk_time_interval => '1 day'::interval); + + INSERT INTO test_3pol_timestamptz + SELECT t, 1, (random() * 100)::int + FROM + generate_series('2025-05-20T11:05:00+00', '2025-05-27T12:05:00+00', INTERVAL '1 hour') t; + + CREATE MATERIALIZED VIEW mat_3pol_m1 + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS + SELECT + time_bucket('1 hour', time) AS bucket, + count(a), + sum(b) + FROM test_3pol_timestamptz + GROUP BY 1 + WITH NO DATA; + + /* Three adjacent policies on mat_3pol_m1 */ + CREATE TABLE cagg_policy_jobs (job_id integer, job_name text); + + INSERT INTO cagg_policy_jobs + SELECT add_continuous_aggregate_policy('mat_3pol_m1', '7 days'::interval, '3 days'::interval, '12 h'::interval, buckets_per_batch => 0), + 'job_7d_3d'; + + INSERT INTO cagg_policy_jobs + SELECT add_continuous_aggregate_policy('mat_3pol_m1', '3 days'::interval, '1 day'::interval, '12 h'::interval, buckets_per_batch => 0), + 'job_3d_1d'; + + INSERT INTO cagg_policy_jobs + SELECT add_continuous_aggregate_policy('mat_3pol_m1', '1 day'::interval, '1 hour'::interval, '12 h'::interval, buckets_per_batch => 0), + 'job_1d_1h'; +} + +# Materialize L1 so the hierarchical L2 CAgg has source data +setup +{ + CALL refresh_continuous_aggregate('mat_3pol_m1', NULL, NULL); +} + +# Create the hierarchical L2 CAgg with two adjacent refresh policies of its own setup { --- create 3 adjacent policies --- -CREATE TABLE test_3pol_timestamptz ( - time timestamptz NOT NULL, - a INTEGER, - b INTEGER -); - -SELECT create_hypertable('test_3pol_timestamptz', 'time', chunk_time_interval => '1 day'::interval); - -INSERT INTO test_3pol_timestamptz -SELECT t, 1, (random() * 100)::int -FROM -generate_series('2025-05-20T11:05:00+00', '2025-05-27T12:05:00+00', INTERVAL '1 hour') t; - -CREATE MATERIALIZED VIEW mat_3pol_m1 -WITH (timescaledb.continuous, timescaledb.materialized_only=true) -AS -SELECT - time_bucket('1 hour', time) AS bucket, - count(a), - sum(b) -FROM test_3pol_timestamptz -GROUP BY 1 -WITH NO DATA; - -/* Three adjacent policies on mat_3pol_m1 */ -CREATE TABLE cagg_policy_jobs (job_id integer, job_name text); - -INSERT INTO cagg_policy_jobs -SELECT add_continuous_aggregate_policy('mat_3pol_m1', '7 days'::interval, '3 days'::interval, '12 h'::interval, buckets_per_batch => 0), - 'job_7d_3d'; - -INSERT INTO cagg_policy_jobs -SELECT add_continuous_aggregate_policy('mat_3pol_m1', '3 days'::interval, '1 day'::interval, '12 h'::interval, buckets_per_batch => 0), - 'job_3d_1d'; - -INSERT INTO cagg_policy_jobs -SELECT add_continuous_aggregate_policy('mat_3pol_m1', '1 day'::interval, '1 hour'::interval, '12 h'::interval, buckets_per_batch => 0), - 'job_1d_1h'; + CREATE MATERIALIZED VIEW mat_2pol_m2 + WITH (timescaledb.continuous, timescaledb.materialized_only=true) + AS + SELECT + time_bucket('1 day', bucket) AS bucket, + sum(count) AS cnt, + sum(sum) AS sumb + FROM mat_3pol_m1 + GROUP BY 1 + WITH NO DATA; + + INSERT INTO cagg_policy_jobs + SELECT add_continuous_aggregate_policy('mat_2pol_m2', NULL, '3 days'::interval, '12 h'::interval, buckets_per_batch => 0), + 'l2_job_hist'; + + INSERT INTO cagg_policy_jobs + SELECT add_continuous_aggregate_policy('mat_2pol_m2', '3 days'::interval, '1 hour'::interval, '12 h'::interval, buckets_per_batch => 0), + 'l2_job_recent'; } teardown { - DROP TABLE test_3pol_timestamptz CASCADE; + DROP MATERIALIZED VIEW IF EXISTS mat_2pol_m2 CASCADE; + DROP MATERIALIZED VIEW IF EXISTS mat_3pol_m1 CASCADE; + DROP TABLE IF EXISTS test_3pol_timestamptz CASCADE; + DROP TABLE IF EXISTS cagg_policy_jobs; } session "S1" @@ -72,6 +110,16 @@ step "s1_run_pol7d_3d_refresh" { END; $$; } +step "s1_run_l2_hist" { + DO $$ + DECLARE + jid integer; + BEGIN + SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_hist'; + CALL run_job(jid); + END; + $$; +} session "S12" setup @@ -89,6 +137,16 @@ step "s12_run_pol3d_1d_refresh" { END; $$; } +step "s12_run_l2_recent" { + DO $$ + DECLARE + jid integer; + BEGIN + SELECT job_id INTO jid FROM cagg_policy_jobs WHERE job_name = 'l2_job_recent'; + CALL run_job(jid); + END; + $$; +} session "S13" setup @@ -136,10 +194,26 @@ step "s5_show_running_jobs" { JOIN _timescaledb_catalog.continuous_agg ca ON r.materialization_id = ca.mat_hypertable_id ORDER BY ca.user_view_name, start_range; } +step "s5_l2_consistency" { + -- L2 must match re-aggregation from L1 for all materialized buckets + SELECT d.bucket AT TIME ZONE 'UTC' AS bucket, + (d.cnt = h.cnt) AS cnt_match, + (d.sumb = h.sumb) AS sumb_match + FROM mat_2pol_m2 d + JOIN ( + SELECT time_bucket('1 day', bucket) AS bucket, + sum(count) AS cnt, + sum(sum) AS sumb + FROM mat_3pol_m1 GROUP BY 1 + ) h ON h.bucket = d.bucket + ORDER BY 1; +} ## TEST: when 3 concurrent refresh policies execute, they serialize on registration, then execute succesfully ## since these are adjacent policies 2 concurrent refresh processes, the extend last bucket behavior will apply ## observe the ranges recorded for each policy run permutation "s1_select" "s3_lock_before_register" "s1_run_pol7d_3d_refresh" "s12_run_pol3d_1d_refresh"("s1_run_pol7d_3d_refresh") "s13_run_pol1d_refresh"("s12_run_pol3d_1d_refresh") "s4_enable_before_process_cagg_invalidations" "s3_release_after_register" "s5_show_running_jobs" "s4_release_before_process_cagg_invalidations" - +## TEST: two concurrent refresh policies on the hierarchical L2 CAgg serialize on registration, +## then both execute succesfully. L2 stays consistent with L1. +permutation "s3_lock_before_register" "s1_run_l2_hist" "s12_run_l2_recent"("s1_run_l2_hist") "s4_enable_before_process_cagg_invalidations" "s3_release_after_register" "s5_show_running_jobs" "s4_release_before_process_cagg_invalidations" "s5_l2_consistency" diff --git a/tsl/test/sql/cagg_policy_concurrent.sql b/tsl/test/sql/cagg_policy_concurrent.sql index 1fb4d2ba567..fc0c31fea86 100644 --- a/tsl/test/sql/cagg_policy_concurrent.sql +++ b/tsl/test/sql/cagg_policy_concurrent.sql @@ -616,7 +616,7 @@ SELECT * from mat_m1 EXCEPT SELECT * from mat_m2; DROP MATERIALIZED VIEW mat_m1; DROP MATERIALIZED VIEW mat_m2; -/* Concurrent policies aren't allowed on hierarchical continuous aggs */ +/* Multiple refresh policies on hierarchical continuous aggs */ CREATE MATERIALIZED VIEW mat_m1 WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS @@ -653,9 +653,15 @@ WITH NO DATA; SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, '30 days'::interval, '12 h'::interval) AS "JOB_ID" \gset -- alter_job should not be blocked SELECT alter_job(:JOB_ID, next_start => '2000-01-01'::timestamptz); +-- Multiple non-overlapping policies on the same hierarchical cagg are allowed +SELECT add_continuous_aggregate_policy('mat_m1_rollup', '29 days'::interval, NULL, '12 h'::interval) AS "JOB_ID_HIER2" \gset +SELECT config->>'start_offset' AS start_offset, config->>'end_offset' AS end_offset +FROM _timescaledb_config.bgw_job +WHERE id IN (:JOB_ID, :JOB_ID_HIER2) +ORDER BY id; \set ON_ERROR_STOP 0 --- Multiple policies on hierarchical cagg should not be allowed -SELECT add_continuous_aggregate_policy('mat_m1_rollup', '29 days'::interval, NULL, '12 h'::interval); +-- Overlapping policies on a hierarchical cagg are still rejected +SELECT add_continuous_aggregate_policy('mat_m1_rollup', '40 days'::interval, NULL, '12 h'::interval); \set ON_ERROR_STOP 1 -- Adding the exact same policy with if_not_exists should succeed (not error) SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, '30 days'::interval, '12 h'::interval, if_not_exists => true); @@ -678,12 +684,15 @@ SELECT add_continuous_aggregate_policy('mat_m1', NULL, '30 days'::interval, '12 SELECT add_continuous_aggregate_policy('mat_m1', '30 days'::interval, NULL, '12 h'::interval, buckets_per_batch => 0) AS agg_m1_job_2 \gset SELECT remove_continuous_aggregate_policy('mat_m1_rollup'); -SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, NULL, '12 h'::interval) AS m1_rollup_job \gset +/* Create two policies on the hierarchical cagg mat_m1_rollup as well */ +SELECT add_continuous_aggregate_policy('mat_m1_rollup', NULL, '30 days'::interval, '12 h'::interval, buckets_per_batch => 0) AS m1_rollup_job_1 \gset +SELECT add_continuous_aggregate_policy('mat_m1_rollup', '30 days'::interval, NULL, '12 h'::interval, buckets_per_batch => 0) AS m1_rollup_job_2 \gset /* Refresh both continuous aggs */ CALL run_job(:agg_m1_job_1); CALL run_job(:agg_m1_job_2); -CALL run_job(:m1_rollup_job); +CALL run_job(:m1_rollup_job_1); +CALL run_job(:m1_rollup_job_2); /* Insert new data to generate invalidations */ INSERT INTO overlap_test_timestamptz @@ -714,8 +723,9 @@ FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log hil JOIN _timescaledb_catalog.hypertable ht ON ht.id = hil.hypertable_id GROUP BY ht.schema_name, ht.table_name; -/* Run L2 policy */ -CALL run_job(:m1_rollup_job); +/* Run both L2 policies */ +CALL run_job(:m1_rollup_job_1); +CALL run_job(:m1_rollup_job_2); /* L2 must be consistent with L1 after both policies run */ SELECT r.bucket, @@ -730,7 +740,7 @@ JOIN ( ) l ON l.month = r.bucket ORDER BY 1; -/* Restore timezone */ +/* Restore timezone and mock time */ SET timezone TO PST8PDT; DROP MATERIALIZED VIEW mat_m1_rollup;