Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_9811
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #9811 Cap refresh start when hypertable has tiered data
2 changes: 1 addition & 1 deletion src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ extern TM_Result ts_chunk_lock_for_creating_compressed_chunk(Chunk *chunk);
extern ScanIterator ts_chunk_scan_iterator_create(MemoryContext result_mcxt);
extern void ts_chunk_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id);
extern bool ts_chunk_lock_if_exists(Oid chunk_oid, LOCKMODE chunk_lockmode);
int ts_chunk_get_osm_chunk_id(int hypertable_id);
extern TSDLLEXPORT int ts_chunk_get_osm_chunk_id(int hypertable_id);
extern TSDLLEXPORT void ts_chunk_merge_on_dimension(const Hypertable *ht, Chunk *chunk,
const Chunk *merge_chunk, int32 dimension_id);
extern TSDLLEXPORT void ts_chunk_detach_by_relid(Oid relid);
Expand Down
47 changes: 47 additions & 0 deletions src/dimension_slice.c
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,53 @@ ts_dimension_slice_nth_earliest_slice(int32 dimension_id, int n)
return ret;
}

static ScanTupleResult
dimension_slice_earliest_non_osm_found(TupleInfo *ti, void *data)
{
DimensionSlice **slice = (DimensionSlice **) data;
MemoryContext old = MemoryContextSwitchTo(ti->mctx);
*slice = dimension_slice_from_slot(ti->slot);
MemoryContextSwitchTo(old);
Chunk *chunk = ts_chunk_get_by_id((*slice)->fd.chunk_id, true);

if (IS_OSM_CHUNK(chunk))
{
return SCAN_CONTINUE;
}
Comment on lines +1189 to +1196

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*slice here assigned whether it's an osm chunk or not. Imagine a case where there is no non-osm chunk and all data is tiered, we would end up this *slice already assigned. The caller assumes that it's non-osm as long as it's not NULL which does not hold in this case.

We should either check again in the caller, or nullify *slice if it's osm in the later IS_OSM_CHUNK check here.


return SCAN_DONE;
}

/*
* Return the earliest dimension slice not belonging to an OSM chunk.
*/
DimensionSlice *
ts_dimension_slice_earliest_non_osm_slice(int32 dimension_id)
{
ScanKeyData scankey[1];
DimensionSlice *ret = NULL;

ScanKeyInit(&scankey[0],
Anum_dimension_slice_dimension_id_range_start_range_end_idx_dimension_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(dimension_id));

dimension_slice_scan_limit_direction_internal(
DIMENSION_SLICE_DIMENSION_ID_RANGE_START_RANGE_END_IDX,
scankey,
1,
dimension_slice_earliest_non_osm_found,
(void *) &ret,
0,
ForwardScanDirection,
AccessShareLock,
NULL,
CurrentMemoryContext);

return ret;
}

typedef struct ReorderBoundaryState
{
int target;
Expand Down
1 change: 1 addition & 0 deletions src/dimension_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64

extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_latest_slice(int32 dimension_id, int n);
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_earliest_slice(int32 dimension_id, int n);
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_earliest_non_osm_slice(int32 dimension_id);
extern TSDLLEXPORT int32 ts_dimension_slice_oldest_valid_chunk_for_reorder(
int32 job_id, int32 dimension_id, int skip_newest_distinct_buckets);
extern TSDLLEXPORT List *ts_dimension_slice_get_chunkids_to_compress(
Expand Down
50 changes: 49 additions & 1 deletion tsl/src/continuous_aggs/invalidation.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "cache.h"
#include "continuous_aggs/invalidation_threshold.h"
#include "continuous_aggs/materialize.h"
#include "dimension.h"
#include "dimension_slice.h"
#include "guc.h"
#include "invalidation.h"
#include "refresh.h"
Expand Down Expand Up @@ -1286,6 +1288,30 @@ invalidation_cagg_has_pending_mat_ranges(ContinuousAgg *cagg)
return found;
}

/*
* Return the range_start of the earliest dimension slice (chunk) for the
* given raw hypertable, or INVAL_NEG_INFINITY when no chunks exist.
*
* This is used to ignore invalidation entries that lie entirely before any
* chunk — such entries cannot affect materialized data and should be
* preserved for future use rather than treated as pending work.
*/
int64
invalidation_get_earliest_chunk_start(int32 raw_hypertable_id)
{
Hypertable *raw_ht = cagg_get_hypertable_or_fail(raw_hypertable_id);
const Dimension *time_dim = hyperspace_get_open_dimension(raw_ht->space, 0);
Assert(time_dim != NULL);

DimensionSlice *slice = ts_dimension_slice_earliest_non_osm_slice(time_dim->fd.id);
if (slice != NULL)
{
return slice->fd.range_start;
}

return INVAL_NEG_INFINITY;
}

bool
invalidation_cagg_has_invalidations(ContinuousAgg *cagg)
{
Expand All @@ -1299,6 +1325,26 @@ invalidation_cagg_has_invalidations(ContinuousAgg *cagg)
PG_INT64_MAX);

int64 watermark = ts_cagg_watermark_get(cagg_hyper_id);

/*
* Expand the earliest chunk boundary to the end of its containing
* bucket. Invalidation entries are also bucket-expanded, so both
* sides of the comparison use consistent bucket-aligned values.
* Any entry whose expanded greatest_modified_value falls within this
* bucket or earlier lies entirely before the first data and can be
* ignored.
*/
int64 earliest_start = invalidation_get_earliest_chunk_start(cagg->data.raw_hypertable_id);
if (earliest_start != INVAL_NEG_INFINITY)
{
Invalidation boundary = { .lowest_modified_value = earliest_start,
.greatest_modified_value = earliest_start };
invalidation_expand_to_bucket_boundaries(&boundary,
cagg->partition_type,
cagg->bucket_function);
earliest_start = boundary.greatest_modified_value;
}
Comment on lines +1337 to +1346

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why we want to ignore anything before the value initially return by invalidation_get_earliest_chunk_start. But why do we move earliest_start to its bucket end which is a further value?

I feel like skipping the whole bucket as if it's not invalidated may trigger rewrite to use the cagg when the specific bucket is actually stale in the cagg.


ts_scanner_foreach(&iterator)
{
TupleInfo *ti;
Expand All @@ -1310,9 +1356,11 @@ invalidation_cagg_has_invalidations(ContinuousAgg *cagg)
ti,
cagg->partition_type,
cagg->bucket_function);

/* Entries which cannot be invalidations */
if (logentry.greatest_modified_value == INVAL_NEG_INFINITY ||
logentry.lowest_modified_value >= watermark)
logentry.lowest_modified_value >= watermark ||
logentry.greatest_modified_value <= earliest_start)
{
continue;
}
Expand Down
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/invalidation.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ extern HeapTuple create_invalidation_tup(const TupleDesc tupdesc, int32 cagg_hyp
extern bool invalidation_hypertable_has_invalidations(int32 hyper_id);
extern bool invalidation_cagg_has_invalidations(ContinuousAgg *cagg);
extern bool invalidation_cagg_has_pending_mat_ranges(ContinuousAgg *cagg);
extern int64 invalidation_get_earliest_chunk_start(int32 raw_hypertable_id);
47 changes: 44 additions & 3 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <utils/tuplestore.h>

#include "bgw_policy/policies_v2.h"
#include "chunk.h"
#include "debug_point.h"
#include "dimension.h"
#include "dimension_slice.h"
Expand Down Expand Up @@ -58,7 +59,6 @@ typedef struct CaggRefreshSpiContext
int save_nestlevel;
} CaggRefreshSpiContext;

