diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 6dc5520bb975..ba075b4c08b6 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -538,6 +538,30 @@ impl ParquetPushDecoder {
         self.state.row_groups_remaining()
     }
 
+    /// Returns the row-group index that the next call to
+    /// [`Self::try_next_reader`] will yield a reader for, after applying
+    /// any internal skipping (row selection emptiness, exhausted budget,
+    /// finished state).
+    ///
+    /// Safe to call at any time. When called mid-row-group (i.e. while a
+    /// previously-emitted [`ParquetRecordBatchReader`] is still being
+    /// drained), the returned index refers to the row group that
+    /// [`Self::try_next_reader`] will produce *after* the current one.
+    ///
+    /// Returns `Ok(None)` when:
+    /// - the decoder has no more row groups to read, or
+    /// - every remaining row group would be skipped.
+    ///
+    /// This method does not mutate decoder state. It is useful for
+    /// callers that maintain per-row-group state in lock-step with the
+    /// decoder (e.g. dynamic row-group pruners) to determine which row
+    /// group the next reader corresponds to, since
+    /// [`Self::try_next_reader`] may silently advance past row groups
+    /// based on filtering and other criteria.
+    pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        self.state.peek_next_row_group()
+    }
+
     /// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
     /// row groups that have *not* yet been decoded.
     ///
@@ -840,6 +864,26 @@ impl ParquetDecoderState {
         }
     }
 
