244 changes: 244 additions & 0 deletions parquet/src/arrow/push_decoder/mod.rs
@@ -538,6 +538,30 @@ impl ParquetPushDecoder {
self.state.row_groups_remaining()
}

/// Returns the row-group index that the next call to
/// [`Self::try_next_reader`] will yield a reader for, after applying
/// any internal skipping (row selection emptiness, exhausted budget,
/// finished state).
///
/// Safe to call at any time. When called mid-row-group (i.e. while a
/// previously-emitted [`ParquetRecordBatchReader`] is still being
/// drained), the returned index refers to the row group that
/// [`Self::try_next_reader`] will produce *after* the current one.
///
/// Returns `Ok(None)` when:
/// - the decoder has no more row groups to read, or
/// - every remaining row group would be skipped.
///
/// This method does not mutate decoder state. It is useful for
/// callers that maintain per-row-group state in lock-step with the
/// decoder (e.g. dynamic row-group pruners) to determine which row
/// group the next reader corresponds to, since
/// [`Self::try_next_reader`] may silently advance past row groups
/// based on filtering and other criteria.
pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
self.state.peek_next_row_group()
}
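
The lock-step use case described in the doc comment above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyFrontier`, its fields, `next_row_group`, and `drive` are hypothetical names invented for illustration, not the parquet crate's types, and the only skip rule modelled is an OFFSET-style budget that consumes whole row groups. It shows a non-mutating peek agreeing with the mutating read path, so a caller can key per-row-group state on the peeked index.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the decoder's queue of pending row groups.
/// NOT the parquet crate's implementation.
struct ToyFrontier {
    /// (row-group index, row count) pairs still queued.
    queue: VecDeque<(usize, usize)>,
    /// Rows still to be skipped by an OFFSET-style budget (assumed rule).
    offset: usize,
}

impl ToyFrontier {
    /// Non-mutating: index of the row group `next_row_group` would return.
    fn peek_next_row_group(&self) -> Option<usize> {
        let mut offset = self.offset;
        for &(idx, rows) in &self.queue {
            if offset >= rows {
                // The offset swallows this row group entirely; skip it.
                offset -= rows;
            } else {
                return Some(idx);
            }
        }
        None
    }

    /// Mutating: drops skipped row groups and returns the next readable one.
    fn next_row_group(&mut self) -> Option<usize> {
        while let Some((idx, rows)) = self.queue.pop_front() {
            if self.offset >= rows {
                self.offset -= rows;
            } else {
                // Any leftover offset is applied inside this row group.
                self.offset = 0;
                return Some(idx);
            }
        }
        None
    }
}

/// Drives the frontier to completion, checking that every peek matches
/// the row group the read path then produces.
fn drive(mut frontier: ToyFrontier) -> Vec<usize> {
    let mut seen = Vec::new();
    while let Some(peeked) = frontier.peek_next_row_group() {
        // A caller would set up per-row-group state for `peeked` here.
        let actual = frontier.next_row_group();
        assert_eq!(actual, Some(peeked));
        seen.push(peeked);
    }
    assert_eq!(frontier.next_row_group(), None);
    seen
}

fn main() {
    // Two 200-row groups with OFFSET 200: row group 0 is skipped entirely.
    let frontier = ToyFrontier {
        queue: VecDeque::from(vec![(0, 200), (1, 200)]),
        offset: 200,
    };
    println!("{:?}", drive(frontier)); // prints [1]
}
```

The peek walks the queue with a local copy of the budget, which is why it can take `&self`; the read path applies the same rule while actually consuming the queue.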

Comment on lines +561 to +564

Contributor

Should we make this peek_remaining_row_groups()? That might enable more usages in the future.

Contributor

Are you suggesting it actually return more than one row group? Something like

    pub fn peek_next_row_groups(&self) -> Result<Option<&[usize]>, ParquetError> {

or just rename it?

Contributor

I am suggesting it return more than one. I'm not sure if that's helpful, but I think we have that information and if we do we might as well return it. It would avoid adding that API later and then there being overlap, but maybe it's worth doing the simpler API for now and we can always add another one later.

Contributor Author

Good point, future work may depend on this.
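
To make the trade-off in this thread concrete, here is a hedged sketch of how a multi-result peek and the single-result peek could relate. Everything in it is an assumption for illustration: `ToyRemaining`, its `pending` field, and the "selected row count" rule are hypothetical and do not reflect the parquet crate's internals. The multi-result variant returns an owned `Vec<usize>` rather than the `&[usize]` floated above, because skipped row groups are filtered out on the fly. The sketch also returns plain values, whereas the method in this PR returns a `Result`.

```rust
/// Hypothetical model: each pending row group is
/// (row-group index, number of rows selected in it).
/// NOT the parquet crate's types.
struct ToyRemaining {
    pending: Vec<(usize, usize)>,
}

impl ToyRemaining {
    /// Multi-result variant: every row group that would still get a reader,
    /// in order. Row groups with zero selected rows are skipped.
    fn peek_remaining_row_groups(&self) -> Vec<usize> {
        self.pending
            .iter()
            .filter(|&&(_, selected)| selected > 0)
            .map(|&(idx, _)| idx)
            .collect()
    }

    /// Single-result variant: the first row group that would get a reader.
    fn peek_next_row_group(&self) -> Option<usize> {
        self.pending
            .iter()
            .find(|&&(_, selected)| selected > 0)
            .map(|&(idx, _)| idx)
    }
}

fn main() {
    // Row groups 0 and 2 have no selected rows, so both peeks skip them.
    let remaining = ToyRemaining {
        pending: vec![(0, 0), (1, 100), (2, 0), (3, 200)],
    };
    println!("{:?}", remaining.peek_remaining_row_groups()); // prints [1, 3]
    println!("{:?}", remaining.peek_next_row_group()); // prints Some(1)
}
```

In this model the single-result peek is always the first element of the multi-result peek, which is the overlap the thread mentions: adding the simpler method now does not rule out the broader one later.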

/// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
/// row groups that have *not* yet been decoded.
///
@@ -840,6 +864,26 @@ impl ParquetDecoderState {
}
}

/// See [`ParquetPushDecoder::peek_next_row_group`] for the public
/// API contract. This inner method delegates to the underlying
/// [`RemainingRowGroups`] for both `ReadingRowGroup` and
/// `DecodingRowGroup`: mid-row-group the answer is the row group
/// `try_next_reader` will produce *after* the active one finishes,
/// which is exactly what `RemainingRowGroups::peek_next_row_group`
/// computes from the queued frontier.
fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {

Contributor

Maybe you could add a comment with a link to `ParquetPushDecoder::peek_next_row_group`?

Contributor Author

Good suggestion, addressed in the latest PR.

match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups.peek_next_row_group(),
ParquetDecoderState::DecodingRowGroup {
remaining_row_groups,
..
} => remaining_row_groups.peek_next_row_group(),
ParquetDecoderState::Finished => Ok(None),
}
}

fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
let remaining_row_groups = match self {
ParquetDecoderState::ReadingRowGroup {
@@ -1743,6 +1787,206 @@ mod test {
expect_finished(decoder.try_decode());
}

/// `peek_next_row_group` reports the index of the row group the
/// next `try_next_reader` call will hand back, matching the
/// frontier's internal skip logic.
#[test]
fn test_peek_next_row_group_basic() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.build()
.unwrap();

// Two row groups (0, 1). At boundary before any read, peek should
// see RG 0.
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0));
assert!(decoder.is_at_row_group_boundary());

let ranges = expect_needs_data(decoder.try_next_reader());
push_ranges_to_decoder(&mut decoder, ranges);
let reader = expect_data(decoder.try_next_reader());
// Once the reader for RG 0 has been handed off, the decoder is
// back at a boundary waiting for RG 1 — peek must reflect that
// (the active reader lives outside the decoder).
assert!(decoder.is_at_row_group_boundary());
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));

// Drain RG 0's reader and consume RG 1.
for batch in reader {
let _ = batch.unwrap();
}
let ranges = expect_needs_data(decoder.try_next_reader());
push_ranges_to_decoder(&mut decoder, ranges);
let reader = expect_data(decoder.try_next_reader());
for batch in reader {
let _ = batch.unwrap();
}

// No row groups left.
assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `peek_next_row_group` honors `with_row_groups` — restricting the
/// scan to a single row group means peek reports only that one and
/// then `None`.
#[test]
fn test_peek_next_row_group_respects_with_row_groups() {
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_groups(vec![1])
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// When a row-selection segment leaves the next row group with zero
/// selected rows, `peek_next_row_group` mirrors
/// `next_readable_row_group`'s skip: it returns the *following*
/// row group instead of the empty one.
#[test]
fn test_peek_next_row_group_skips_empty_selection() {
// Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of
// RG 1; the next reader will be for RG 1, not RG 0.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_selection(RowSelection::from(vec![
RowSelector::skip(250),
RowSelector::select(100),
]))
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// `peek_next_row_group` returns `None` on a finished decoder.
#[test]
fn test_peek_next_row_group_finished() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_groups(vec![])
.build()
.unwrap();

// No row groups requested ⇒ already finished, no peek.
expect_finished(decoder.try_next_reader());
assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `peek_next_row_group` mirrors `next_readable_row_group`'s
/// budget-skipping logic: with an `OFFSET` that consumes the
/// entire first row group, peek must return the *following*
/// row group, not RG 0 (no predicates, so budget skips apply).
#[test]
fn test_peek_next_row_group_skips_for_budget() {
// Each row group has 200 rows. OFFSET 200 drains RG 0 entirely
// and lands the decoder's first emitted reader at RG 1.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_offset(200)
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// OFFSET larger than every remaining row group's row count
/// exhausts the budget, so peek should report `None` — matching
/// `next_readable_row_group`'s behavior of producing `Finished`.
#[test]
fn test_peek_next_row_group_budget_drains_all() {
// 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land
// anywhere — every RG is skipped under the budget.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_offset(500)
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `peek_next_row_group` is safe to call while a row group's reader
/// is still being drained via `try_decode`. In `DecodingRowGroup`
/// state it must report the row group `try_next_reader` will yield
/// a reader for *after* the active one — never `None` just because
/// the decoder is mid-row-group.
#[test]
fn test_peek_next_row_group_during_decoding_row_group() {
// Two row groups (200 rows each), small batch size so the
// decoder stays in `DecodingRowGroup` across multiple
// `try_decode` calls within RG 0.
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_batch_size(100)
.build()
.unwrap();
decoder
.push_range(test_file_range(), TEST_FILE_DATA.clone())
.unwrap();

// First batch of RG 0 — after this, the decoder is in
// `DecodingRowGroup` state with the reader retained internally
// and one more 100-row batch still owed for RG 0. Peek must
// therefore look past the active RG to RG 1.
let _batch0 = expect_data(decoder.try_decode());
assert!(!decoder.is_at_row_group_boundary());
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));

// Drain the rest of RG 0; peek still reports RG 1.
let _batch1 = expect_data(decoder.try_decode());
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));

// Move into RG 1 — peek now sees no further row groups.
let _batch2 = expect_data(decoder.try_decode());
assert!(!decoder.is_at_row_group_boundary());
assert_eq!(decoder.peek_next_row_group().unwrap(), None);

let _batch3 = expect_data(decoder.try_decode());
expect_finished(decoder.try_decode());
}

/// Peeking is a read-only operation: calling it repeatedly between
/// `try_next_reader` calls must never change which row group the
/// reader path actually produces. Drives the decoder all the way
/// through and asserts each peek/read pair agrees.
#[test]
fn test_peek_next_row_group_does_not_mutate_state() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.build()
.unwrap();

// Two row groups expected — drive both, asserting peek/read agree.
for expected_rg in [0usize, 1usize] {
assert!(decoder.is_at_row_group_boundary());

// Multiple peeks before reading must all agree, and must not
// disturb the upcoming read.
let first_peek = decoder.peek_next_row_group().unwrap();
let second_peek = decoder.peek_next_row_group().unwrap();
let third_peek = decoder.peek_next_row_group().unwrap();
assert_eq!(first_peek, Some(expected_rg));
assert_eq!(first_peek, second_peek);
assert_eq!(second_peek, third_peek);

// Now read for real and confirm the decoder hands back exactly
// what peek promised.
let ranges = expect_needs_data(decoder.try_next_reader());
push_ranges_to_decoder(&mut decoder, ranges);
let reader = expect_data(decoder.try_next_reader());
for batch in reader {
let _ = batch.unwrap();
}
}

// Decoder is drained. Peek must agree with `try_next_reader`'s
// terminal state.
assert_eq!(decoder.peek_next_row_group().unwrap(), None);
expect_finished(decoder.try_next_reader());
}

/// `into_builder` between row groups recovers a builder for the
/// not-yet-decoded row groups; rebuilding it with a new row filter
/// applies that filter to the subsequent row groups while leaving the