Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions parquet/src/arrow/push_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,31 @@ impl ParquetPushDecoder {
self.state.row_groups_remaining()
}

/// Returns the file-level row-group index that the next call to
Comment thread
zhuqi-lucas marked this conversation as resolved.
Outdated
/// [`Self::try_next_reader`] will yield a reader for, after applying
/// any internal skipping (row selection emptiness, exhausted budget,
/// finished state). Returns `Ok(None)` when:
Comment thread
zhuqi-lucas marked this conversation as resolved.
Outdated
/// - the decoder has no more row groups to read,
/// - the decoder is currently inside a row group (consumers should
/// call [`Self::is_at_row_group_boundary`] first), or
/// - every remaining row group would be skipped.
///
/// Returns `Err` when reading row-group metadata fails (e.g.
/// `usize` overflow on 32-bit targets), matching the error surface
/// of `try_next_reader` so peek and read paths report errors
/// consistently.
Comment thread
zhuqi-lucas marked this conversation as resolved.
Outdated
///
/// This is a read-only peek: it does not mutate decoder state. It is
/// useful for adaptive callers that maintain per-row-group state in
/// lock-step with the decoder (e.g. dynamic row-group pruners or
/// per-RG `RowFilter` toggles): without this peek the caller has no
/// way to know which row group the next reader actually corresponds
/// to, because [`Self::try_next_reader`] may silently advance past
/// row groups whose row selection is empty.
Comment thread
zhuqi-lucas marked this conversation as resolved.
Outdated
pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
self.state.peek_next_row_group()
}

Comment on lines +561 to +564

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this peek_remaining_row_groups()? That might enable more usages in the future.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting it actually return more than one row group? Something like

    pub fn peek_next_row_groups(&self) -> Result<Option<&[usize]>, ParquetError> {

or just rename it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am suggesting it return more than one. I'm not sure if that's helpful, but I think we have that information and if we do we might as well return it. It would avoid adding that API later and then there being overlap, but maybe it's worth doing the simpler API for now and we can always add another one later.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, future work may dependent on this.

/// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
/// row groups that have *not* yet been decoded.
///
Expand Down Expand Up @@ -840,6 +865,19 @@ impl ParquetDecoderState {
}
}

fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you could add a comment with a link to ParquetPushDecoder::peek_next_row_group?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, addressed in latest PR.

match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups.peek_next_row_group(),
// We only expose a meaningful answer at row-group boundaries.
// Mid-row-group there is no "next" — the active reader is
// tied to the current row group.
ParquetDecoderState::DecodingRowGroup { .. } => Ok(None),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a well defined next row group even in the middle of reading a row group. Why not return the remaining row groups here? Something like

 ParquetDecoderState::DecodingRowGroup { 
   remaining_row_groups , 
   ..
} => remaining_row_groups.peek_next_row_group(),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great idea, addressed it in latest PR!

ParquetDecoderState::Finished => Ok(None),
}
}

fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
let remaining_row_groups = match self {
ParquetDecoderState::ReadingRowGroup {
Expand Down Expand Up @@ -1743,6 +1781,126 @@ mod test {
expect_finished(decoder.try_decode());
}

/// `peek_next_row_group` reports the index of the row group the
/// next `try_next_reader` call will hand back, matching the
/// frontier's internal skip logic.
#[test]
fn test_peek_next_row_group_basic() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.build()
.unwrap();

// Two row groups (0, 1). At boundary before any read, peek should
// see RG 0.
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0));
assert!(decoder.is_at_row_group_boundary());

let ranges = expect_needs_data(decoder.try_next_reader());
push_ranges_to_decoder(&mut decoder, ranges);
let reader = expect_data(decoder.try_next_reader());
// Once the reader for RG 0 has been handed off, the decoder is
// back at a boundary waiting for RG 1 — peek must reflect that
// (the active reader lives outside the decoder).
assert!(decoder.is_at_row_group_boundary());
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));

// Drain RG 0's reader and consume RG 1.
for batch in reader {
let _ = batch.unwrap();
}
let ranges = expect_needs_data(decoder.try_next_reader());
push_ranges_to_decoder(&mut decoder, ranges);
let reader = expect_data(decoder.try_next_reader());
for batch in reader {
let _ = batch.unwrap();
}

// No row groups left.
assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `peek_next_row_group` honors `with_row_groups` — restricting the
/// scan to a single row group means peek reports only that one and
/// then `None`.
#[test]
fn test_peek_next_row_group_respects_with_row_groups() {
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_groups(vec![1])
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// When a row-selection segment leaves the next row group with zero
/// selected rows, `peek_next_row_group` mirrors
/// `next_readable_row_group`'s skip: it returns the *following*
/// row group instead of the empty one.
#[test]
Comment thread
zhuqi-lucas marked this conversation as resolved.
fn test_peek_next_row_group_skips_empty_selection() {
// Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of
// RG 1; the next reader will be for RG 1, not RG 0.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_selection(RowSelection::from(vec![
RowSelector::skip(250),
RowSelector::select(100),
]))
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// `peek_next_row_group` returns `None` on a finished decoder.
#[test]
fn test_peek_next_row_group_finished() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_groups(vec![])
.build()
.unwrap();