+    /// See [`ParquetPushDecoder::peek_next_row_group`] for the public
+    /// API contract. This inner method delegates to the underlying
+    /// [`RemainingRowGroups`] for both `ReadingRowGroup` and
+    /// `DecodingRowGroup`: mid-row-group the answer is the row group
+    /// `try_next_reader` will produce *after* the active one finishes,
+    /// which is exactly what `RemainingRowGroups::peek_next_row_group`
+    /// computes from the queued frontier.
+    fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups,
+            } => remaining_row_groups.peek_next_row_group(),
+            ParquetDecoderState::DecodingRowGroup {
+                remaining_row_groups,
+                ..
+            } => remaining_row_groups.peek_next_row_group(),
+            ParquetDecoderState::Finished => Ok(None),
+        }
+    }
+
     fn into_builder(self) -> Result {
         let remaining_row_groups = match self {
             ParquetDecoderState::ReadingRowGroup {
@@ -1743,6 +1787,206 @@ mod test {
         expect_finished(decoder.try_decode());
     }
 
+    /// `peek_next_row_group` reports the index of the row group the
+    /// next `try_next_reader` call will hand back, matching the
+    /// frontier's internal skip logic.
+    #[test]
+    fn test_peek_next_row_group_basic() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Two row groups (0, 1). At boundary before any read, peek should
+        // see RG 0.
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0));
+        assert!(decoder.is_at_row_group_boundary());
+
+        let ranges = expect_needs_data(decoder.try_next_reader());
+        push_ranges_to_decoder(&mut decoder, ranges);
+        let reader = expect_data(decoder.try_next_reader());
+        // Once the reader for RG 0 has been handed off, the decoder is
+        // back at a boundary waiting for RG 1 — peek must reflect that
+        // (the active reader lives outside the decoder).
+        assert!(decoder.is_at_row_group_boundary());
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+
+        // Drain RG 0's reader and consume RG 1.
+        for batch in reader {
+            let _ = batch.unwrap();
+        }
+        let ranges = expect_needs_data(decoder.try_next_reader());
+        push_ranges_to_decoder(&mut decoder, ranges);
+        let reader = expect_data(decoder.try_next_reader());
+        for batch in reader {
+            let _ = batch.unwrap();
+        }
+
+        // No row groups left.
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+    }
+
+    /// `peek_next_row_group` honors `with_row_groups` — restricting the
+    /// scan to a single row group means the first peek reports that row
+    /// group (here RG 1) rather than RG 0.
+    #[test]
+    fn test_peek_next_row_group_respects_with_row_groups() {
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_row_groups(vec![1])
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+    }
+
+    /// When a row-selection segment leaves the next row group with zero
+    /// selected rows, `peek_next_row_group` mirrors
+    /// `next_readable_row_group`'s skip: it returns the *following*
+    /// row group instead of the empty one.
+    #[test]
+    fn test_peek_next_row_group_skips_empty_selection() {
+        // Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of
+        // RG 1; the next reader will be for RG 1, not RG 0.
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(250),
+                RowSelector::select(100),
+            ]))
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+    }
+
+    /// `peek_next_row_group` returns `None` on a finished decoder.
+    #[test]
+    fn test_peek_next_row_group_finished() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_row_groups(vec![])
+            .build()
+            .unwrap();
+
+        // No row groups requested ⇒ already finished, no peek.
+        expect_finished(decoder.try_next_reader());
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+    }
+
+    /// `peek_next_row_group` mirrors `next_readable_row_group`'s
+    /// budget-skipping logic: with an `OFFSET` that consumes the
+    /// entire first row group, peek must return the *following*
+    /// row group, not RG 0 (no predicates, so budget skips apply).
+    #[test]
+    fn test_peek_next_row_group_skips_for_budget() {
+        // Each row group has 200 rows. OFFSET 200 drains RG 0 entirely
+        // and lands the decoder's first emitted reader at RG 1.
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_offset(200)
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+    }
+
+    /// OFFSET larger than every remaining row group's row count
+    /// exhausts the budget, so peek should report `None` — matching
+    /// `next_readable_row_group`'s behavior of producing `Finished`.
+    #[test]
+    fn test_peek_next_row_group_budget_drains_all() {
+        // 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land
+        // anywhere — every RG is skipped under the budget.
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_offset(500)
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+    }
+
+    /// `peek_next_row_group` is safe to call while a row group's reader
+    /// is still being drained via `try_decode`. In `DecodingRowGroup`
+    /// state it must report the row group `try_next_reader` will yield
+    /// a reader for *after* the active one — never `None` just because
+    /// the decoder is mid-row-group.
+    #[test]
+    fn test_peek_next_row_group_during_decoding_row_group() {
+        // Two row groups (200 rows each), small batch size so the
+        // decoder stays in `DecodingRowGroup` across multiple
+        // `try_decode` calls within RG 0.
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_batch_size(100)
+            .build()
+            .unwrap();
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+
+        // First batch of RG 0 — after this, the decoder is in
+        // `DecodingRowGroup` state with the reader retained internally
+        // and one more 100-row batch still owed for RG 0. Peek must
+        // therefore look past the active RG to RG 1.
+        let _batch0 = expect_data(decoder.try_decode());
+        assert!(!decoder.is_at_row_group_boundary());
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+
+        // Drain the rest of RG 0; peek still reports RG 1.
+        let _batch1 = expect_data(decoder.try_decode());
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+
+        // Move into RG 1 — peek now sees no further row groups.
+        let _batch2 = expect_data(decoder.try_decode());
+        assert!(!decoder.is_at_row_group_boundary());
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+
+        let _batch3 = expect_data(decoder.try_decode());
+        expect_finished(decoder.try_decode());
+    }
+
+    /// Peeking is a read-only operation: calling it repeatedly between
+    /// `try_next_reader` calls must never change which row group the
+    /// reader path actually produces. Drives the decoder all the way
+    /// through and asserts each peek/read pair agrees.
+    #[test]
+    fn test_peek_next_row_group_does_not_mutate_state() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Two row groups expected — drive both, asserting peek/read agree.
+        for expected_rg in [0usize, 1usize] {
+            assert!(decoder.is_at_row_group_boundary());
+
+            // Multiple peeks before reading must all agree, and must not
+            // disturb the upcoming read.
+            let first_peek = decoder.peek_next_row_group().unwrap();
+            let second_peek = decoder.peek_next_row_group().unwrap();
+            let third_peek = decoder.peek_next_row_group().unwrap();
+            assert_eq!(first_peek, Some(expected_rg));
+            assert_eq!(first_peek, second_peek);
+            assert_eq!(second_peek, third_peek);
+
+            // Now read for real. The next iteration's peek (or the final
+            // `None` below) checks that exactly one row group was consumed.
+            let ranges = expect_needs_data(decoder.try_next_reader());
+            push_ranges_to_decoder(&mut decoder, ranges);
+            let reader = expect_data(decoder.try_next_reader());
+            for batch in reader {
+                let _ = batch.unwrap();
+            }
+        }
+
+        // Decoder is drained. Peek must agree with `try_next_reader`'s
+        // terminal state.
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+        expect_finished(decoder.try_next_reader());
+    }
+
     /// `into_builder` between row groups recovers a builder for the
     /// not-yet-decoded row groups; rebuilding it with a new row filter
     /// applies that filter to the subsequent row groups while leaving the
diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs
index d1070d2aa69f..93bd32a33244 100644
--- a/parquet/src/arrow/push_decoder/remaining.rs
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::DecodeResult;
-use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
+use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection, RowSelector};
 use crate::arrow::push_decoder::reader_builder::{
     RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts,
 };
@@ -37,6 +37,58 @@ enum QueuedRowGroupDecision {
     Skip { remaining_budget: RowBudget },
 }
 
