Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 42 additions & 26 deletions tsl/src/compression/batch_metadata_builder_bloom1.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ typedef struct Bloom1MetadataBuilder
Bloom1HasherInternal hasher;
} Bloom1MetadataBuilder;

static void bloom1_hasher_init(Bloom1HasherInternal *hasher, const Oid *type_oids, int num_columns);
static Bloom1HasherInternal bloom1_hasher_init(const Oid *type_oids, int num_columns);

/*
* Low-bias invertible hash function from this article:
Expand Down Expand Up @@ -544,6 +544,13 @@ bloom1_contains_context_prepare(FunctionCallInfo fcinfo, bool use_element_type)
num_columns)));
}

if (num_columns < 2)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("composite bloom filter must have at least two columns")));
}

for (int i = 0; i < num_columns; i++)
{
type_oids[i] = TupleDescAttr(tupdesc, i)->atttypid;
Expand All @@ -556,7 +563,7 @@ bloom1_contains_context_prepare(FunctionCallInfo fcinfo, bool use_element_type)
num_columns = 1;
}

bloom1_hasher_init(&context->bloom_hasher, type_oids, num_columns);
context->bloom_hasher = bloom1_hasher_init(type_oids, num_columns);

get_typlenbyvalalign(context->element_type,
&context->element_typlen,
Expand All @@ -583,12 +590,6 @@ bloom1_contains_hash_internal(const char *words_buf, uint32 num_bits, uint64 has
{
Assert(words_buf != NULL);

/* Must be a power of two. */
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));

/* Must be >= 64 bits. */
CheckCompressedData(num_bits >= 64);

const uint32 num_word_bits = sizeof(*words_buf) * 8;
Assert(num_bits % num_word_bits == 0);
const uint32 log2_word_bits = pg_leftmost_one_pos32(num_word_bits);
Expand Down Expand Up @@ -767,12 +768,12 @@ bloom1_contains_any(PG_FUNCTION_ARGS)
const char *words_buf = bloom1_words_buf(bloom);
const uint32 num_bits = bloom1_num_bits(bloom);

/* Must be a power of two. */
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));

/* Must be >= 64 bits. */
CheckCompressedData(num_bits >= 64);

/* Must be a power of two. */
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));

const uint32 num_word_bits = sizeof(*words_buf) * 8;
Assert(num_bits % num_word_bits == 0);
const uint32 log2_word_bits = pg_leftmost_one_pos32(num_word_bits);
Expand Down Expand Up @@ -809,7 +810,7 @@ bloom1_contains_any(PG_FUNCTION_ARGS)
}

/*
* Checks whether any hashes of the given array can be present in the given
* Checks whether any hashes in the given array can be present in the given
* bloom filter. This is used for predicate pushdown where the values are
* pre-hashed at planning time.
*
Expand Down Expand Up @@ -864,6 +865,12 @@ bloom1_contains_any_hashes(PG_FUNCTION_ARGS)
const char *words_buf = bloom1_words_buf(bloom);
const uint32 num_bits = bloom1_num_bits(bloom);

/* Must be >= 64 bits. */
CheckCompressedData(num_bits >= 64);

/* Must be a power of two. */
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));

for (int i = 0; i < num_hashes; i++)
{
if (hash_nulls[i])
Expand Down Expand Up @@ -912,6 +919,13 @@ bloom1_hash(PG_FUNCTION_ARGS)
num_columns)));
}

if (num_columns < 2)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("composite bloom filter must have at least two columns")));
}

for (int i = 0; i < num_columns; i++)
{
type_oids[i] = TupleDescAttr(tupdesc, i)->atttypid;
Expand All @@ -938,8 +952,8 @@ bloom1_hash(PG_FUNCTION_ARGS)
num_columns = 1;
}

