From d55d59aea1773261f143a0dd80d766862c24abf7 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Mon, 22 Jun 2026 09:41:01 -0600 Subject: [PATCH 1/2] feat(physical-expr): add Partitioning::DynamicRange variant Adds a new `Partitioning::DynamicRange(DynamicRangePartitioning)` variant alongside the existing `Partitioning::Range`. Where `Range` takes its split points as a plan-time constant (declared by a `TableProvider` or computed by a planner from statistics), `DynamicRange` describes the same model except the boundary set is only known once an upstream operator has observed its actual data range. The implementing operator is expected to discover the range at `execute()` time and compute interior split points before it routes the first row. ```rust pub struct DynamicRangePartitioning { ordering: LexOrdering, partition_count: usize, } ``` The number of output partitions is fixed at plan time so downstream distribution requirements have a stable answer; only the split point values are runtime-discovered. This PR only adds the variant + plumbing. Behavior mirrors `Range`: - `RepartitionExec`'s `repartitioned()`, `try_pushdown_sort()`, and projection-pushdown sites return `not_impl_err!` for the new variant, same as they already do for `Range`. - `RepartitionExec`'s row-routing path was already a catch-all `not_impl_err!` for non-Hash/non-RoundRobin variants, so no change is needed there. - `Partitioning::compatible_with`, `partition_count`, `project`, and `PartialEq` arms are added; `DynamicRange` is treated symmetrically to `Range`. - FFI bridges to `UnknownPartitioning(n)`, same path `Range` takes (per #22394). - Proto serialization returns `not_impl_err!`. Proto plumbing for `DynamicRange` will be added incrementally, mirroring how `Range` landed in steps. Three unit tests cover construction + display + partition_count, `compatible_with` (same/different ordering, same/different partition_count, single-partition, cross-variant), and `project` preservation/degradation. --- .../ffi/src/physical_expr/partitioning.rs | 3 + datafusion/physical-expr/src/partitioning.rs | 270 ++++++++++++++++++ .../physical-plan/src/repartition/mod.rs | 6 +- .../proto/src/physical_plan/to_proto.rs | 9 + 4 files changed, 285 insertions(+), 3 deletions(-) diff --git a/datafusion/ffi/src/physical_expr/partitioning.rs b/datafusion/ffi/src/physical_expr/partitioning.rs index eec437639e156..44fecd14e2e1b 100644 --- a/datafusion/ffi/src/physical_expr/partitioning.rs +++ b/datafusion/ffi/src/physical_expr/partitioning.rs @@ -50,6 +50,9 @@ impl From<&Partitioning> for FFI_Partitioning { Partitioning::Range(range) => { Self::UnknownPartitioning(range.partition_count()) } + Partitioning::DynamicRange(range) => { + Self::UnknownPartitioning(range.partition_count()) + } Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size), } } diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 2e0aaaf3fb4b7..2b78b972c42e5 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -122,6 +122,10 @@ pub enum Partitioning { Hash(Vec>, usize), /// Partition rows by source-declared ranges Range(RangePartitioning), + /// Partition rows by ranges whose split points are discovered at execution + /// time from upstream data, not declared at plan time. See + /// [`DynamicRangePartitioning`] for the contract. + DynamicRange(DynamicRangePartitioning), /// Unknown partitioning scheme with a known number of partitions UnknownPartitioning(usize), } @@ -139,6 +143,7 @@ impl Display for Partitioning { write!(f, "Hash([{phy_exprs_str}], {size})") } Partitioning::Range(range) => write!(f, "{range}"), + Partitioning::DynamicRange(range) => write!(f, "{range}"), Partitioning::UnknownPartitioning(size) => { write!(f, "UnknownPartitioning({size})") } @@ -344,6 +349,150 @@ fn format_range_split_points(split_points: &[SplitPoint]) -> String { .join(", ") } +/// Physical range partitioning where split points are discovered at execution +/// time from upstream data, not declared at plan time. +/// +/// Where [`RangePartitioning`] takes its split points as a plan-time +/// constant (declared by a `TableProvider` or computed by a planner from +/// statistics), `DynamicRangePartitioning` describes the same model except +/// the boundary set is only known once an upstream operator has observed +/// its actual data range. The implementing operator is expected to +/// discover the range at `execute()` time — typically by reading runtime +/// extrema from its input — and to compute interior split points before +/// it routes the first row. +/// +/// The number of output partitions is fixed at plan time so downstream +/// distribution requirements have a stable answer. Only the split point +/// values are runtime-discovered. +/// +/// Once the operator has computed split points, the partition contract is +/// the same as [`RangePartitioning`]: lexicographic ordering, half-open +/// intervals, one row per output partition. +/// +/// NOTE: Optimizer and execution behavior for this partitioning is +/// intentionally not implemented and will be introduced incrementally. +/// See . +#[derive(Debug, Clone, PartialEq)] +pub struct DynamicRangePartitioning { + /// Ordered partitioning key. + ordering: LexOrdering, + /// Number of output partitions. Fixed at plan time; split points + /// between them are discovered at execute time. + partition_count: usize, +} + +impl DynamicRangePartitioning { + /// Creates dynamic range partitioning metadata. + /// + /// `partition_count` must be at least 1. + pub fn new(ordering: LexOrdering, partition_count: usize) -> Self { + Self { + ordering, + partition_count, + } + } + + /// Returns the ordering that defines the range key. + pub fn ordering(&self) -> &LexOrdering { + &self.ordering + } + + /// Returns the number of output partitions. + pub fn partition_count(&self) -> usize { + self.partition_count + } + + /// Returns true when `self` and `other` describe the same dynamic range + /// partition map. + /// + /// Single-partition dynamic range partitionings are always compatible. + /// Otherwise the two partitionings must have the same partition count + /// and equivalent ordering expressions with the same sort options. + /// Split points are not compared because neither side has them yet. + pub fn compatible_with( + &self, + other: &Self, + eq_properties: &EquivalenceProperties, + ) -> bool { + if self.partition_count == 1 && other.partition_count == 1 { + return true; + } + + if self.partition_count != other.partition_count + || self.ordering.len() != other.ordering.len() + { + return false; + } + + if !self + .ordering + .iter() + .zip(other.ordering.iter()) + .all(|(left, right)| left.options == right.options) + { + return false; + } + + let left_exprs = self + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + let right_exprs = other + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + + equivalent_exprs(&left_exprs, &right_exprs, eq_properties) + } + + /// Calculates the dynamic range partitioning after applying the given + /// projection. + /// + /// Returns `None` if any range key cannot be projected or if projection + /// collapses distinct range keys into duplicate output expressions. + fn project( + &self, + mapping: &ProjectionMapping, + input_eq_properties: &EquivalenceProperties, + ) -> Option { + let exprs = self + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + let projected_exprs = input_eq_properties + .project_expressions(&exprs, mapping) + .collect::>>()?; + let sort_exprs = self + .ordering + .iter() + .zip(projected_exprs) + .map(|(sort_expr, expr)| PhysicalSortExpr::new(expr, sort_expr.options)) + .collect::>(); + let ordering = LexOrdering::new(sort_exprs)?; + if ordering.len() != self.ordering.len() { + return None; + } + + Some(Self { + ordering, + partition_count: self.partition_count, + }) + } +} + +impl Display for DynamicRangePartitioning { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "DynamicRange([{}], {})", + self.ordering, self.partition_count + ) + } +} + fn equivalent_exprs( left: &[Arc], right: &[Arc], @@ -403,6 +552,7 @@ impl Partitioning { match self { RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n, Range(range) => range.partition_count(), + DynamicRange(range) => range.partition_count(), } } @@ -438,6 +588,9 @@ impl Partitioning { (Partitioning::Range(left), Partitioning::Range(right)) => { left.compatible_with(right, eq_properties) } + (Partitioning::DynamicRange(left), Partitioning::DynamicRange(right)) => { + left.compatible_with(right, eq_properties) + } _ => false, } } @@ -526,6 +679,7 @@ impl Partitioning { } Partitioning::RoundRobinBatch(_) | Partitioning::Range(_) + | Partitioning::DynamicRange(_) | Partitioning::UnknownPartitioning(_) => { PartitioningSatisfaction::NotSatisfied } @@ -560,6 +714,13 @@ impl Partitioning { Partitioning::UnknownPartitioning(range.partition_count()) } } + Partitioning::DynamicRange(range) => { + if let Some(projected) = range.project(mapping, input_eq_properties) { + Partitioning::DynamicRange(projected) + } else { + Partitioning::UnknownPartitioning(range.partition_count()) + } + } Partitioning::RoundRobinBatch(_) | Partitioning::UnknownPartitioning(_) => { self.clone() } @@ -580,6 +741,9 @@ impl PartialEq for Partitioning { true } (Partitioning::Range(left), Partitioning::Range(right)) => left == right, + (Partitioning::DynamicRange(left), Partitioning::DynamicRange(right)) => { + left == right + } _ => false, } } @@ -743,6 +907,22 @@ mod tests { .expect("test range partitioning should be valid"), ) } + + fn dynamic_range( + &self, + indices: impl IntoIterator, + partition_count: usize, + ) -> DynamicRangePartitioning { + DynamicRangePartitioning::new(self.range_ordering(indices), partition_count) + } + + fn dynamic_range_partitioning( + &self, + indices: impl IntoIterator, + partition_count: usize, + ) -> Partitioning { + Partitioning::DynamicRange(self.dynamic_range(indices, partition_count)) + } } #[test] @@ -1310,6 +1490,96 @@ mod tests { Ok(()) } + #[test] + fn test_dynamic_range_partitioning_metadata() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let dynamic_range = fixture.dynamic_range([0], 4); + assert_eq!(dynamic_range.ordering()[0].to_string(), "a@0 ASC"); + assert_eq!(dynamic_range.partition_count(), 4); + + let partitioning = Partitioning::DynamicRange(dynamic_range); + assert_eq!(partitioning.partition_count(), 4); + assert_eq!(partitioning.to_string(), "DynamicRange([a@0 ASC], 4)"); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_compatible_with() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let mut eq_properties = fixture.eq_properties.clone(); + eq_properties.add_equal_conditions(fixture.col(0), fixture.col(1))?; + + let range_a = fixture.dynamic_range([0], 4); + let range_a_same = fixture.dynamic_range([0], 4); + let range_b_equivalent = fixture.dynamic_range([1], 4); + let range_b_different_count = fixture.dynamic_range([1], 8); + let range_a_desc = DynamicRangePartitioning::new( + [fixture.range_sort_expr(0, SortOptions::new(true, false))].into(), + 4, + ); + let single_partition_range_a = fixture.dynamic_range([0], 1); + let single_partition_range_b = fixture.dynamic_range([1], 1); + + assert!(range_a.compatible_with(&range_a_same, &fixture.eq_properties)); + assert!(range_a.compatible_with(&range_b_equivalent, &eq_properties)); + assert!(!range_a.compatible_with(&range_b_equivalent, &fixture.eq_properties)); + assert!(!range_a.compatible_with(&range_b_different_count, &eq_properties)); + assert!(!range_a.compatible_with(&range_a_desc, &eq_properties)); + assert!( + single_partition_range_a + .compatible_with(&single_partition_range_b, &fixture.eq_properties) + ); + + // Through the Partitioning enum, with cross-variant mismatch: + assert!(fixture.dynamic_range_partitioning([0], 4).compatible_with( + &fixture.dynamic_range_partitioning([0], 4), + &fixture.eq_properties + )); + assert!( + !fixture.dynamic_range_partitioning([0], 4).compatible_with( + &fixture.hash_partitioning([0], 4), + &fixture.eq_properties + ) + ); + // DynamicRange vs declared Range are never compatible — they + // describe different operator contracts. + assert!(!fixture.dynamic_range_partitioning([0], 3).compatible_with( + &fixture.range_partitioning( + [0], + vec![int_split_point([10]), int_split_point([20])] + ), + &fixture.eq_properties + )); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_project_preserves_or_degrades() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let dynamic_range = Partitioning::DynamicRange(DynamicRangePartitioning::new( + [fixture.range_sort_expr(1, SortOptions::new(true, false))].into(), + 4, + )); + + let keep_b_mapping = ProjectionMapping::from_indices(&[1], &fixture.schema)?; + let projected = dynamic_range.project(&keep_b_mapping, &fixture.eq_properties); + assert_eq!( + projected.to_string(), + "DynamicRange([b@0 DESC NULLS LAST], 4)" + ); + + let drop_b_mapping = ProjectionMapping::from_indices(&[0], &fixture.schema)?; + let projected = dynamic_range.project(&drop_b_mapping, &fixture.eq_properties); + let Partitioning::UnknownPartitioning(partition_count) = projected else { + panic!("expected UnknownPartitioning, got {projected:?}"); + }; + assert_eq!(partition_count, 4); + + Ok(()) + } + #[test] fn test_hash_partitioning_compatible_with() -> Result<()> { let fixture = PartitioningTestFixture::int64(&["a", "b"])?; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2298183485f55..6405eddd2b2d3 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1439,7 +1439,7 @@ impl ExecutionPlan for RepartitionExec { } Partitioning::Hash(new_partitions, *size) } - Partitioning::Range(_) => { + Partitioning::Range(_) | Partitioning::DynamicRange(_) => { // Range partitioning optimizer propagation is tracked in // https://github.com/apache/datafusion/issues/22395 return not_impl_err!( @@ -1483,7 +1483,7 @@ impl ExecutionPlan for RepartitionExec { return Ok(SortOrderPushdownResult::Unsupported); } match self.partitioning() { - Partitioning::Range(_) => { + Partitioning::Range(_) | Partitioning::DynamicRange(_) => { // Range partitioning optimizer propagation is tracked in // https://github.com/apache/datafusion/issues/22395 return not_impl_err!( @@ -1517,7 +1517,7 @@ impl ExecutionPlan for RepartitionExec { RoundRobinBatch(_) => RoundRobinBatch(target_partitions), Hash(hash, _) => Hash(hash, target_partitions), UnknownPartitioning(_) => UnknownPartitioning(target_partitions), - Range(_) => { + Range(_) | DynamicRange(_) => { // Range repartition execution is tracked in // https://github.com/apache/datafusion/issues/22397 return not_impl_err!( diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7310c0928eee4..a498758c52a2f 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -380,6 +380,15 @@ pub fn serialize_partitioning( serialize_range_partitioning(range, codec, proto_converter)?, )), }, + Partitioning::DynamicRange(_) => { + // Proto plumbing for DynamicRange is intentionally not + // implemented in the variant-introduction PR and will be + // added incrementally. See + // . + return not_impl_err!( + "Serialization of DynamicRange partitioning is not implemented" + ); + } Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning { partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown( *partition_count as u64, From e6f846ed9b1e6985109f17e40b09f6d3a3bba954 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Mon, 22 Jun 2026 10:06:52 -0600 Subject: [PATCH 2/2] DynamicRangePartitioning: add optional halo distance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a per-side halo distance to `DynamicRangePartitioning` so the runtime-routing operator can declare "this bucket carries rows outside its primary range by distance D" at plan time: pub struct HaloSpec { preceding: ScalarValue, following: ScalarValue, } pub struct DynamicRangePartitioning { ordering: LexOrdering, partition_count: usize, halo: Option, // <- new } Distances are in the leading sort key's domain — e.g. for `RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING` over an `Int64` key, the halo is `preceding=5, following=3`. ROWS-frame halo (a count of neighboring rows) is intentionally not represented here; a separate variant can be added later if motivated. Builder pattern keeps the common case (no halo) terse: DynamicRangePartitioning::new(ordering, k) // disjoint DynamicRangePartitioning::new(ordering, k).with_halo(halo) // overlapping This is the API hook for a downstream operator (e.g. a halo-strip filter) to read `ExtremaKind::Expanded` extrema from the routing operator: the routing operator publishes the primary range, the filter trims the halo. Without halo, the partitioning produces disjoint buckets and a downstream consumer sees `ExtremaKind::Observed` extrema. - `compatible_with`: halos must match (or both `None`) to be compatible. - `project`: halo passes through unchanged. Halo is measured in the leading sort key's domain; projection must keep that key's `DataType` stable for the result to be valid. - `Display`: shape `DynamicRange([{ordering}], {k})` without halo; `DynamicRange([{ordering}], {k}, halo(preceding=P, following=F))` with. Three new tests: halo metadata + display, halo affecting `compatible_with` (mismatch, plain-vs-halo asymmetry), halo preserved through `project`. --- datafusion/physical-expr/src/partitioning.rs | 196 +++++++++++++++++-- 1 file changed, 185 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 2b78b972c42e5..fc0e446790c79 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -22,7 +22,7 @@ use crate::{ expressions::UnKnownColumn, physical_exprs_equal, }; pub use datafusion_common::SplitPoint; -use datafusion_common::{Result, validate_range_split_points}; +use datafusion_common::{Result, ScalarValue, validate_range_split_points}; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use std::fmt; @@ -349,6 +349,68 @@ fn format_range_split_points(split_points: &[SplitPoint]) -> String { .join(", ") } +/// Per-side halo distances for a [`DynamicRangePartitioning`]. +/// +/// Halo rows are extra rows deliberately routed beyond a bucket's primary +/// `[min, max]` range so a downstream operator (typically a windowing or +/// filter pass) can see the full neighborhood at each seam. Each side's +/// distance is measured in the **leading sort key's domain** — the same +/// `DataType` as the first expression in +/// [`DynamicRangePartitioning::ordering`]. +/// +/// For example, for a `RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING` window +/// frame over an `Int64` sort key, the halo is `preceding = 5, following +/// = 3` — bucket `i`'s output contains its own primary range plus 5 +/// units of overlap to the left and 3 units to the right. +/// +/// Halo here is RANGE-frame style: a distance in the sort key's domain. +/// ROWS-frame halo (a count of neighboring rows) is not represented; +/// that interpretation will need a separate variant if and when it is +/// motivated. +#[derive(Debug, Clone, PartialEq)] +pub struct HaloSpec { + /// Distance the bucket extends below its primary `min` (toward + /// lex-smaller values along the leading sort key). Must share its + /// `DataType` with the leading sort key expression. + preceding: ScalarValue, + /// Distance the bucket extends above its primary `max` (toward + /// lex-larger values along the leading sort key). Must share its + /// `DataType` with the leading sort key expression. + following: ScalarValue, +} + +impl HaloSpec { + /// Creates a halo spec. `preceding` and `following` must share their + /// `DataType` with the leading sort key expression of the partitioning + /// they will be attached to; this is not validated at construction. + pub fn new(preceding: ScalarValue, following: ScalarValue) -> Self { + Self { + preceding, + following, + } + } + + /// Distance the bucket extends below its primary `min`. + pub fn preceding(&self) -> &ScalarValue { + &self.preceding + } + + /// Distance the bucket extends above its primary `max`. + pub fn following(&self) -> &ScalarValue { + &self.following + } +} + +impl Display for HaloSpec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "halo(preceding={}, following={})", + self.preceding, self.following + ) + } +} + /// Physical range partitioning where split points are discovered at execution /// time from upstream data, not declared at plan time. /// @@ -367,7 +429,10 @@ fn format_range_split_points(split_points: &[SplitPoint]) -> String { /// /// Once the operator has computed split points, the partition contract is /// the same as [`RangePartitioning`]: lexicographic ordering, half-open -/// intervals, one row per output partition. +/// intervals, one row per output partition — unless [`Self::halo`] is set, +/// in which case the operator deliberately routes extra rows beyond each +/// bucket's primary range so a downstream pass can see the full +/// neighborhood at each seam. See [`HaloSpec`]. /// /// NOTE: Optimizer and execution behavior for this partitioning is /// intentionally not implemented and will be introduced incrementally. @@ -379,19 +444,38 @@ pub struct DynamicRangePartitioning { /// Number of output partitions. Fixed at plan time; split points /// between them are discovered at execute time. partition_count: usize, + /// Optional per-side halo distance. When set, the implementing + /// operator routes extra rows beyond each bucket's primary range; a + /// downstream operator is expected to strip them back to the primary + /// range by reading [`ExtremaKind::Expanded`] extrema. When unset, the + /// partitioning produces disjoint buckets and a downstream consumer + /// sees [`ExtremaKind::Observed`] extrema. + /// + /// [`ExtremaKind::Expanded`]: https://docs.rs/datafusion/latest/datafusion_physical_plan/enum.ExtremaKind.html#variant.Expanded + /// [`ExtremaKind::Observed`]: https://docs.rs/datafusion/latest/datafusion_physical_plan/enum.ExtremaKind.html#variant.Observed + halo: Option, } impl DynamicRangePartitioning { - /// Creates dynamic range partitioning metadata. + /// Creates dynamic range partitioning metadata with no halo. /// /// `partition_count` must be at least 1. pub fn new(ordering: LexOrdering, partition_count: usize) -> Self { Self { ordering, partition_count, + halo: None, } } + /// Attaches a [`HaloSpec`] to this partitioning, declaring that the + /// implementing operator routes extra rows beyond each bucket's + /// primary range. Builder-style. + pub fn with_halo(mut self, halo: HaloSpec) -> Self { + self.halo = Some(halo); + self + } + /// Returns the ordering that defines the range key. pub fn ordering(&self) -> &LexOrdering { &self.ordering @@ -402,13 +486,19 @@ impl DynamicRangePartitioning { self.partition_count } + /// Returns the halo spec, if set. + pub fn halo(&self) -> Option<&HaloSpec> { + self.halo.as_ref() + } + /// Returns true when `self` and `other` describe the same dynamic range /// partition map. /// /// Single-partition dynamic range partitionings are always compatible. - /// Otherwise the two partitionings must have the same partition count - /// and equivalent ordering expressions with the same sort options. - /// Split points are not compared because neither side has them yet. + /// Otherwise the two partitionings must have the same partition count, + /// matching halo (or both `None`), and equivalent ordering expressions + /// with the same sort options. Split points are not compared because + /// neither side has them yet. pub fn compatible_with( &self, other: &Self, @@ -420,6 +510,7 @@ impl DynamicRangePartitioning { if self.partition_count != other.partition_count || self.ordering.len() != other.ordering.len() + || self.halo != other.halo { return false; } @@ -452,6 +543,9 @@ impl DynamicRangePartitioning { /// /// Returns `None` if any range key cannot be projected or if projection /// collapses distinct range keys into duplicate output expressions. + /// Halo (if any) is preserved unchanged — halo is measured in the + /// leading sort key's domain, which the projection must keep stable + /// for the result to be valid. fn project( &self, mapping: &ProjectionMapping, @@ -479,17 +573,25 @@ impl DynamicRangePartitioning { Some(Self { ordering, partition_count: self.partition_count, + halo: self.halo.clone(), }) } } impl Display for DynamicRangePartitioning { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "DynamicRange([{}], {})", - self.ordering, self.partition_count - ) + match &self.halo { + Some(halo) => write!( + f, + "DynamicRange([{}], {}, {})", + self.ordering, self.partition_count, halo + ), + None => write!( + f, + "DynamicRange([{}], {})", + self.ordering, self.partition_count + ), + } } } @@ -1580,6 +1682,78 @@ mod tests { Ok(()) } + #[test] + fn test_dynamic_range_partitioning_halo_metadata() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let halo = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(3))); + let with_halo = fixture.dynamic_range([0], 4).with_halo(halo.clone()); + + assert_eq!(with_halo.halo(), Some(&halo)); + assert_eq!( + with_halo.to_string(), + "DynamicRange([a@0 ASC], 4, halo(preceding=5, following=3))" + ); + + // No halo round-trips unchanged. + let no_halo = fixture.dynamic_range([0], 4); + assert_eq!(no_halo.halo(), None); + assert_eq!(no_halo.to_string(), "DynamicRange([a@0 ASC], 4)"); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_halo_affects_compatible_with() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let halo_a = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(3))); + let halo_b = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(7))); + + let plain = fixture.dynamic_range([0], 4); + let with_a = fixture.dynamic_range([0], 4).with_halo(halo_a.clone()); + let with_a_same = fixture.dynamic_range([0], 4).with_halo(halo_a.clone()); + let with_b = fixture.dynamic_range([0], 4).with_halo(halo_b); + + // Identical halos → compatible. + assert!(with_a.compatible_with(&with_a_same, &fixture.eq_properties)); + // Mismatched halos → not compatible. + assert!(!with_a.compatible_with(&with_b, &fixture.eq_properties)); + // No-halo vs halo → not compatible. + assert!(!plain.compatible_with(&with_a, &fixture.eq_properties)); + assert!(!with_a.compatible_with(&plain, &fixture.eq_properties)); + + Ok(()) + } + + #[test] + fn test_dynamic_range_partitioning_project_preserves_halo() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let halo = + HaloSpec::new(ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(3))); + let dynamic_range = Partitioning::DynamicRange( + DynamicRangePartitioning::new( + [fixture.range_sort_expr(1, SortOptions::new(true, false))].into(), + 4, + ) + .with_halo(halo.clone()), + ); + + let keep_b_mapping = ProjectionMapping::from_indices(&[1], &fixture.schema)?; + let projected = dynamic_range.project(&keep_b_mapping, &fixture.eq_properties); + let Partitioning::DynamicRange(projected_inner) = &projected else { + panic!("expected DynamicRange, got {projected:?}"); + }; + assert_eq!(projected_inner.halo(), Some(&halo)); + assert_eq!( + projected.to_string(), + "DynamicRange([b@0 DESC NULLS LAST], 4, halo(preceding=5, following=3))" + ); + + Ok(()) + } + #[test] fn test_hash_partitioning_compatible_with() -> Result<()> { let fixture = PartitioningTestFixture::int64(&["a", "b"])?;