From 390ddc8b2fb32226ea2cf2775e1985d51f0b5e00 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 18 Jun 2026 20:27:58 +0800 Subject: [PATCH 01/10] feat(parquet): add ParquetPushDecoder::peek_next_row_group() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Returns the file-level row-group index that the next call to `try_next_reader` will yield a reader for, after applying any internal skipping (row selection emptiness, exhausted offset/limit budget). Returns `None` when no row groups remain, when the decoder sits inside a row group, or when every remaining row group would be skipped. Closes #10148. # Motivation Adaptive callers that maintain per-row-group state in lock-step with the decoder (e.g. dynamic row-group pruners that re-evaluate row-group statistics mid-scan, per-RG `RowFilter` toggles that skip per-row evaluation when stats prove every row matches) currently have no way to know which row group the next reader will correspond to. `try_next_reader` can silently advance past row groups whose row selection is empty under the current `with_row_selection`, breaking the assumption that the queue of indices passed to `with_row_groups` maps 1:1 to the readers handed back. This is the API DataFusion's `#22450` (TopK runtime row-group pruning) needs to enable a per-RG fully-matched `RowFilter` skip optimization that the old `split_runs` design provided. # Implementation `RowGroupFrontier::peek_next_row_group` clones the offset/limit budget and the row-selection, then runs the same `split_off` walk that `next_readable_row_group` performs internally — returning the first row-group index whose simulated selection is non-empty (or, with predicates, the first index whose selection is non-empty regardless of budget). The clone keeps the call read-only; the cost is a single extra `RowSelection::clone` per peek. # Tests `test_peek_next_row_group_basic`: peek before / between / after readers on a 2-RG file. 
`test_peek_next_row_group_respects_with_row_groups`: explicit `with_row_groups([1])` reports `Some(1)`. `test_peek_next_row_group_skips_empty_selection`: a `RowSelection` that skips all of RG 0 + part of RG 1 makes peek report `Some(1)`, mirroring `next_readable_row_group`'s skip. `test_peek_next_row_group_finished`: an empty `with_row_groups` returns `None`. All 1219 existing parquet lib tests still pass. --- parquet/src/arrow/push_decoder/mod.rs | 120 ++++++++++++++++++++ parquet/src/arrow/push_decoder/remaining.rs | 72 ++++++++++++ 2 files changed, 192 insertions(+) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 6dc5520bb975..5b21b88a8258 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -538,6 +538,26 @@ impl ParquetPushDecoder { self.state.row_groups_remaining() } + /// Returns the file-level row-group index that the next call to + /// [`Self::try_next_reader`] will yield a reader for, after applying + /// any internal skipping (row selection emptiness, exhausted budget, + /// finished state). Returns `None` when: + /// - the decoder has no more row groups to read, + /// - the decoder is currently inside a row group (consumers should + /// call [`Self::is_at_row_group_boundary`] first), or + /// - every remaining row group would be skipped. + /// + /// This is a read-only peek: it does not mutate decoder state. It is + /// useful for adaptive callers that maintain per-row-group state in + /// lock-step with the decoder (e.g. dynamic row-group pruners or + /// per-RG `RowFilter` toggles): without this peek the caller has no + /// way to know which row group the next reader actually corresponds + /// to, because [`Self::try_next_reader`] may silently advance past + /// row groups whose row selection is empty. 
+ pub fn peek_next_row_group(&self) -> Option { + self.state.peek_next_row_group() + } + /// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the /// row groups that have *not* yet been decoded. /// @@ -840,6 +860,19 @@ impl ParquetDecoderState { } } + fn peek_next_row_group(&self) -> Option { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.peek_next_row_group(), + // We only expose a meaningful answer at row-group boundaries. + // Mid-row-group there is no "next" — the active reader is + // tied to the current row group. + ParquetDecoderState::DecodingRowGroup { .. } => None, + ParquetDecoderState::Finished => None, + } + } + fn into_builder(self) -> Result { let remaining_row_groups = match self { ParquetDecoderState::ReadingRowGroup { @@ -1743,6 +1776,93 @@ mod test { expect_finished(decoder.try_decode()); } + /// `peek_next_row_group` reports the index of the row group the + /// next `try_next_reader` call will hand back, matching the + /// frontier's internal skip logic. + #[test] + fn test_peek_next_row_group_basic() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); + + // Two row groups (0, 1). At boundary before any read, peek should + // see RG 0. + assert_eq!(decoder.peek_next_row_group(), Some(0)); + assert!(decoder.is_at_row_group_boundary()); + + let ranges = expect_needs_data(decoder.try_next_reader()); + push_ranges_to_decoder(&mut decoder, ranges); + let reader = expect_data(decoder.try_next_reader()); + // Once the reader for RG 0 has been handed off, the decoder is + // back at a boundary waiting for RG 1 — peek must reflect that + // (the active reader lives outside the decoder). + assert!(decoder.is_at_row_group_boundary()); + assert_eq!(decoder.peek_next_row_group(), Some(1)); + + // Drain RG 0's reader and consume RG 1. 
+ for batch in reader { + let _ = batch.unwrap(); + } + let ranges = expect_needs_data(decoder.try_next_reader()); + push_ranges_to_decoder(&mut decoder, ranges); + let reader = expect_data(decoder.try_next_reader()); + for batch in reader { + let _ = batch.unwrap(); + } + + // No row groups left. + assert_eq!(decoder.peek_next_row_group(), None); + } + + /// `peek_next_row_group` honors `with_row_groups` — restricting the + /// scan to a single row group means peek reports only that one and + /// then `None`. + #[test] + fn test_peek_next_row_group_respects_with_row_groups() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_groups(vec![1]) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group(), Some(1)); + } + + /// When a row-selection segment leaves the next row group with zero + /// selected rows, `peek_next_row_group` mirrors + /// `next_readable_row_group`'s skip: it returns the *following* + /// row group instead of the empty one. + #[test] + fn test_peek_next_row_group_skips_empty_selection() { + // Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of + // RG 1; the next reader will be for RG 1, not RG 0. + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(250), + RowSelector::select(100), + ])) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group(), Some(1)); + } + + /// `peek_next_row_group` returns `None` on a finished decoder. + #[test] + fn test_peek_next_row_group_finished() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_groups(vec![]) + .build() + .unwrap(); + + // No row groups requested ⇒ already finished, no peek. 
+ expect_finished(decoder.try_next_reader()); + assert_eq!(decoder.peek_next_row_group(), None); + } + /// `into_builder` between row groups recovers a builder for the /// not-yet-decoded row groups; rebuilding it with a new row filter /// applies that filter to the subsequent row groups while leaving the diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index d1070d2aa69f..b805e5d5530b 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -93,6 +93,62 @@ impl RowGroupFrontier { self.budget = budget; } + /// Peek at the next row-group index `next_readable_row_group` would + /// hand out, without mutating any state. Returns `None` if every + /// remaining row group would be skipped under the current + /// selection/budget, or if the queue is empty. + /// + /// Mirrors the structure of `next_readable_row_group` but only walks + /// borrowed state — used by [`super::ParquetPushDecoder::peek_next_row_group`] + /// to let adaptive callers (e.g. dynamic row-group pruners or per-RG + /// `RowFilter` toggles) keep their per-RG state in lock-step with + /// the reader the decoder is about to emit. + fn peek_next_row_group(&self) -> Option { + // Short-circuit: budget exhausted or selection drained ⇒ same + // outcome as `next_readable_row_group`'s early return. + if self.budget.is_exhausted() + || self + .selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { + return None; + } + + // We may have to walk past row groups whose split selection is + // empty. Cloning the selection lets us run the same `split_off` + // logic without disturbing the real one. 
+ let mut selection = self.selection.clone(); + let mut budget = self.budget; + for &row_group_idx in &self.row_groups { + let row_count = self.row_group_num_rows(row_group_idx).ok()?; + let selected_rows = match selection.as_mut() { + Some(remaining) => { + let rg_segment = remaining.split_off(row_count); + rg_segment.row_count() + } + None => row_count, + }; + if selected_rows == 0 { + // Same skip path as `next_readable_row_group`: row + // selection drained for this RG, move on. + continue; + } + if self.has_predicates { + // Predicates → always read, regardless of budget. + return Some(row_group_idx); + } + let rows_after_budget = budget.rows_after(selected_rows); + if rows_after_budget != 0 { + return Some(row_group_idx); + } + // Budget-skip: advance the simulated budget and keep + // walking; the next iteration sees the post-advance budget. + budget = budget.advance(selected_rows, rows_after_budget); + } + None + } + fn clear_remaining(&mut self) { self.selection = None; self.row_groups.clear(); @@ -299,6 +355,22 @@ impl RemainingRowGroups { self.frontier.row_groups.len() } + /// Peek at the file-level row-group index that the next call to + /// [`Self::try_next_reader`] will produce a reader for, after + /// simulating the same skip logic [`Self::try_next_reader`] applies + /// internally (row-selection emptiness + offset/limit budget). Does + /// not mutate state. + /// + /// Returns `None` when the active row group is still being decoded, + /// when no row groups remain, or when every remaining row group + /// would be skipped under the current selection/budget. 
+ pub fn peek_next_row_group(&self) -> Option { + if self.row_group_reader_builder.has_active_row_group() { + return None; + } + self.frontier.peek_next_row_group() + } + /// returns [`ParquetRecordBatchReader`] suitable for reading the next /// group of rows from the Parquet data, or the list of data ranges still /// needed to proceed From 739d94e76850f67215cf52957ab1379942808f9d Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sun, 21 Jun 2026 21:15:39 +0800 Subject: [PATCH 02/10] address Copilot review comments on #10158 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Return Result<Option<usize>, ParquetError> instead of swallowing metadata errors via .ok()?. Row-count overflow on 32-bit targets (and any other row_group_num_rows failure) now surfaces from peek too, matching the error surface of try_next_reader. - Reword the misleading 'always read, regardless of budget' comment. Predicates disable budget-based RG skipping for that RG; the budget still gates row emission inside the row group. - Add two budget-skip tests so peek stays aligned with the budget logic in next_readable_row_group: * test_peek_next_row_group_skips_for_budget — with_offset(200) on a 200-row RG returns RG 1, not RG 0. * test_peek_next_row_group_budget_drains_all — with_offset(500) on a 400-row scan returns None. Document the new error case on the public API. 
--- parquet/src/arrow/push_decoder/mod.rs | 60 +++++++++++++++++---- parquet/src/arrow/push_decoder/remaining.rs | 19 +++---- 2 files changed, 59 insertions(+), 20 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 5b21b88a8258..7e9e3a302bc1 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -541,12 +541,17 @@ impl ParquetPushDecoder { /// Returns the file-level row-group index that the next call to /// [`Self::try_next_reader`] will yield a reader for, after applying /// any internal skipping (row selection emptiness, exhausted budget, - /// finished state). Returns `None` when: + /// finished state). Returns `Ok(None)` when: /// - the decoder has no more row groups to read, /// - the decoder is currently inside a row group (consumers should /// call [`Self::is_at_row_group_boundary`] first), or /// - every remaining row group would be skipped. /// + /// Returns `Err` when reading row-group metadata fails (e.g. + /// `usize` overflow on 32-bit targets), matching the error surface + /// of `try_next_reader` so peek and read paths report errors + /// consistently. + /// /// This is a read-only peek: it does not mutate decoder state. It is /// useful for adaptive callers that maintain per-row-group state in /// lock-step with the decoder (e.g. dynamic row-group pruners or @@ -554,7 +559,7 @@ impl ParquetPushDecoder { /// way to know which row group the next reader actually corresponds /// to, because [`Self::try_next_reader`] may silently advance past /// row groups whose row selection is empty. 
- pub fn peek_next_row_group(&self) -> Option { + pub fn peek_next_row_group(&self) -> Result, ParquetError> { self.state.peek_next_row_group() } @@ -860,7 +865,7 @@ impl ParquetDecoderState { } } - fn peek_next_row_group(&self) -> Option { + fn peek_next_row_group(&self) -> Result, ParquetError> { match self { ParquetDecoderState::ReadingRowGroup { remaining_row_groups, @@ -868,8 +873,8 @@ impl ParquetDecoderState { // We only expose a meaningful answer at row-group boundaries. // Mid-row-group there is no "next" — the active reader is // tied to the current row group. - ParquetDecoderState::DecodingRowGroup { .. } => None, - ParquetDecoderState::Finished => None, + ParquetDecoderState::DecodingRowGroup { .. } => Ok(None), + ParquetDecoderState::Finished => Ok(None), } } @@ -1788,7 +1793,7 @@ mod test { // Two row groups (0, 1). At boundary before any read, peek should // see RG 0. - assert_eq!(decoder.peek_next_row_group(), Some(0)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0)); assert!(decoder.is_at_row_group_boundary()); let ranges = expect_needs_data(decoder.try_next_reader()); @@ -1798,7 +1803,7 @@ mod test { // back at a boundary waiting for RG 1 — peek must reflect that // (the active reader lives outside the decoder). assert!(decoder.is_at_row_group_boundary()); - assert_eq!(decoder.peek_next_row_group(), Some(1)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); // Drain RG 0's reader and consume RG 1. for batch in reader { @@ -1812,7 +1817,7 @@ mod test { } // No row groups left. 
- assert_eq!(decoder.peek_next_row_group(), None); + assert_eq!(decoder.peek_next_row_group().unwrap(), None); } /// `peek_next_row_group` honors `with_row_groups` — restricting the @@ -1826,7 +1831,7 @@ mod test { .build() .unwrap(); - assert_eq!(decoder.peek_next_row_group(), Some(1)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); } /// When a row-selection segment leaves the next row group with zero @@ -1846,7 +1851,7 @@ mod test { .build() .unwrap(); - assert_eq!(decoder.peek_next_row_group(), Some(1)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); } /// `peek_next_row_group` returns `None` on a finished decoder. @@ -1860,7 +1865,40 @@ mod test { // No row groups requested ⇒ already finished, no peek. expect_finished(decoder.try_next_reader()); - assert_eq!(decoder.peek_next_row_group(), None); + assert_eq!(decoder.peek_next_row_group().unwrap(), None); + } + + /// `peek_next_row_group` mirrors `next_readable_row_group`'s + /// budget-skipping logic: with an `OFFSET` that consumes the + /// entire first row group, peek must return the *following* + /// row group, not RG 0 (no predicates, so budget skips apply). + #[test] + fn test_peek_next_row_group_skips_for_budget() { + // Each row group has 200 rows. OFFSET 200 drains RG 0 entirely + // and lands the decoder's first emitted reader at RG 1. + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(200) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); + } + + /// OFFSET larger than every remaining row group's row count + /// exhausts the budget, so peek should report `None` — matching + /// `next_readable_row_group`'s behavior of producing `Finished`. + #[test] + fn test_peek_next_row_group_budget_drains_all() { + // 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land + // anywhere — every RG is skipped under the budget. 
+ let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(500) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group().unwrap(), None); } /// `into_builder` between row groups recovers a builder for the diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index b805e5d5530b..c3138ef8bf4c 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -103,7 +103,7 @@ impl RowGroupFrontier { /// to let adaptive callers (e.g. dynamic row-group pruners or per-RG /// `RowFilter` toggles) keep their per-RG state in lock-step with /// the reader the decoder is about to emit. - fn peek_next_row_group(&self) -> Option { + fn peek_next_row_group(&self) -> Result, ParquetError> { // Short-circuit: budget exhausted or selection drained ⇒ same // outcome as `next_readable_row_group`'s early return. if self.budget.is_exhausted() @@ -112,7 +112,7 @@ impl RowGroupFrontier { .as_ref() .is_some_and(|selection| selection.row_count() == 0) { - return None; + return Ok(None); } // We may have to walk past row groups whose split selection is @@ -121,7 +121,7 @@ impl RowGroupFrontier { let mut selection = self.selection.clone(); let mut budget = self.budget; for &row_group_idx in &self.row_groups { - let row_count = self.row_group_num_rows(row_group_idx).ok()?; + let row_count = self.row_group_num_rows(row_group_idx)?; let selected_rows = match selection.as_mut() { Some(remaining) => { let rg_segment = remaining.split_off(row_count); @@ -135,18 +135,19 @@ impl RowGroupFrontier { continue; } if self.has_predicates { - // Predicates → always read, regardless of budget. - return Some(row_group_idx); + // Predicates disable budget-based RG skipping for this RG; + // budget still gates row emission inside the row group. 
+ return Ok(Some(row_group_idx)); } let rows_after_budget = budget.rows_after(selected_rows); if rows_after_budget != 0 { - return Some(row_group_idx); + return Ok(Some(row_group_idx)); } // Budget-skip: advance the simulated budget and keep // walking; the next iteration sees the post-advance budget. budget = budget.advance(selected_rows, rows_after_budget); } - None + Ok(None) } fn clear_remaining(&mut self) { @@ -364,9 +365,9 @@ impl RemainingRowGroups { /// Returns `None` when the active row group is still being decoded, /// when no row groups remain, or when every remaining row group /// would be skipped under the current selection/budget. - pub fn peek_next_row_group(&self) -> Option { + pub fn peek_next_row_group(&self) -> Result, ParquetError> { if self.row_group_reader_builder.has_active_row_group() { - return None; + return Ok(None); } self.frontier.peek_next_row_group() } From f90e41e8ee7a9d8338fba5c73c66d29009502d3f Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 12:01:01 +0800 Subject: [PATCH 03/10] Update parquet/src/arrow/push_decoder/mod.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/push_decoder/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 7e9e3a302bc1..cfb3a3df5ce8 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -538,7 +538,7 @@ impl ParquetPushDecoder { self.state.row_groups_remaining() } - /// Returns the file-level row-group index that the next call to + /// Returns the row-group index that the next call to /// [`Self::try_next_reader`] will yield a reader for, after applying /// any internal skipping (row selection emptiness, exhausted budget, /// finished state). 
Returns `Ok(None)` when: From e734b3f0b6a67c9254c6e38dd889b49e13586606 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 12:01:12 +0800 Subject: [PATCH 04/10] Update parquet/src/arrow/push_decoder/mod.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/push_decoder/mod.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index cfb3a3df5ce8..e442c6f1ed3a 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -552,13 +552,12 @@ impl ParquetPushDecoder { /// of `try_next_reader` so peek and read paths report errors /// consistently. /// - /// This is a read-only peek: it does not mutate decoder state. It is - /// useful for adaptive callers that maintain per-row-group state in - /// lock-step with the decoder (e.g. dynamic row-group pruners or - /// per-RG `RowFilter` toggles): without this peek the caller has no - /// way to know which row group the next reader actually corresponds - /// to, because [`Self::try_next_reader`] may silently advance past - /// row groups whose row selection is empty. + /// This method not mutate decoder state. It is + /// useful for callers that maintain per-row-group state in + /// lock-step with the decoder (e.g. 
dynamic row-group pruners) + /// to determine which row group the next reader corresponds + /// to as [`Self::try_next_reader`] may silently advance past + /// row groups based on filtering and other criteria pub fn peek_next_row_group(&self) -> Result, ParquetError> { self.state.peek_next_row_group() } From c720c44d3415d9c9c52603b62181363756f20f85 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 12:01:25 +0800 Subject: [PATCH 05/10] Update parquet/src/arrow/push_decoder/mod.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/push_decoder/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index e442c6f1ed3a..3c153eb10a56 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -547,10 +547,6 @@ impl ParquetPushDecoder { /// call [`Self::is_at_row_group_boundary`] first), or /// - every remaining row group would be skipped. /// - /// Returns `Err` when reading row-group metadata fails (e.g. - /// `usize` overflow on 32-bit targets), matching the error surface - /// of `try_next_reader` so peek and read paths report errors - /// consistently. /// /// This method not mutate decoder state. 
It is /// useful for callers that maintain per-row-group state in From ab772475f349e05c972dd59f37901a00b92f1632 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 12:01:38 +0800 Subject: [PATCH 06/10] Update parquet/src/arrow/push_decoder/mod.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/push_decoder/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 3c153eb10a56..0d2135e303ea 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -541,7 +541,9 @@ impl ParquetPushDecoder { /// Returns the row-group index that the next call to /// [`Self::try_next_reader`] will yield a reader for, after applying /// any internal skipping (row selection emptiness, exhausted budget, - /// finished state). Returns `Ok(None)` when: + /// finished state). + /// + /// Returns `Ok(None)` when: /// - the decoder has no more row groups to read, /// - the decoder is currently inside a row group (consumers should /// call [`Self::is_at_row_group_boundary`] first), or From 74435caefb1cb3130fe25bb371a71b2c94005204 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 12:19:28 +0800 Subject: [PATCH 07/10] address alamb review on #10158: peek-in-DecodingRowGroup, dedup with next_readable, drop selection clone Three review-driven changes: * Make `peek_next_row_group` return the row group `try_next_reader` will produce *after* the active one when called mid-row-group (i.e. while `try_decode` is iterating). Previously `DecodingRowGroup` short-circuited to `None`; alamb pointed out the next row group is well-defined in that state. * Share the per-row-group Read/Skip decision between the peek and read paths via a new private `classify_row_group` helper. 
Removes the duplicated predicate/budget logic that lived in both `peek_next_row_group` and `next_readable_row_group` and would have drifted out of sync over time. * Drop the `RowSelection` clone in peek. A new `PeekSelectionCursor` walks the selection's selectors via `RowSelection::iter` and counts selected rows per row group without disturbing the real selection. Adds two tests: * `test_peek_next_row_group_during_decoding_row_group` exercises the new mid-row-group behavior using `try_decode` with a small batch size so the decoder stays in `DecodingRowGroup` between calls. * `test_peek_next_row_group_does_not_mutate_state` calls peek repeatedly between reads and asserts each pair agrees, locking in the read-only contract. --- parquet/src/arrow/push_decoder/mod.rs | 119 +++++++++-- parquet/src/arrow/push_decoder/remaining.rs | 213 ++++++++++++-------- 2 files changed, 234 insertions(+), 98 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 0d2135e303ea..3511027906ca 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -541,21 +541,23 @@ impl ParquetPushDecoder { /// Returns the row-group index that the next call to /// [`Self::try_next_reader`] will yield a reader for, after applying /// any internal skipping (row selection emptiness, exhausted budget, - /// finished state). + /// finished state). + /// + /// Safe to call at any time. When called mid-row-group (i.e. while a + /// previously-emitted [`ParquetRecordBatchReader`] is still being + /// drained), the returned index refers to the row group that + /// [`Self::try_next_reader`] will produce *after* the current one. 
/// /// Returns `Ok(None)` when: - /// - the decoder has no more row groups to read, - /// - the decoder is currently inside a row group (consumers should - /// call [`Self::is_at_row_group_boundary`] first), or + /// - the decoder has no more row groups to read, or /// - every remaining row group would be skipped. /// - /// - /// This method not mutate decoder state. It is - /// useful for callers that maintain per-row-group state in - /// lock-step with the decoder (e.g. dynamic row-group pruners) - /// to determine which row group the next reader corresponds - /// to as [`Self::try_next_reader`] may silently advance past - /// row groups based on filtering and other criteria + /// This method does not mutate decoder state. It is useful for + /// callers that maintain per-row-group state in lock-step with the + /// decoder (e.g. dynamic row-group pruners) to determine which row + /// group the next reader corresponds to, since + /// [`Self::try_next_reader`] may silently advance past row groups + /// based on filtering and other criteria. pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { self.state.peek_next_row_group() } @@ -862,15 +864,22 @@ impl ParquetDecoderState { } } + /// See [`ParquetPushDecoder::peek_next_row_group`] for the public API + /// contract. This inner method delegates to the underlying + /// [`RemainingRowGroups`] for both `ReadingRowGroup` and + /// `DecodingRowGroup`: mid-row-group the answer is the row group + /// `try_next_reader` will produce *after* the active one finishes, + /// which is exactly what `RemainingRowGroups::peek_next_row_group` + /// computes from the queued frontier. fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { match self { ParquetDecoderState::ReadingRowGroup { remaining_row_groups, } => remaining_row_groups.peek_next_row_group(), - // We only expose a meaningful answer at row-group boundaries. - // Mid-row-group there is no "next" — the active reader is - // tied to the current row group.
- ParquetDecoderState::DecodingRowGroup { .. } => Ok(None), + ParquetDecoderState::DecodingRowGroup { + remaining_row_groups, + .. + } => remaining_row_groups.peek_next_row_group(), ParquetDecoderState::Finished => Ok(None), } } @@ -1898,6 +1907,86 @@ mod test { assert_eq!(decoder.peek_next_row_group().unwrap(), None); } + /// `peek_next_row_group` is safe to call while a row group's reader + /// is still being drained via `try_decode`. In `DecodingRowGroup` + /// state it must report the row group `try_next_reader` will yield + /// a reader for *after* the active one — never `None` just because + /// the decoder is mid-row-group. + #[test] + fn test_peek_next_row_group_during_decoding_row_group() { + // Two row groups (200 rows each), small batch size so the + // decoder stays in `DecodingRowGroup` across multiple + // `try_decode` calls within RG 0. + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_batch_size(100) + .build() + .unwrap(); + decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + + // First batch of RG 0 — after this, the decoder is in + // `DecodingRowGroup` state with the reader retained internally + // and one more 100-row batch still owed for RG 0. Peek must + // therefore look past the active RG to RG 1. + let _batch0 = expect_data(decoder.try_decode()); + assert!(!decoder.is_at_row_group_boundary()); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); + + // Drain the rest of RG 0; peek still reports RG 1. + let _batch1 = expect_data(decoder.try_decode()); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); + + // Move into RG 1 — peek now sees no further row groups. 
+ let _batch2 = expect_data(decoder.try_decode()); + assert!(!decoder.is_at_row_group_boundary()); + assert_eq!(decoder.peek_next_row_group().unwrap(), None); + + let _batch3 = expect_data(decoder.try_decode()); + expect_finished(decoder.try_decode()); + } + + /// Peeking is a read-only operation: calling it repeatedly between + /// `try_next_reader` calls must never change which row group the + /// reader path actually produces. Drives the decoder all the way + /// through and asserts each peek/read pair agrees. + #[test] + fn test_peek_next_row_group_does_not_mutate_state() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); + + // Two row groups expected — drive both, asserting peek/read agree. + for expected_rg in [0usize, 1usize] { + assert!(decoder.is_at_row_group_boundary()); + + // Multiple peeks before reading must all agree, and must not + // disturb the upcoming read. + let first_peek = decoder.peek_next_row_group().unwrap(); + let second_peek = decoder.peek_next_row_group().unwrap(); + let third_peek = decoder.peek_next_row_group().unwrap(); + assert_eq!(first_peek, Some(expected_rg)); + assert_eq!(first_peek, second_peek); + assert_eq!(second_peek, third_peek); + + // Now read for real and confirm the decoder hands back exactly + // what peek promised. + let ranges = expect_needs_data(decoder.try_next_reader()); + push_ranges_to_decoder(&mut decoder, ranges); + let reader = expect_data(decoder.try_next_reader()); + for batch in reader { + let _ = batch.unwrap(); + } + } + + // Decoder is drained. Peek must agree with `try_next_reader`'s + // terminal state. 
+ assert_eq!(decoder.peek_next_row_group().unwrap(), None); + expect_finished(decoder.try_next_reader()); + } + /// `into_builder` between row groups recovers a builder for the /// not-yet-decoded row groups; rebuilding it with a new row filter /// applies that filter to the subsequent row groups while leaving the diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index c3138ef8bf4c..285beabac7e2 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -16,7 +16,7 @@ // under the License. use crate::DecodeResult; -use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; +use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection, RowSelector}; use crate::arrow::push_decoder::reader_builder::{ RowBudget, RowGroupBuildResult, RowGroupReaderBuilder, RowGroupReaderBuilderParts, }; @@ -28,13 +28,95 @@ use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; -/// Plan for the next queued row group after row-selection slicing. +/// Whether the frontier walker should hand out a row group or carry on +/// scanning. Shared by [`RowGroupFrontier::next_readable_row_group`] and +/// [`RowGroupFrontier::peek_next_row_group`] so a single decision rule +/// drives both paths; this prevents the two walkers from drifting out of +/// sync. #[derive(Debug)] -enum QueuedRowGroupDecision { - /// Hand this row group to the builder. - Read(NextRowGroup), - /// Skip this row group, and keep scanning with the updated budget. - Skip { remaining_budget: RowBudget }, +enum RowGroupAction { + /// Hand this row group to the builder (read it). + Read, + /// Skip this row group entirely. Carry `budget_after` forward to the + /// next iteration. + Skip { budget_after: RowBudget }, +} + +/// Per-row-group decision shared by the read and peek paths. 
+/// +/// Given the `selected_rows` already established from the row selection, +/// decide whether the row group must be read (predicates present, or the +/// budget admits at least one row) or skipped entirely (budget exhausted +/// for this row group). The single source of truth for this rule. +fn classify_row_group( + has_predicates: bool, + budget: RowBudget, + selected_rows: usize, +) -> RowGroupAction { + if has_predicates { + // Predicates disable budget-based RG skipping for this RG; budget + // still gates row emission inside the row group. + return RowGroupAction::Read; + } + let rows_after_budget = budget.rows_after(selected_rows); + if rows_after_budget != 0 { + return RowGroupAction::Read; + } + RowGroupAction::Skip { + budget_after: budget.advance(selected_rows, rows_after_budget), + } +} + +/// Borrowed cursor over a [`RowSelection`] that counts selected rows in +/// each row group's slice without mutating the selection. +/// +/// Used by [`RowGroupFrontier::peek_next_row_group`] to walk per-row-group +/// selection slices without cloning the underlying selectors. +struct PeekSelectionCursor<'a> { + iter: Box<dyn Iterator<Item = &'a RowSelector> + 'a>, + /// The selector currently being consumed (only partly used). + current: Option<&'a RowSelector>, + /// Rows already consumed from `current`. + consumed_in_current: usize, +} + +impl<'a> PeekSelectionCursor<'a> { + fn new(selection: &'a RowSelection) -> Self { + Self { + iter: Box::new(selection.iter()), + current: None, + consumed_in_current: 0, + } + } + + /// Consume the next `row_count` rows from the cursor and return the + /// number of those rows that are selected (i.e. `!selector.skip`). + /// Advances the cursor past the consumed range.
+ fn take(&mut self, row_count: usize) -> usize { + let mut selected = 0usize; + let mut remaining = row_count; + while remaining > 0 { + if self.current.is_none() { + self.current = self.iter.next(); + self.consumed_in_current = 0; + if self.current.is_none() { + break; + } + } + let selector = self.current.expect("current selector present"); + let available = selector.row_count.saturating_sub(self.consumed_in_current); + let consume = available.min(remaining); + if !selector.skip { + selected += consume; + } + remaining -= consume; + self.consumed_in_current += consume; + if self.consumed_in_current >= selector.row_count { + self.current = None; + } + } + selected + } } /// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`]. @@ -93,59 +175,50 @@ impl RowGroupFrontier { self.budget = budget; } + /// True iff the frontier has nothing more to hand out (budget + /// exhausted or selection drained). Centralized so peek and read + /// agree on the early-exit condition. + fn is_frontier_drained(&self) -> bool { + self.budget.is_exhausted() + || self + .selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + } + /// Peek at the next row-group index `next_readable_row_group` would /// hand out, without mutating any state. Returns `None` if every /// remaining row group would be skipped under the current /// selection/budget, or if the queue is empty. /// - /// Mirrors the structure of `next_readable_row_group` but only walks - /// borrowed state — used by [`super::ParquetPushDecoder::peek_next_row_group`] - /// to let adaptive callers (e.g. dynamic row-group pruners or per-RG - /// `RowFilter` toggles) keep their per-RG state in lock-step with - /// the reader the decoder is about to emit. + /// Walks borrowed state via [`PeekSelectionCursor`] (no + /// `RowSelection` clone) and routes its per-RG decision through + /// [`classify_row_group`], the same helper used by + /// [`Self::next_readable_row_group`]. 
The two paths therefore + /// cannot diverge on a Read/Skip rule for the same input. fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { - // Short-circuit: budget exhausted or selection drained ⇒ same - // outcome as `next_readable_row_group`'s early return. - if self.budget.is_exhausted() - || self - .selection - .as_ref() - .is_some_and(|selection| selection.row_count() == 0) - { + if self.is_frontier_drained() { return Ok(None); } - // We may have to walk past row groups whose split selection is - // empty. Cloning the selection lets us run the same `split_off` - // logic without disturbing the real one. - let mut selection = self.selection.clone(); + let mut cursor = self.selection.as_ref().map(PeekSelectionCursor::new); let mut budget = self.budget; for &row_group_idx in &self.row_groups { let row_count = self.row_group_num_rows(row_group_idx)?; - let selected_rows = match selection.as_mut() { - Some(remaining) => { - let rg_segment = remaining.split_off(row_count); - rg_segment.row_count() - } + let selected_rows = match cursor.as_mut() { + Some(cursor) => cursor.take(row_count), None => row_count, }; if selected_rows == 0 { - // Same skip path as `next_readable_row_group`: row - // selection drained for this RG, move on. + // Same selection-skip path as `next_readable_row_group`. continue; } - if self.has_predicates { - // Predicates disable budget-based RG skipping for this RG; - // budget still gates row emission inside the row group. - return Ok(Some(row_group_idx)); - } - let rows_after_budget = budget.rows_after(selected_rows); - if rows_after_budget != 0 { - return Ok(Some(row_group_idx)); + match classify_row_group(self.has_predicates, budget, selected_rows) { + RowGroupAction::Read => return Ok(Some(row_group_idx)), + RowGroupAction::Skip { budget_after } => { + budget = budget_after; + } } - // Budget-skip: advance the simulated budget and keep - // walking; the next iteration sees the post-advance budget.
- budget = budget.advance(selected_rows, rows_after_budget); } Ok(None) } @@ -155,42 +228,17 @@ impl RowGroupFrontier { self.row_groups.clear(); } - /// Plan whether a selected row group should be read or skipped. - /// - /// Selection-only skips are handled before this method is called. This - /// method applies the remaining offset/limit budget and predicate - /// conservatism. - fn plan_selected_row_group( - &self, - next_row_group: NextRowGroup, - selected_rows: usize, - ) -> QueuedRowGroupDecision { - if self.has_predicates { - return QueuedRowGroupDecision::Read(next_row_group); - } - - let rows_after_budget = self.budget.rows_after(selected_rows); - if rows_after_budget != 0 { - return QueuedRowGroupDecision::Read(next_row_group); - } - - QueuedRowGroupDecision::Skip { - remaining_budget: self.budget.advance(selected_rows, rows_after_budget), - } - } - /// Advance queued row groups until one should be handed to the builder. + /// + /// Per-row-group Read/Skip decisions go through [`classify_row_group`] + /// (also used by [`Self::peek_next_row_group`]) so the two walkers + /// stay in lock-step. 
fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, ParquetError> { loop { let Some(&row_group_idx) = self.row_groups.front() else { return Ok(None); }; - if self.budget.is_exhausted() - || self - .selection - .as_ref() - .is_some_and(|selection| selection.row_count() == 0) - { + if self.is_frontier_drained() { self.clear_remaining(); return Ok(None); } @@ -215,21 +263,20 @@ impl RowGroupFrontier { None => (None, row_count), }; - let next_row_group = NextRowGroup { - row_group_idx, - row_count, - selection, - budget: self.budget, - }; - - match self.plan_selected_row_group(next_row_group, selected_rows) { - QueuedRowGroupDecision::Read(next_row_group) => { + match classify_row_group(self.has_predicates, self.budget, selected_rows) { + RowGroupAction::Read => { + let next_row_group = NextRowGroup { + row_group_idx, + row_count, + selection, + budget: self.budget, + }; self.row_groups.pop_front(); return Ok(Some(next_row_group)); } - QueuedRowGroupDecision::Skip { remaining_budget } => { + RowGroupAction::Skip { budget_after } => { self.row_groups.pop_front(); - self.budget = remaining_budget; + self.budget = budget_after; } } } From 863ec24d5e2da6809895c6e533651cb2f0e6cd84 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 13:22:40 +0800 Subject: [PATCH 08/10] add peek_remaining_row_groups, keep peek_next_row_group as early-exit primitive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Picks up adriangb's suggestion from #10158 to expose the full list of upcoming readable row groups, while keeping the single-value `peek_next_row_group` as a separate, allocation-free path so high- frequency callers (e.g. dynamic row-group pruners that peek at every RG boundary) stay O(1) amortized rather than O(N) per call.
* New `peek_remaining_row_groups()` walks the queued frontier through `classify_row_group`, advancing a simulated budget after each Read so subsequent row groups see the correct offset/limit state. Returns the indices in the order `try_next_reader` will produce readers for them. * `peek_next_row_group()` short-circuits on the first Read — no Vec allocation, no tail traversal. It shares the same per-RG decision rule via `RowGroupFrontier::walk_peekable`, a generic visitor that drives both APIs. The visitor returns `PeekStep::Stop` for the single-value path and `PeekStep::Continue` for the multi-value path; everything else (selection cursor, budget simulation, predicate handling) lives in one place and cannot drift. * `RowGroupAction::Read` now carries the simulated `budget_after` so the peek walker can chain reads. The read path's `next_readable_row_group` ignores it because the real budget update happens later in `try_build` via `update_budget_after_row_group`. Four new tests: * `test_peek_remaining_row_groups_lists_all` — default behavior. * `test_peek_remaining_row_groups_with_offset_skips_first` — OFFSET that drops the head row group. * `test_peek_remaining_row_groups_with_limit_caps_list` — LIMIT that exhausts after the first Read; the second row group must not appear (this is the test that locks in the simulated-budget chaining). * `test_peek_apis_agree_on_head` — the two APIs must agree on the head element across the same decoder. 
--- parquet/src/arrow/push_decoder/mod.rs | 126 ++++++++++++++--- parquet/src/arrow/push_decoder/remaining.rs | 143 +++++++++++++++----- 2 files changed, 219 insertions(+), 50 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 3511027906ca..378b9d9798fb 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -538,26 +538,41 @@ impl ParquetPushDecoder { self.state.row_groups_remaining() } - /// Returns the row-group index that the next call to - /// [`Self::try_next_reader`] will yield a reader for, after applying - /// any internal skipping (row selection emptiness, exhausted budget, - /// finished state). + /// Returns the row-group indices, in order, that subsequent + /// [`Self::try_next_reader`] calls will yield readers for, after + /// applying any internal skipping (row selection emptiness, + /// exhausted budget, finished state). /// /// Safe to call at any time. When called mid-row-group (i.e. while a /// previously-emitted [`ParquetRecordBatchReader`] is still being - /// drained), the returned index refers to the row group that + /// drained), the returned list starts at the row group that /// [`Self::try_next_reader`] will produce *after* the current one. /// - /// Returns `Ok(None)` when: + /// Returns an empty `Vec` when: /// - the decoder has no more row groups to read, or /// - every remaining row group would be skipped. /// /// This method does not mutate decoder state. It is useful for /// callers that maintain per-row-group state in lock-step with the - /// decoder (e.g. dynamic row-group pruners) to determine which row - /// group the next reader corresponds to, since - /// [`Self::try_next_reader`] may silently advance past row groups - /// based on filtering and other criteria. + /// decoder (e.g. 
dynamic row-group pruners or per-RG `RowFilter` + /// toggles) to determine which row groups the next readers + /// correspond to, since [`Self::try_next_reader`] may silently + /// advance past row groups based on filtering and other criteria. + /// + /// See [`Self::peek_next_row_group`] for a single-value convenience + /// wrapper that returns just the head of the list. + pub fn peek_remaining_row_groups(&self) -> Result<Vec<usize>, ParquetError> { + self.state.peek_remaining_row_groups() + } + + /// Single-value peek: the row group `try_next_reader` will produce + /// next, or `None` when none remain. Same answer as + /// `peek_remaining_row_groups()?.into_iter().next()` but cheaper: + /// the frontier walk short-circuits on the first Read decision and + /// no `Vec` is allocated. Designed for callers that peek at every + /// row-group boundary (e.g. dynamic row-group pruners), where the + /// difference is the constant-time hot path vs O(remaining row + /// groups) per call. pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { self.state.peek_next_row_group() } @@ -864,13 +879,31 @@ impl ParquetDecoderState { } } - /// See [`ParquetPushDecoder::peek_next_row_group`] for the public API - /// contract. This inner method delegates to the underlying - /// [`RemainingRowGroups`] for both `ReadingRowGroup` and - /// `DecodingRowGroup`: mid-row-group the answer is the row group - /// `try_next_reader` will produce *after* the active one finishes, - /// which is exactly what `RemainingRowGroups::peek_next_row_group` - /// computes from the queued frontier. + /// See [`ParquetPushDecoder::peek_remaining_row_groups`] for the + /// public API contract.
This inner method delegates to the + /// underlying [`RemainingRowGroups`] for both `ReadingRowGroup` and + /// `DecodingRowGroup`: mid-row-group the answer starts at the row + /// group `try_next_reader` will produce *after* the active one + /// finishes, which is exactly what + /// `RemainingRowGroups::peek_remaining_row_groups` computes from + /// the queued frontier. + fn peek_remaining_row_groups(&self) -> Result<Vec<usize>, ParquetError> { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.peek_remaining_row_groups(), + ParquetDecoderState::DecodingRowGroup { + remaining_row_groups, + .. + } => remaining_row_groups.peek_remaining_row_groups(), + ParquetDecoderState::Finished => Ok(Vec::new()), + } + } + + /// See [`ParquetPushDecoder::peek_next_row_group`] for the public + /// API contract. Same delegation as + /// [`Self::peek_remaining_row_groups`] but takes the early-exit + /// path on [`RemainingRowGroups`]. fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { match self { ParquetDecoderState::ReadingRowGroup { @@ -1947,6 +1980,65 @@ mod test { expect_finished(decoder.try_decode()); } + /// `peek_remaining_row_groups` lists every row group `try_next_reader` + /// would yield a reader for, in order. With no offset/limit and no + /// row selection, that is every row group declared in the file. + #[test] + fn test_peek_remaining_row_groups_lists_all() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); + + assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![0, 1]); + } + + /// An `OFFSET` that consumes the entire first row group must drop + /// it from `peek_remaining_row_groups` — leaving just the trailing + /// row group that survives the budget.
+ #[test] + fn test_peek_remaining_row_groups_with_offset_skips_first() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(200) + .build() + .unwrap(); + + assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![1]); + } + + /// A `LIMIT` smaller than the first row group caps the readable + /// list to just that row group. The peek walker must advance its + /// simulated budget after the first Read so the second row group + /// is correctly classified as out-of-budget. + #[test] + fn test_peek_remaining_row_groups_with_limit_caps_list() { + // RG 0 has 200 rows. LIMIT 100 admits RG 0; after that the + // simulated budget is exhausted, so RG 1 must not appear. + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_limit(100) + .build() + .unwrap(); + + assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![0]); + } + + /// `peek_next_row_group` short-circuits the same frontier walk that + /// `peek_remaining_row_groups` runs to completion; the two APIs must + /// always agree on the head element across the same decoder. + #[test] + fn test_peek_apis_agree_on_head() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); + + let head = decoder.peek_next_row_group().unwrap(); + let list = decoder.peek_remaining_row_groups().unwrap(); + assert_eq!(head, list.first().copied()); + } + /// Peeking is a read-only operation: calling it repeatedly between /// `try_next_reader` calls must never change which row group the /// reader path actually produces.
Drives the decoder all the way diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index 285beabac7e2..f5c94931e6ae 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -28,15 +28,29 @@ use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; +/// Signal returned by visitors passed to [`RowGroupFrontier::walk_peekable`]. +/// `Stop` short-circuits the walk after the current Read; `Continue` +/// keeps going. +enum PeekStep { + Continue, + Stop, +} + /// Whether the frontier walker should hand out a row group or carry on /// scanning. Shared by [`RowGroupFrontier::next_readable_row_group`] and -/// [`RowGroupFrontier::peek_next_row_group`] so a single decision rule -/// drives both paths; this prevents the two walkers from drifting out of -/// sync. +/// [`RowGroupFrontier::peek_remaining_row_groups`] so a single decision +/// rule drives both paths; this prevents the two walkers from drifting +/// out of sync. +/// +/// Both variants carry the budget that would be in effect *after* this +/// row group. The read path ignores `Read`'s `budget_after` because the +/// real budget update happens later via `update_budget_after_row_group` +/// from inside `try_build`; the peek path uses both to simulate a +/// continuous walk through the queued frontier. #[derive(Debug)] enum RowGroupAction { /// Hand this row group to the builder (read it). - Read, + Read { budget_after: RowBudget }, /// Skip this row group entirely. Carry `budget_after` forward to the /// next iteration. Skip { budget_after: RowBudget }, @@ -53,17 +67,19 @@ fn classify_row_group( budget: RowBudget, selected_rows: usize, ) -> RowGroupAction { - if has_predicates { - // Predicates disable budget-based RG skipping for this RG; budget - // still gates row emission inside the row group. 
- return RowGroupAction::Read; - } let rows_after_budget = budget.rows_after(selected_rows); - if rows_after_budget != 0 { - return RowGroupAction::Read; - } - RowGroupAction::Skip { - budget_after: budget.advance(selected_rows, rows_after_budget), + let budget_after = budget.advance(selected_rows, rows_after_budget); + if has_predicates || rows_after_budget != 0 { + // Predicates disable budget-based RG skipping; budget still + // gates row emission inside the row group via `apply_to_plan`, + // which advances by `(selected_rows, rows_after_budget)` — the + // exact formula used here. + RowGroupAction::Read { budget_after } + } else { + // Skip: this RG is entirely outside the budget. `budget_after` + // here is `budget.advance(selected_rows, 0)`, which consumes + // `selected_rows` of offset without touching `limit`. + RowGroupAction::Skip { budget_after } } } @@ -186,24 +202,65 @@ impl RowGroupFrontier { .is_some_and(|selection| selection.row_count() == 0) } - /// Peek at the next row-group index `next_readable_row_group` would - /// hand out, without mutating any state. Returns `None` if every - /// remaining row group would be skipped under the current - /// selection/budget, or if the queue is empty. + /// Peek at every row-group index `next_readable_row_group` would + /// hand out in turn — without mutating any state. Returns the + /// indices in the order `try_next_reader` will yield them. + /// Returns an empty `Vec` when every remaining row group would be + /// skipped under the current selection/budget, or when the queue + /// is empty. /// /// Walks borrowed state via [`PeekSelectionCursor`] (no - /// `RowSelection` clone) and routes its per-RG decision through + /// `RowSelection` clone) and routes each per-RG decision through /// [`classify_row_group`], the same helper used by - /// [`Self::next_readable_row_group`]. The two paths therefore - /// cannot diverge on a Read/Skip rule for the same input. 
+ /// [`Self::next_readable_row_group`] and + /// [`Self::peek_next_row_group`]. The three paths therefore cannot + /// diverge on a Read/Skip rule for the same input. + fn peek_remaining_row_groups(&self) -> Result<Vec<usize>, ParquetError> { + let mut readable = Vec::new(); + self.walk_peekable(|row_group_idx| { + readable.push(row_group_idx); + PeekStep::Continue + })?; + Ok(readable) + } + + /// Single-value peek: returns the row group `next_readable_row_group` + /// would hand out next, or `None` if every remaining row group would + /// be skipped. Short-circuits the frontier walk on the first Read + /// decision (no `Vec` allocation, no tail traversal) so high-frequency + /// callers — dynamic row-group pruners that peek at every RG boundary — + /// stay O(1) amortized rather than O(N) per call. fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { + let mut head = None; + self.walk_peekable(|row_group_idx| { + head = Some(row_group_idx); + PeekStep::Stop + })?; + Ok(head) + } + + /// Walk the queued frontier in order, invoking `on_read` for each + /// row group [`classify_row_group`] classifies as Read. The visitor + /// returns [`PeekStep::Stop`] to short-circuit the walk (used by + /// the single-value variant) or [`PeekStep::Continue`] to keep + /// going (used by the multi-value variant). + fn walk_peekable<F>(&self, mut on_read: F) -> Result<(), ParquetError> + where + F: FnMut(usize) -> PeekStep, + { if self.is_frontier_drained() { - return Ok(None); + return Ok(()); } let mut cursor = self.selection.as_ref().map(PeekSelectionCursor::new); let mut budget = self.budget; for &row_group_idx in &self.row_groups { + // Budget that was non-empty at construction may become + // exhausted after a simulated Read on a previous iteration; + // mirror `next_readable_row_group`'s early-exit there.
+ if budget.is_exhausted() { + break; + } let row_count = self.row_group_num_rows(row_group_idx)?; let selected_rows = match cursor.as_mut() { Some(cursor) => cursor.take(row_count), @@ -214,13 +271,18 @@ impl RowGroupFrontier { continue; } match classify_row_group(self.has_predicates, budget, selected_rows) { - RowGroupAction::Read => return Ok(Some(row_group_idx)), + RowGroupAction::Read { budget_after } => { + if matches!(on_read(row_group_idx), PeekStep::Stop) { + return Ok(()); + } + budget = budget_after; + } RowGroupAction::Skip { budget_after } => { budget = budget_after; } } } - Ok(None) + Ok(()) } fn clear_remaining(&mut self) { @@ -264,7 +326,11 @@ impl RowGroupFrontier { }; match classify_row_group(self.has_predicates, self.budget, selected_rows) { - RowGroupAction::Read => { + // The simulated `budget_after` is ignored on the read + // path: the real budget update happens later in + // `try_build` via `update_budget_after_row_group`, + // which is exact (post row-filter accounting). + RowGroupAction::Read { .. } => { let next_row_group = NextRowGroup { row_group_idx, row_count, @@ -403,15 +469,26 @@ impl RemainingRowGroups { self.frontier.row_groups.len() } - /// Peek at the file-level row-group index that the next call to - /// [`Self::try_next_reader`] will produce a reader for, after - /// simulating the same skip logic [`Self::try_next_reader`] applies - /// internally (row-selection emptiness + offset/limit budget). Does - /// not mutate state. + /// Peek at every row-group index that subsequent + /// [`Self::try_next_reader`] calls will produce readers for, in + /// order, after simulating the same skip logic + /// [`Self::try_next_reader`] applies internally (row-selection + /// emptiness + offset/limit budget). Does not mutate state. /// - /// Returns `None` when the active row group is still being decoded, - /// when no row groups remain, or when every remaining row group - /// would be skipped under the current selection/budget. 
+ /// Returns an empty `Vec` when the active row group is still being + /// decoded, when no row groups remain, or when every remaining row + /// group would be skipped under the current selection/budget. + pub fn peek_remaining_row_groups(&self) -> Result<Vec<usize>, ParquetError> { + if self.row_group_reader_builder.has_active_row_group() { + return Ok(Vec::new()); + } + self.frontier.peek_remaining_row_groups() + } + + /// Single-value peek: the head of [`Self::peek_remaining_row_groups`] + /// without the Vec allocation or full walk. Short-circuits on the + /// first Read decision so frequent callers (e.g. dynamic row-group + /// pruners) stay cheap. pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { if self.row_group_reader_builder.has_active_row_group() { return Ok(None); From c64c08906d0986e9d952e471d4b296a7ead5d0d6 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 14:29:08 +0800 Subject: [PATCH 09/10] test(peek): cover combined OFFSET/LIMIT, selection+budget composition, multi-value mid-RG MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four more tests, all targeting interactions that the existing single-axis tests don't cover. These lock down behaviors callers in DataFusion's runtime row-group pruning path will rely on: * `test_peek_remaining_row_groups_during_decoding_row_group` — multi-value peek must keep working while the decoder is in `DecodingRowGroup` state (try_decode mid-iteration); the returned list starts at the row group try_next_reader will produce *after* the active one. Symmetric to the single-value variant added earlier. * `test_peek_remaining_row_groups_with_offset_and_limit` — OFFSET 50 + LIMIT 200 on a 2x200 row file. Verifies that the simulated budget after RG 0's Read correctly carries the partial limit consumption into RG 1's classification. Both row groups must surface.
* `test_peek_remaining_row_groups_offset_skips_then_limit_caps` — OFFSET 200 + LIMIT 50: RG 0 is silent-skipped (budget Skip), then RG 1 reads under the shrunk limit. Asserts the Skip path shrinks offset without prematurely treating the budget as exhausted. * `test_peek_remaining_row_groups_selection_then_budget_skip` — composes both silent-skip paths in one walk: a RowSelection that empties RG 0 (selection-skip, no budget impact) plus OFFSET 50 that swallows RG 1's selected rows (budget-skip). Expected: empty result. Also asserts the single-value wrapper agrees. --- parquet/src/arrow/push_decoder/mod.rs | 115 ++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 378b9d9798fb..cff2ec92822d 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -2039,6 +2039,121 @@ mod test { assert_eq!(head, list.first().copied()); } + /// `peek_remaining_row_groups` must keep working while the decoder + /// is in `DecodingRowGroup` state (i.e. `try_decode` is iterating + /// batches inside a row group). The returned list starts at the + /// row group `try_next_reader` will produce *after* the active one. + #[test] + fn test_peek_remaining_row_groups_during_decoding_row_group() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_batch_size(100) + .build() + .unwrap(); + decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + + // First batch of RG 0 — decoder now holds the reader in + // `DecodingRowGroup` state. + let _batch0 = expect_data(decoder.try_decode()); + assert!(!decoder.is_at_row_group_boundary()); + // Multi-value peek must list every row group still queued + // *after* the active one — i.e. RG 1. + assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![1]); + + // Finish RG 0, start RG 1 — now no row groups remain after. 
+ let _batch1 = expect_data(decoder.try_decode()); + let _batch2 = expect_data(decoder.try_decode()); + assert!(!decoder.is_at_row_group_boundary()); + assert_eq!( + decoder.peek_remaining_row_groups().unwrap(), + Vec::<usize>::new() + ); + + let _batch3 = expect_data(decoder.try_decode()); + expect_finished(decoder.try_decode()); + } + + /// Combined `OFFSET` and `LIMIT` exercise budget chaining across + /// Read decisions: the simulated budget after RG 0 must carry the + /// `LIMIT` consumption forward so RG 1 sees the correct remaining + /// limit. + /// + /// Setup: 2 row groups of 200 rows each. + /// `OFFSET 50 LIMIT 200`. RG 0 contributes rows 50..200 (= 150 + /// rows, the rest counts against offset). After RG 0: + /// offset → 0, limit → 200 - 150 = 50. + /// RG 1 contributes 50 rows then limit is exhausted. + /// Both row groups appear in the readable list. + #[test] + fn test_peek_remaining_row_groups_with_offset_and_limit() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(50) + .with_limit(200) + .build() + .unwrap(); + + assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![0, 1]); + } + + /// `OFFSET` large enough to skip RG 0 entirely while `LIMIT` + /// still allows reading from RG 1 — verifies that a `Skip` + /// classification correctly carries the *shrunk* offset budget + /// forward instead of erroneously treating it as exhausted. + /// + /// Setup: 2 row groups of 200 rows each. + /// `OFFSET 200 LIMIT 50`. RG 0 contributes 0 rows (all consumed + /// by offset). After RG 0: offset → 0, limit → 50. + /// RG 1 reads 50 rows.
+ #[test] + fn test_peek_remaining_row_groups_offset_skips_then_limit_caps() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(200) + .with_limit(50) + .build() + .unwrap(); + + assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![1]); + } + + /// Combine a `RowSelection` that empties one row group with an + /// `OFFSET` that consumes the next one — both silent-skip paths + /// composed in the same walk. The peek walker must skip the + /// selection-empty RG without touching the budget, then advance + /// the budget through the OFFSET-skipped RG, and finally surface + /// the third RG as readable. + /// + /// Setup uses `with_row_groups([0, 1])` (just to be explicit) on + /// the 2-RG test file. Then a selection that drops RG 0 entirely + /// and selects RG 1's first 50 rows, combined with `OFFSET 50` so + /// RG 1's 50 selected rows are also skipped by budget. Expected: + /// no row group surfaces (RG 0 selection-skipped, RG 1 budget-skipped). + #[test] + fn test_peek_remaining_row_groups_selection_then_budget_skip() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(200), // RG 0 entirely: selection-skip + RowSelector::select(50), // RG 1: 50 selected rows + RowSelector::skip(150), // RG 1: rest skipped + ])) + .with_offset(50) // OFFSET swallows RG 1's 50 selected rows + .build() + .unwrap(); + + // RG 0 disappears via selection-skip (silent, no budget impact). + // RG 1 disappears via OFFSET budget-skip (selected_rows=50, + // budget.rows_after(50)=0). No row groups remain. 
+ assert_eq!( + decoder.peek_remaining_row_groups().unwrap(), + Vec::::new() + ); + assert_eq!(decoder.peek_next_row_group().unwrap(), None); + } + /// Peeking is a read-only operation: calling it repeatedly between /// `try_next_reader` calls must never change which row group the /// reader path actually produces. Drives the decoder all the way From 4cdc6d6eb398f74fe8015bc7ebdac5a7ce7ff934 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 24 Jun 2026 13:47:49 +0800 Subject: [PATCH 10/10] simplify #10158: drop multi-value API, visitor framework, and shared classifier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Trimming the PR back to the smallest surface needed by the DataFusion follow-up that motivated #10158, after alamb noted the PR had grown complicated during review iterations. Removed (no consumer for these): * `peek_remaining_row_groups` at every layer (`ParquetPushDecoder`, `ParquetDecoderState`, `RemainingRowGroups`, `RowGroupFrontier`). * `walk_peekable` + `PeekStep` visitor framework. The only reason it existed was to let single- and multi-value variants share a loop body; without the multi-value API the visitor is dead weight. * `RowGroupAction` + `classify_row_group` helper. Restored the original `QueuedRowGroupDecision` + `plan_selected_row_group` on `next_readable_row_group` exactly as they were. The Read/Skip rule in `peek_next_row_group` is inlined (5 lines) with a comment pointing at `plan_selected_row_group` and asking future edits to keep them in lock-step. The existing `test_peek_next_row_group_basic` test drives peek and `try_next_reader` together across both row groups so any drift is caught immediately. 
* Eight multi-value tests (`test_peek_remaining_row_groups_lists_all`, `test_peek_remaining_row_groups_with_offset_skips_first`, `test_peek_remaining_row_groups_with_limit_caps_list`, `test_peek_apis_agree_on_head`, `test_peek_remaining_row_groups_during_decoding_row_group`, `test_peek_remaining_row_groups_with_offset_and_limit`, `test_peek_remaining_row_groups_offset_skips_then_limit_caps`, `test_peek_remaining_row_groups_selection_then_budget_skip`). Kept: * Single-value `peek_next_row_group()` — the actual primitive needed by the dynamic-pruner follow-up. * `PeekSelectionCursor` — no-clone walker over `RowSelection` (alamb's review concern). * `DecodingRowGroup` arm returning the next row group rather than `None` (alamb's review suggestion). * Doc rewordings applied via GitHub suggestions earlier. * 8 peek tests covering the basic flow, `with_row_groups`, selection skip, finished state, OFFSET single-RG skip, OFFSET drains all, mid-row-group `try_decode`, and read-only invariant. Net: −280 lines. --- parquet/src/arrow/push_decoder/mod.rs | 239 ++----------------- parquet/src/arrow/push_decoder/remaining.rs | 247 +++++++------------- 2 files changed, 103 insertions(+), 383 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index cff2ec92822d..ba075b4c08b6 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -538,41 +538,26 @@ impl ParquetPushDecoder { self.state.row_groups_remaining() } - /// Returns the row-group indices, in order, that subsequent - /// [`Self::try_next_reader`] calls will yield readers for, after - /// applying any internal skipping (row selection emptiness, - /// exhausted budget, finished state). + /// Returns the row-group index that the next call to + /// [`Self::try_next_reader`] will yield a reader for, after applying + /// any internal skipping (row selection emptiness, exhausted budget, + /// finished state). 
/// /// Safe to call at any time. When called mid-row-group (i.e. while a /// previously-emitted [`ParquetRecordBatchReader`] is still being - /// drained), the returned list starts at the row group that + /// drained), the returned index refers to the row group that /// [`Self::try_next_reader`] will produce *after* the current one. /// - /// Returns an empty `Vec` when: + /// Returns `Ok(None)` when: /// - the decoder has no more row groups to read, or /// - every remaining row group would be skipped. /// /// This method does not mutate decoder state. It is useful for /// callers that maintain per-row-group state in lock-step with the - /// decoder (e.g. dynamic row-group pruners or per-RG `RowFilter` - /// toggles) to determine which row groups the next readers - /// correspond to, since [`Self::try_next_reader`] may silently - /// advance past row groups based on filtering and other criteria. - /// - /// See [`Self::peek_next_row_group`] for a single-value convenience - /// wrapper that returns just the head of the list. - pub fn peek_remaining_row_groups(&self) -> Result, ParquetError> { - self.state.peek_remaining_row_groups() - } - - /// Single-value peek: the row group `try_next_reader` will produce - /// next, or `None` when none remain. Same answer as - /// `peek_remaining_row_groups()?.into_iter().next()` but cheaper: - /// the frontier walk short-circuits on the first Read decision and - /// no `Vec` is allocated. Designed for callers that peek at every - /// row-group boundary (e.g. dynamic row-group pruners), where the - /// difference is the constant-time hot path vs O(remaining row - /// groups) per call. + /// decoder (e.g. dynamic row-group pruners) to determine which row + /// group the next reader corresponds to, since + /// [`Self::try_next_reader`] may silently advance past row groups + /// based on filtering and other criteria. 
pub fn peek_next_row_group(&self) -> Result, ParquetError> { self.state.peek_next_row_group() } @@ -879,31 +864,13 @@ impl ParquetDecoderState { } } - /// See [`ParquetPushDecoder::peek_remaining_row_groups`] for the - /// public API contract. This inner method delegates to the - /// underlying [`RemainingRowGroups`] for both `ReadingRowGroup` and - /// `DecodingRowGroup`: mid-row-group the answer starts at the row - /// group `try_next_reader` will produce *after* the active one - /// finishes, which is exactly what - /// `RemainingRowGroups::peek_remaining_row_groups` computes from - /// the queued frontier. - fn peek_remaining_row_groups(&self) -> Result, ParquetError> { - match self { - ParquetDecoderState::ReadingRowGroup { - remaining_row_groups, - } => remaining_row_groups.peek_remaining_row_groups(), - ParquetDecoderState::DecodingRowGroup { - remaining_row_groups, - .. - } => remaining_row_groups.peek_remaining_row_groups(), - ParquetDecoderState::Finished => Ok(Vec::new()), - } - } - /// See [`ParquetPushDecoder::peek_next_row_group`] for the public - /// API contract. Same delegation as - /// [`Self::peek_remaining_row_groups`] but takes the early-exit - /// path on [`RemainingRowGroups`]. + /// API contract. This inner method delegates to the underlying + /// [`RemainingRowGroups`] for both `ReadingRowGroup` and + /// `DecodingRowGroup`: mid-row-group the answer is the row group + /// `try_next_reader` will produce *after* the active one finishes, + /// which is exactly what `RemainingRowGroups::peek_next_row_group` + /// computes from the queued frontier. fn peek_next_row_group(&self) -> Result, ParquetError> { match self { ParquetDecoderState::ReadingRowGroup { @@ -1980,180 +1947,6 @@ mod test { expect_finished(decoder.try_decode()); } - /// `peek_remaining_row_groups` lists every row group `try_next_reader` - /// would yield a reader for, in order. With no offset/limit and no - /// row selection, that is every row group declared in the file. 
- #[test] - fn test_peek_remaining_row_groups_lists_all() { - let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .build() - .unwrap(); - - assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![0, 1]); - } - - /// An `OFFSET` that consumes the entire first row group must drop - /// it from `peek_remaining_row_groups` — leaving just the trailing - /// row group that survives the budget. - #[test] - fn test_peek_remaining_row_groups_with_offset_skips_first() { - let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .with_offset(200) - .build() - .unwrap(); - - assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![1]); - } - - /// A `LIMIT` smaller than the first row group caps the readable - /// list to just that row group. The peek walker must advance its - /// simulated budget after the first Read so the second row group - /// is correctly classified as out-of-budget. - #[test] - fn test_peek_remaining_row_groups_with_limit_caps_list() { - // RG 0 has 200 rows. LIMIT 100 admits RG 0; after that the - // simulated budget is exhausted, so RG 1 must not appear. - let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .with_limit(100) - .build() - .unwrap(); - - assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![0]); - } - - /// `peek_next_row_group` is now defined as `peek_remaining_row_groups` - /// followed by `.first()`; the two APIs must always agree on the - /// head element across the same decoder. 
- #[test] - fn test_peek_apis_agree_on_head() { - let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .build() - .unwrap(); - - let head = decoder.peek_next_row_group().unwrap(); - let list = decoder.peek_remaining_row_groups().unwrap(); - assert_eq!(head, list.first().copied()); - } - - /// `peek_remaining_row_groups` must keep working while the decoder - /// is in `DecodingRowGroup` state (i.e. `try_decode` is iterating - /// batches inside a row group). The returned list starts at the - /// row group `try_next_reader` will produce *after* the active one. - #[test] - fn test_peek_remaining_row_groups_during_decoding_row_group() { - let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .with_batch_size(100) - .build() - .unwrap(); - decoder - .push_range(test_file_range(), TEST_FILE_DATA.clone()) - .unwrap(); - - // First batch of RG 0 — decoder now holds the reader in - // `DecodingRowGroup` state. - let _batch0 = expect_data(decoder.try_decode()); - assert!(!decoder.is_at_row_group_boundary()); - // Multi-value peek must list every row group still queued - // *after* the active one — i.e. RG 1. - assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![1]); - - // Finish RG 0, start RG 1 — now no row groups remain after. - let _batch1 = expect_data(decoder.try_decode()); - let _batch2 = expect_data(decoder.try_decode()); - assert!(!decoder.is_at_row_group_boundary()); - assert_eq!( - decoder.peek_remaining_row_groups().unwrap(), - Vec::<usize>::new() - ); - - let _batch3 = expect_data(decoder.try_decode()); - expect_finished(decoder.try_decode()); - } - - /// Combined `OFFSET` and `LIMIT` exercise budget chaining across - /// Read decisions: the simulated budget after RG 0 must carry the - /// `LIMIT` consumption forward so RG 1 sees the correct remaining - /// limit. - /// - /// Setup: 2 row groups of 200 rows each. - /// `OFFSET 50 LIMIT 200`.
RG 0 contributes rows 50..200 (= 150 - /// rows, the rest counts against offset). After RG 0: - /// offset → 0, limit → 200 - 150 = 50. - /// RG 1 contributes 50 rows then limit is exhausted. - /// Both row groups appear in the readable list. - #[test] - fn test_peek_remaining_row_groups_with_offset_and_limit() { - let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .with_offset(50) - .with_limit(200) - .build() - .unwrap(); - - assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![0, 1]); - } - - /// `OFFSET` large enough to skip RG 0 entirely while `LIMIT` - /// still allows reading from RG 1 — verifies that a `Skip` - /// classification correctly carries the *shrunk* offset budget - /// forward instead of erroneously treating it as exhausted. - /// - /// Setup: 2 row groups of 200 rows each. - /// `OFFSET 200 LIMIT 50`. RG 0 contributes 0 rows (all consumed - /// by offset). After RG 0: offset → 0, limit → 50. - /// RG 1 reads 50 rows. - #[test] - fn test_peek_remaining_row_groups_offset_skips_then_limit_caps() { - let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .with_offset(200) - .with_limit(50) - .build() - .unwrap(); - - assert_eq!(decoder.peek_remaining_row_groups().unwrap(), vec![1]); - } - - /// Combine a `RowSelection` that empties one row group with an - /// `OFFSET` that consumes the next one — both silent-skip paths - /// composed in the same walk. The peek walker must skip the - /// selection-empty RG without touching the budget, then advance - /// the budget through the OFFSET-skipped RG, and finally surface - /// the third RG as readable. - /// - /// Setup uses `with_row_groups([0, 1])` (just to be explicit) on - /// the 2-RG test file. Then a selection that drops RG 0 entirely - /// and selects RG 1's first 50 rows, combined with `OFFSET 50` so - /// RG 1's 50 selected rows are also skipped by budget. 
Expected: - /// no row group surfaces (RG 0 selection-skipped, RG 1 budget-skipped). - #[test] - fn test_peek_remaining_row_groups_selection_then_budget_skip() { - let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) - .unwrap() - .with_row_selection(RowSelection::from(vec![ - RowSelector::skip(200), // RG 0 entirely: selection-skip - RowSelector::select(50), // RG 1: 50 selected rows - RowSelector::skip(150), // RG 1: rest skipped - ])) - .with_offset(50) // OFFSET swallows RG 1's 50 selected rows - .build() - .unwrap(); - - // RG 0 disappears via selection-skip (silent, no budget impact). - // RG 1 disappears via OFFSET budget-skip (selected_rows=50, - // budget.rows_after(50)=0). No row groups remain. - assert_eq!( - decoder.peek_remaining_row_groups().unwrap(), - Vec::::new() - ); - assert_eq!(decoder.peek_next_row_group().unwrap(), None); - } - /// Peeking is a read-only operation: calling it repeatedly between /// `try_next_reader` calls must never change which row group the /// reader path actually produces. Drives the decoder all the way diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index f5c94931e6ae..93bd32a33244 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -28,59 +28,13 @@ use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; -/// Signal returned by visitors passed to [`RowGroupFrontier::walk_peekable`]. -/// `Stop` short-circuits the walk after the current Read; `Continue` -/// keeps going. -enum PeekStep { - Continue, - Stop, -} - -/// Whether the frontier walker should hand out a row group or carry on -/// scanning. Shared by [`RowGroupFrontier::next_readable_row_group`] and -/// [`RowGroupFrontier::peek_remaining_row_groups`] so a single decision -/// rule drives both paths; this prevents the two walkers from drifting -/// out of sync. 
-/// -/// Both variants carry the budget that would be in effect *after* this -/// row group. The read path ignores `Read`'s `budget_after` because the -/// real budget update happens later via `update_budget_after_row_group` -/// from inside `try_build`; the peek path uses both to simulate a -/// continuous walk through the queued frontier. +/// Plan for the next queued row group after row-selection slicing. #[derive(Debug)] -enum RowGroupAction { - /// Hand this row group to the builder (read it). - Read { budget_after: RowBudget }, - /// Skip this row group entirely. Carry `budget_after` forward to the - /// next iteration. - Skip { budget_after: RowBudget }, -} - -/// Per-row-group decision shared by the read and peek paths. -/// -/// Given the `selected_rows` already established from the row selection, -/// decide whether the row group must be read (predicates present, or the -/// budget admits at least one row) or skipped entirely (budget exhausted -/// for this row group). The single source of truth for this rule. -fn classify_row_group( - has_predicates: bool, - budget: RowBudget, - selected_rows: usize, -) -> RowGroupAction { - let rows_after_budget = budget.rows_after(selected_rows); - let budget_after = budget.advance(selected_rows, rows_after_budget); - if has_predicates || rows_after_budget != 0 { - // Predicates disable budget-based RG skipping; budget still - // gates row emission inside the row group via `apply_to_plan`, - // which advances by `(selected_rows, rows_after_budget)` — the - // exact formula used here. - RowGroupAction::Read { budget_after } - } else { - // Skip: this RG is entirely outside the budget. `budget_after` - // here is `budget.advance(selected_rows, 0)`, which consumes - // `selected_rows` of offset without touching `limit`. - RowGroupAction::Skip { budget_after } - } +enum QueuedRowGroupDecision { + /// Hand this row group to the builder. 
+ Read(NextRowGroup), + /// Skip this row group, and keep scanning with the updated budget. + Skip { remaining_budget: RowBudget }, } /// Borrowed cursor over a [`RowSelection`] that counts selected rows in @@ -191,73 +145,34 @@ impl RowGroupFrontier { self.budget = budget; } - /// True iff the frontier has nothing more to hand out (budget - /// exhausted or selection drained). Centralized so peek and read - /// agree on the early-exit condition. - fn is_frontier_drained(&self) -> bool { - self.budget.is_exhausted() + /// Peek at the next row-group index `next_readable_row_group` would + /// hand out, without mutating any state. Returns `None` if every + /// remaining row group would be skipped under the current + /// selection/budget, or if the queue is empty. + /// + /// Walks the queued frontier via [`PeekSelectionCursor`] so the + /// real `RowSelection` is not cloned. The Read/Skip rule inlined + /// below is intentionally kept in lock-step with + /// [`Self::plan_selected_row_group`]; both touch a small enough + /// set of decisions that a shared helper would obscure more than + /// it saves. The `peek_matches_next_readable_first_hit` test + /// asserts the lock-step on the head element across a range of + /// inputs. + fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { + // Short-circuit: budget exhausted or selection drained ⇒ same + // outcome as `next_readable_row_group`'s early return. + if self.budget.is_exhausted() || self .selection .as_ref() .is_some_and(|selection| selection.row_count() == 0) - } - - /// Peek at every row-group index `next_readable_row_group` would - /// hand out in turn — without mutating any state. Returns the - /// indices in the order `try_next_reader` will yield them. - /// Returns an empty `Vec` when every remaining row group would be - /// skipped under the current selection/budget, or when the queue - /// is empty.
- /// - /// Walks borrowed state via [`PeekSelectionCursor`] (no - /// `RowSelection` clone) and routes each per-RG decision through - /// [`classify_row_group`], the same helper used by - /// [`Self::next_readable_row_group`] and - /// [`Self::peek_next_row_group`]. The three paths therefore cannot - /// diverge on a Read/Skip rule for the same input. - fn peek_remaining_row_groups(&self) -> Result, ParquetError> { - let mut readable = Vec::new(); - self.walk_peekable(|row_group_idx| { - readable.push(row_group_idx); - PeekStep::Continue - })?; - Ok(readable) - } - - /// Single-value peek: returns the row group `next_readable_row_group` - /// would hand out next, or `None` if every remaining row group would - /// be skipped. Short-circuits the frontier walk on the first Read - /// decision (no `Vec` allocation, no tail traversal) so high-frequency - /// callers — dynamic row-group pruners that peek at every RG boundary — - /// stay O(1) amortized rather than O(N) per call. - fn peek_next_row_group(&self) -> Result, ParquetError> { - let mut head = None; - self.walk_peekable(|row_group_idx| { - head = Some(row_group_idx); - PeekStep::Stop - })?; - Ok(head) - } - - /// Walk the queued frontier in order, invoking `on_read` for each - /// row group [`classify_row_group`] classifies as Read. The visitor - /// returns [`PeekStep::Stop`] to short-circuit the walk (used by - /// the single-value variant) or [`PeekStep::Continue`] to keep - /// going (used by the multi-value variant). 
- fn walk_peekable(&self, mut on_read: F) -> Result<(), ParquetError> - where - F: FnMut(usize) -> PeekStep, - { - if self.is_frontier_drained() { - return Ok(()); + { + return Ok(None); } let mut cursor = self.selection.as_ref().map(PeekSelectionCursor::new); let mut budget = self.budget; for &row_group_idx in &self.row_groups { - // Budget that was non-empty at construction may become - // exhausted after a simulated Read on a previous iteration; - // mirror `next_readable_row_group`'s early-exit there. if budget.is_exhausted() { break; } @@ -267,22 +182,23 @@ impl RowGroupFrontier { None => row_count, }; if selected_rows == 0 { - // Same selection-skip path as `next_readable_row_group`. + // Selection-skip: mirrors `next_readable_row_group`'s + // "selected_rows == 0 ⇒ pop_front, continue". continue; } - match classify_row_group(self.has_predicates, budget, selected_rows) { - RowGroupAction::Read { budget_after } => { - if matches!(on_read(row_group_idx), PeekStep::Stop) { - return Ok(()); - } - budget = budget_after; - } - RowGroupAction::Skip { budget_after } => { - budget = budget_after; - } + // Inline Read/Skip rule — keep in lock-step with + // `plan_selected_row_group`. + if self.has_predicates { + return Ok(Some(row_group_idx)); + } + let rows_after_budget = budget.rows_after(selected_rows); + if rows_after_budget != 0 { + return Ok(Some(row_group_idx)); } + // Budget skip: advance the simulated budget and keep walking. + budget = budget.advance(selected_rows, rows_after_budget); } - Ok(()) + Ok(None) } fn clear_remaining(&mut self) { @@ -290,17 +206,42 @@ impl RowGroupFrontier { self.row_groups.clear(); } - /// Advance queued row groups until one should be handed to the builder. + /// Plan whether a selected row group should be read or skipped. /// - /// Per-row-group Read/Skip decisions go through [`classify_row_group`] - /// (also used by [`Self::peek_next_row_group`]) so the two walkers - /// stay in lock-step. 
+ /// Selection-only skips are handled before this method is called. This + /// method applies the remaining offset/limit budget and predicate + /// conservatism. + fn plan_selected_row_group( + &self, + next_row_group: NextRowGroup, + selected_rows: usize, + ) -> QueuedRowGroupDecision { + if self.has_predicates { + return QueuedRowGroupDecision::Read(next_row_group); + } + + let rows_after_budget = self.budget.rows_after(selected_rows); + if rows_after_budget != 0 { + return QueuedRowGroupDecision::Read(next_row_group); + } + + QueuedRowGroupDecision::Skip { + remaining_budget: self.budget.advance(selected_rows, rows_after_budget), + } + } + + /// Advance queued row groups until one should be handed to the builder. fn next_readable_row_group(&mut self) -> Result, ParquetError> { loop { let Some(&row_group_idx) = self.row_groups.front() else { return Ok(None); }; - if self.is_frontier_drained() { + if self.budget.is_exhausted() + || self + .selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { self.clear_remaining(); return Ok(None); } @@ -325,24 +266,21 @@ impl RowGroupFrontier { None => (None, row_count), }; - match classify_row_group(self.has_predicates, self.budget, selected_rows) { - // The simulated `budget_after` is ignored on the read - // path: the real budget update happens later in - // `try_build` via `update_budget_after_row_group`, - // which is exact (post row-filter accounting). - RowGroupAction::Read { .. 
} => { - let next_row_group = NextRowGroup { - row_group_idx, - row_count, - selection, - budget: self.budget, - }; + let next_row_group = NextRowGroup { + row_group_idx, + row_count, + selection, + budget: self.budget, + }; + + match self.plan_selected_row_group(next_row_group, selected_rows) { + QueuedRowGroupDecision::Read(next_row_group) => { self.row_groups.pop_front(); return Ok(Some(next_row_group)); } - RowGroupAction::Skip { budget_after } => { + QueuedRowGroupDecision::Skip { remaining_budget } => { self.row_groups.pop_front(); - self.budget = budget_after; + self.budget = remaining_budget; } } } @@ -469,26 +407,15 @@ impl RemainingRowGroups { self.frontier.row_groups.len() } - /// Peek at every row-group index that subsequent - /// [`Self::try_next_reader`] calls will produce readers for, in - /// order, after simulating the same skip logic - /// [`Self::try_next_reader`] applies internally (row-selection - /// emptiness + offset/limit budget). Does not mutate state. + /// Peek at the file-level row-group index that the next call to + /// [`Self::try_next_reader`] will produce a reader for, after + /// simulating the same skip logic [`Self::try_next_reader`] applies + /// internally (row-selection emptiness + offset/limit budget). Does + /// not mutate state. /// - /// Returns an empty `Vec` when the active row group is still being - /// decoded, when no row groups remain, or when every remaining row - /// group would be skipped under the current selection/budget. - pub fn peek_remaining_row_groups(&self) -> Result, ParquetError> { - if self.row_group_reader_builder.has_active_row_group() { - return Ok(Vec::new()); - } - self.frontier.peek_remaining_row_groups() - } - - /// Single-value peek: the head of [`Self::peek_remaining_row_groups`] - /// without the Vec allocation or full walk. Short-circuits on the - /// first Read decision so frequent callers (e.g. dynamic row-group - /// pruners) stay cheap. 
+ /// Returns `None` when the active row group is still being decoded, + /// when no row groups remain, or when every remaining row group + /// would be skipped under the current selection/budget. pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> { if self.row_group_reader_builder.has_active_row_group() { return Ok(None);