Bloom1Hasher *hasher = bloom1_hasher_create(type_oids, num_columns);
uint64 hash = hasher->hash_values(hasher, values);
Bloom1HasherInternal hasher = bloom1_hasher_init(type_oids, num_columns);
uint64 hash = hasher.functions.hash_values(&hasher, values);
PG_RETURN_INT64((int64) hash);
}

Expand Down Expand Up @@ -981,36 +995,39 @@ batch_metadata_builder_bloom1_varlena_size(void)
return bloom1_varlena_alloc_size(desired_bits);
}

static void
bloom1_hasher_init(Bloom1HasherInternal *hasher, const Oid *type_oids, int num_columns)
static Bloom1HasherInternal
bloom1_hasher_init(const Oid *type_oids, int num_columns)
{
*hasher = (Bloom1HasherInternal){
Bloom1HasherInternal hasher = (Bloom1HasherInternal){
.functions =
(Bloom1Hasher){
.hash_values = bloom1_hash_values,
.num_columns = num_columns,
},
};

Assert(num_columns != 0);
for (int i = 0; i < num_columns; i++)
{
hasher->hash_functions[i] =
bloom1_get_hash_function(type_oids[i], &hasher->hash_function_finfos[i]);
if (hasher->hash_functions[i] == NULL)
hasher.hash_functions[i] =
bloom1_get_hash_function(type_oids[i], &hasher.hash_function_finfos[i]);
if (hasher.hash_functions[i] == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("the argument type %s lacks an extended hash function",
format_type_be(type_oids[i]))));
}
}

return hasher;
}

Bloom1Hasher *
bloom1_hasher_create(const Oid *type_oids, int num_columns)
{
Bloom1HasherInternal *hasher = palloc(sizeof(*hasher));
bloom1_hasher_init(hasher, type_oids, num_columns);
*hasher = bloom1_hasher_init(type_oids, num_columns);
return &hasher->functions;
}

Expand Down Expand Up @@ -1038,7 +1055,7 @@ batch_metadata_builder_bloom1_create(int num_columns, const Oid *type_oids,
memcpy(builder->input_columns, attnums, num_columns * sizeof(AttrNumber));

/* Initialize the embedded hasher */
bloom1_hasher_init(&builder->hasher, type_oids, num_columns);
builder->hasher = bloom1_hasher_init(type_oids, num_columns);

/*
* Initialize the bloom filter.
Expand Down Expand Up @@ -1196,15 +1213,15 @@ ts_bloom1_composite_debug_hash(PG_FUNCTION_ARGS)
}
ReleaseTupleDesc(tupdesc);

Bloom1Hasher *hasher = bloom1_hasher_create(type_oids, num_fields);
Bloom1HasherInternal hasher = bloom1_hasher_init(type_oids, num_fields);

NullableDatum values[MAX_BLOOM_FILTER_COLUMNS];
for (int i = 0; i < num_fields; i++)
{
values[i].value = GetAttributeByNum(tuple, i + 1, &values[i].isnull);
}

uint64 hash = hasher->hash_values(hasher, values);
uint64 hash = hasher.functions.hash_values(&hasher, values);
PG_RETURN_INT64((int64) hash);
}

Expand All @@ -1227,9 +1244,8 @@ bloom1_contains_hash(Datum bloom_datum, uint64 hash)
const uint32 num_bits = 8 * VARSIZE_ANY_EXHDR(bloom);

/* Validate bloom structure */
CheckCompressedData(num_bits != 0);
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));
CheckCompressedData(num_bits >= 64);
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));

return bloom1_contains_hash_internal(words_buf, num_bits, hash);
}
16 changes: 12 additions & 4 deletions tsl/test/expected/compress_bloom_sparse_debug.out
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,25 @@ select ts_bloom1_debug_hash('2025-05-05'::timestamptz);
----------------------
2448302731963240730