// No row groups requested ⇒ already finished, no peek.
expect_finished(decoder.try_next_reader());
assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `peek_next_row_group` mirrors `next_readable_row_group`'s
/// budget-skipping logic: with an `OFFSET` that consumes the
/// entire first row group, peek must return the *following*
/// row group, not RG 0 (no predicates, so budget skips apply).
#[test]
fn test_peek_next_row_group_skips_for_budget() {
// Each row group has 200 rows. OFFSET 200 drains RG 0 entirely
// and lands the decoder's first emitted reader at RG 1.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_offset(200)
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// OFFSET larger than every remaining row group's row count
/// exhausts the budget, so peek should report `None` — matching
/// `next_readable_row_group`'s behavior of producing `Finished`.
#[test]
fn test_peek_next_row_group_budget_drains_all() {
// 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land
// anywhere — every RG is skipped under the budget.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_offset(500)
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `into_builder` between row groups recovers a builder for the
/// not-yet-decoded row groups; rebuilding it with a new row filter
/// applies that filter to the subsequent row groups while leaving the
Expand Down
73 changes: 73 additions & 0 deletions parquet/src/arrow/push_decoder/remaining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,63 @@ impl RowGroupFrontier {
self.budget = budget;
}

/// Peek at the next row-group index `next_readable_row_group` would
/// hand out, without mutating any state. Returns `None` if every
/// remaining row group would be skipped under the current
/// selection/budget, or if the queue is empty.
///
/// Mirrors the structure of `next_readable_row_group` but only walks

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to avoid the duplication with next_readable_row_group? I worry the two could drift out of sync

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, addressed in latest PR.

/// borrowed state — used by [`super::ParquetPushDecoder::peek_next_row_group`]
/// to let adaptive callers (e.g. dynamic row-group pruners or per-RG
/// `RowFilter` toggles) keep their per-RG state in lock-step with
/// the reader the decoder is about to emit.
fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
// Short-circuit: budget exhausted or selection drained ⇒ same
// outcome as `next_readable_row_group`'s early return.
if self.budget.is_exhausted()
|| self
.selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
return Ok(None);
}

// We may have to walk past row groups whose split selection is
// empty. Cloning the selection lets us run the same `split_off`
// logic without disturbing the real one.
let mut selection = self.selection.clone();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😬 it would be nice to avoid cloning the selectin

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

let mut budget = self.budget;
for &row_group_idx in &self.row_groups {
let row_count = self.row_group_num_rows(row_group_idx)?;
let selected_rows = match selection.as_mut() {
Some(remaining) => {
let rg_segment = remaining.split_off(row_count);
rg_segment.row_count()
}
None => row_count,
};
if selected_rows == 0 {
// Same skip path as `next_readable_row_group`: row
// selection drained for this RG, move on.
continue;
}
if self.has_predicates {
// Predicates disable budget-based RG skipping for this RG;
// budget still gates row emission inside the row group.
return Ok(Some(row_group_idx));
}
Comment thread
zhuqi-lucas marked this conversation as resolved.
let rows_after_budget = budget.rows_after(selected_rows);
if rows_after_budget != 0 {
return Ok(Some(row_group_idx));
}
// Budget-skip: advance the simulated budget and keep
// walking; the next iteration sees the post-advance budget.
budget = budget.advance(selected_rows, rows_after_budget);
}
Ok(None)
}

fn clear_remaining(&mut self) {
self.selection = None;
self.row_groups.clear();
Expand Down Expand Up @@ -299,6 +356,22 @@ impl RemainingRowGroups {
self.frontier.row_groups.len()
}

/// Peek at the file-level row-group index that the next call to
/// [`Self::try_next_reader`] will produce a reader for, after
/// simulating the same skip logic [`Self::try_next_reader`] applies
/// internally (row-selection emptiness + offset/limit budget). Does
/// not mutate state.
///
/// Returns `None` when the active row group is still being decoded,
/// when no row groups remain, or when every remaining row group
/// would be skipped under the current selection/budget.
pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
if self.row_group_reader_builder.has_active_row_group() {
return Ok(None);
}
self.frontier.peek_next_row_group()
}

/// returns [`ParquetRecordBatchReader`] suitable for reading the next
/// group of rows from the Parquet data, or the list of data ranges still
/// needed to proceed
Expand Down
Loading