static Hypertable *cagg_get_hypertable_or_fail(int32 hypertable_id);
static InternalTimeRange get_largest_bucketed_window(Oid timetype, int64 bucket_width);
static InternalTimeRange
compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
Expand All @@ -84,7 +84,7 @@ static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const ContinuousAggRefreshContext context,
bool bucketing_refresh_window);
static Hypertable *
Hypertable *
cagg_get_hypertable_or_fail(int32 hypertable_id)
{
Hypertable *ht = ts_hypertable_get_by_id(hypertable_id);
Expand Down Expand Up @@ -1143,6 +1143,37 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg_arg,
refresh_window.end = computed_invalidation_threshold_for_cagg;
}

/*
* Cap the refresh window start at the earliest chunk when the
* hypertable has tiered data but reads are disabled.
*
* Without this cap the refresh window may begin before the
* earliest chunk, causing invalidations in that range to be
* processed and removed. By raising the start to the earliest
* chunk boundary we preserve those earlier invalidations so
* they can be processed later when tiered reads are re-enabled.
*
* We only apply the cap when tiered data exists but reads are
* off, because in that case the refresh cannot see the tiered
* data and would still consume the invalidations. When tiered
* reads are on (or there is no tiered data), no cap is needed.
*/
if (!ts_guc_enable_osm_reads)
{
int32 osm_chunk_id = ts_chunk_get_osm_chunk_id(cagg->data.raw_hypertable_id);

if (osm_chunk_id != INVALID_CHUNK_ID)
{
int64 earliest_start =
invalidation_get_earliest_chunk_start(cagg->data.raw_hypertable_id);

if (earliest_start > refresh_window.start)
{
refresh_window.start = earliest_start;
}
}
}

/* Capping the end might have made the window 0, or negative, so nothing to refresh in that
* case.
*
Expand Down Expand Up @@ -1258,7 +1289,17 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
if (refresh_window.start_isnull)
{
debug_refresh_window(cagg, &refresh_window, "START IS NULL");
DimensionSlice *slice = ts_dimension_slice_nth_earliest_slice(time_dim->fd.id, 1);
DimensionSlice *slice;
// If tiered reads are disabled, we cap the refresh window to the start of the hypertable
// Get the earliest *non-osm* slice in this scenario.
if (!ts_guc_enable_osm_reads)
{
slice = ts_dimension_slice_earliest_non_osm_slice(time_dim->fd.id);
}
else
{
slice = ts_dimension_slice_nth_earliest_slice(time_dim->fd.id, 1);
}

/* If still there's no MIN slice range start then return no batches */
if (NULL == slice || TS_TIME_IS_MIN(slice->fd.range_start, refresh_window.type) ||
Expand Down
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/refresh.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/* Default refresh newest first */
#define DEFAULT_REFRESH_NEWEST_FIRST true

Hypertable *cagg_get_hypertable_or_fail(int32 hypertable_id);
extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS);
extern void continuous_agg_refresh_batched(ContinuousAgg *cagg, InternalTimeRange *refresh_window,
ContinuousAggRefreshContext context,
Expand Down
Loading
Loading