-- The "contains" functions should error out when called with wrong arguments.
\set ON_ERROR_STOP 0
-- The "contains" functions should error out when called with wrong arguments.
select _timescaledb_functions.bloom1_contains('\xffffffffffffffff'::_timescaledb_internal.bloom1, 1::bit) ;
ERROR: the argument type bit lacks an extended hash function
select _timescaledb_functions.bloom1_contains_any('\xffffffffffffffff'::_timescaledb_internal.bloom1, array[1::bit]) ;
ERROR: the argument type bit lacks an extended hash function
select _timescaledb_functions.bloom1_contains(_timescaledb_functions.bloom1in('\x'::cstring), 1);
ERROR: the compressed data is corrupt
select _timescaledb_functions.bloom1_contains_any(_timescaledb_functions.bloom1in('\x'::cstring), 1);
ERROR: function _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, integer) does not exist at character 8
select _timescaledb_functions.bloom1_contains('\x'::_timescaledb_internal.bloom1, ROW(1));
ERROR: composite bloom filter must have at least two columns
select _timescaledb_functions.bloom1_contains('\xffffffffffffffff'::_timescaledb_internal.bloom1, ROW());
ERROR: composite bloom filter must have at least two columns
-- The hash function is callable by user, so must return proper error
select _timescaledb_functions.bloom1_hash(ROW(1, 2, 3, 4, 5, 6, 7, 8, 9));
ERROR: composite bloom filter supports at most 8 columns, got 9
select _timescaledb_functions.bloom1_hash(ROW());
ERROR: composite bloom filter must have at least two columns
\set ON_ERROR_STOP 1
-- Test that the "contains" function cope with different source chunks.
create table detoaster(ts int, tag text) with (tsdb.hypertable,
Expand Down Expand Up @@ -236,6 +247,3 @@ SELECT _timescaledb_functions.bloom1_contains(NULL, pg_catalog.record_in(null::c
-----------------
f

-- The hash function is callable by user, so must return proper error
SELECT _timescaledb_functions.bloom1_hash(ROW(1, 2, 3, 4, 5, 6, 7, 8, 9));
ERROR: composite bloom filter supports at most 8 columns, got 9
15 changes: 12 additions & 3 deletions tsl/test/sql/compress_bloom_sparse_debug.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,26 @@ select ts_bloom1_debug_hash('2025-05-05'::timestamp);
select ts_bloom1_debug_hash('2025-05-05'::timestamptz);


-- The "contains" functions should error out when called with wrong arguments.
\set ON_ERROR_STOP 0

-- The "contains" functions should error out when called with wrong arguments.
select _timescaledb_functions.bloom1_contains('\xffffffffffffffff'::_timescaledb_internal.bloom1, 1::bit) ;

select _timescaledb_functions.bloom1_contains_any('\xffffffffffffffff'::_timescaledb_internal.bloom1, array[1::bit]) ;

select _timescaledb_functions.bloom1_contains(_timescaledb_functions.bloom1in('\x'::cstring), 1);

select _timescaledb_functions.bloom1_contains_any(_timescaledb_functions.bloom1in('\x'::cstring), 1);

select _timescaledb_functions.bloom1_contains('\x'::_timescaledb_internal.bloom1, ROW(1));

select _timescaledb_functions.bloom1_contains('\xffffffffffffffff'::_timescaledb_internal.bloom1, ROW());

-- The hash function is callable by user, so must return proper error
select _timescaledb_functions.bloom1_hash(ROW(1, 2, 3, 4, 5, 6, 7, 8, 9));

select _timescaledb_functions.bloom1_hash(ROW());

\set ON_ERROR_STOP 1


Expand Down Expand Up @@ -173,7 +184,5 @@ SELECT _timescaledb_functions.bloom1_contains('\xd098c885f08468eb8916751d947f248
SELECT _timescaledb_functions.bloom1_contains(NULL, pg_catalog.record_in(null::cstring, 23::oid, 12::int4));


-- The hash function is callable by user, so must return proper error
SELECT _timescaledb_functions.bloom1_hash(ROW(1, 2, 3, 4, 5, 6, 7, 8, 9));


Loading