+/// Borrowed cursor over a [`RowSelection`] that counts selected rows in
+/// each row group's slice without mutating the selection.
+///
+/// Used by [`RowGroupFrontier::peek_next_row_group`] to walk per-row-group
+/// selection slices without cloning the underlying selectors.
+struct PeekSelectionCursor<'a> {
+    iter: Box<dyn Iterator<Item = &'a RowSelector> + 'a>,
+    /// The selector currently being consumed (only partly used).
+    current: Option<&'a RowSelector>,
+    /// Rows already consumed from `current`.
+    consumed_in_current: usize,
+}
+
+impl<'a> PeekSelectionCursor<'a> {
+    fn new(selection: &'a RowSelection) -> Self {
+        Self {
+            iter: Box::new(selection.iter()),
+            current: None,
+            consumed_in_current: 0,
+        }
+    }
+
+    /// Consume the next `row_count` rows from the cursor and return the
+    /// number of those rows that are selected (i.e. `!selector.skip`).
+    /// Advances the cursor past the consumed range.
+    fn take(&mut self, row_count: usize) -> usize {
+        let mut selected = 0usize;
+        let mut remaining = row_count;
+        while remaining > 0 {
+            if self.current.is_none() {
+                self.current = self.iter.next();
+                self.consumed_in_current = 0;
+                if self.current.is_none() {
+                    break;
+                }
+            }
+            let selector = self.current.expect("current selector present");
+            let available = selector.row_count.saturating_sub(self.consumed_in_current);
+            let consume = available.min(remaining);
+            if !selector.skip {
+                selected += consume;
+            }
+            remaining -= consume;
+            self.consumed_in_current += consume;
+            if self.consumed_in_current >= selector.row_count {
+                self.current = None;
+            }
+        }
+        selected
+    }
+}
+
 /// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
 #[derive(Debug)]
 struct NextRowGroup {
@@ -93,6 +145,62 @@ impl RowGroupFrontier {
         self.budget = budget;
     }
 
+    /// Peek at the next row-group index `next_readable_row_group` would
+    /// hand out, without mutating any state. Returns `None` if every
+    /// remaining row group would be skipped under the current
+    /// selection/budget, or if the queue is empty.
+    ///
+    /// Walks the queued frontier via [`PeekSelectionCursor`] so the
+    /// real `RowSelection` is not cloned. The Read/Skip rule inlined
+    /// below is intentionally kept in lock-step with
+    /// [`Self::plan_selected_row_group`]; both touch a small enough
+    /// set of decisions that a shared helper would obscure more than
+    /// it saves. The `peek_matches_next_readable_first_hit` test
+    /// asserts the lock-step on the head element across a range of
+    /// inputs.
+    fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        // Short-circuit: budget exhausted or selection drained ⇒ same
+        // outcome as `next_readable_row_group`'s early return.
+        if self.budget.is_exhausted()
+            || self
+                .selection
+                .as_ref()
+                .is_some_and(|selection| selection.row_count() == 0)
+        {
+            return Ok(None);
+        }
+
+        let mut cursor = self.selection.as_ref().map(PeekSelectionCursor::new);
+        let mut budget = self.budget;
+        for &row_group_idx in &self.row_groups {
+            if budget.is_exhausted() {
+                break;
+            }
+            let row_count = self.row_group_num_rows(row_group_idx)?;
+            let selected_rows = match cursor.as_mut() {
+                Some(cursor) => cursor.take(row_count),
+                None => row_count,
+            };
+            if selected_rows == 0 {
+                // Selection-skip: mirrors `next_readable_row_group`'s
+                // "selected_rows == 0 ⇒ pop_front, continue".
+                continue;
+            }
+            // Inline Read/Skip rule — keep in lock-step with
+            // `plan_selected_row_group`.
+            if self.has_predicates {
+                return Ok(Some(row_group_idx));
+            }
+            let rows_after_budget = budget.rows_after(selected_rows);
+            if rows_after_budget != 0 {
+                return Ok(Some(row_group_idx));
+            }
+            // Budget skip: advance the simulated budget and keep walking.
+            budget = budget.advance(selected_rows, rows_after_budget);
+        }
+        Ok(None)
+    }
+
     fn clear_remaining(&mut self) {
         self.selection = None;
         self.row_groups.clear();
@@ -299,6 +407,22 @@ impl RemainingRowGroups {
         self.frontier.row_groups.len()
     }
 
+    /// Peek at the file-level row-group index that the next call to
+    /// [`Self::try_next_reader`] will produce a reader for, after
+    /// simulating the same skip logic [`Self::try_next_reader`] applies
+    /// internally (row-selection emptiness + offset/limit budget). Does
+    /// not mutate state.
+    ///
+    /// Returns `None` while the reader builder reports an active row group
+    /// (`has_active_row_group()`), when no row groups remain, or when every
+    /// remaining row group would be skipped under the current selection/budget.
+    pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        if self.row_group_reader_builder.has_active_row_group() {
+            return Ok(None);
+        }
+        self.frontier.peek_next_row_group()
+    }
+
     /// returns [`ParquetRecordBatchReader`] suitable for reading the next
     /// group of rows from the Parquet data, or the list of data ranges still
     /// needed to proceed