feat: PartitionExtrema + SortExec observer + BoundedWindowAggExec passthrough #23090
avantgardnerio wants to merge 3 commits
…ded"
- `runtime_partition_extrema`: replace the "observed-vs-intended" pointer with a one-sentence handoff to `ExtremaKind::Observed` / `ExtremaKind::Expanded`. The discriminant is now the source of truth for which interpretation applies.
- `ExtremaKind::Expanded`: rename "intended primary range" → "primary range". The earlier doc carried "intended" as a leftover from the observed-vs-intended framing the enum replaces; with the variant named `Expanded`, "intended" only adds an extra synonym to map.
BWAG processes its input in order and appends new window-result
columns on the right of the existing schema. Its equivalence
properties are built by extending the input's, leaving the input's
leading sort expressions in place along the same column indices in
the output. So along the output's declared ordering, the partition
range observed (or expanded) by the upstream is the same range we
emit — a clean passthrough.
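The passthrough argument above can be illustrated with a minimal, std-only sketch. Everything here is a hypothetical stand-in rather than DataFusion's actual definitions: the `Plan` trait, the `Upstream` and `WindowPassthrough` structs, and the use of `Vec<i64>` for the sort-key values are assumptions made for illustration. The point is only that an operator which appends columns on the right and keeps the leading sort columns in place can forward the upstream answer unchanged.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the PR's types; not DataFusion's definitions.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum ExtremaKind {
    Observed,
    Expanded,
}

#[derive(Debug, Clone, PartialEq)]
struct PartitionExtrema {
    kind: ExtremaKind,
    min: Vec<i64>, // sort-key values of the lex-min row (assumed representation)
    max: Vec<i64>, // sort-key values of the lex-max row
    row_count: usize,
}

// Hypothetical trait mirroring the shape described in the PR: default is Ok(None).
trait Plan {
    fn runtime_partition_extrema(
        &self,
        _partition: usize,
    ) -> Result<Option<PartitionExtrema>, String> {
        Ok(None)
    }
}

// An upstream operator that has already recorded extrema per partition.
struct Upstream {
    slots: Vec<Option<PartitionExtrema>>,
}

impl Plan for Upstream {
    fn runtime_partition_extrema(
        &self,
        partition: usize,
    ) -> Result<Option<PartitionExtrema>, String> {
        self.slots
            .get(partition)
            .cloned()
            .ok_or_else(|| format!("no partition {}", partition))
    }
}

// A window-like operator: it only appends columns to the right, so the
// leading sort key is untouched and the upstream range is forwarded as-is.
struct WindowPassthrough {
    input: Arc<dyn Plan>,
}

impl Plan for WindowPassthrough {
    fn runtime_partition_extrema(
        &self,
        partition: usize,
    ) -> Result<Option<PartitionExtrema>, String> {
        // No `match` on `kind`: Observed or Expanded, the range is unchanged.
        self.input.runtime_partition_extrema(partition)
    }
}
```

In this sketch, wrapping an `Upstream` in a `WindowPassthrough` and asking for partition 0 yields exactly the value the upstream recorded, whichever `ExtremaKind` it carries.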
Skipped: CoalesceBatchesExec. The operator is deprecated since 52.0.0
("we now use BatchCoalescer from arrow-rs instead of a dedicated
operator"); coalescing now happens inside other operators' streams,
so there's no dedicated plan node to override.
run benchmarks
run benchmark sort_tpch10 tpch10 (just making sure to run some benchmarks)
Hi @Dandandan, your benchmark configuration could not be parsed (#23090 (comment)). Error: Usage: Any benchmark name is accepted. Per-side configuration:
env:
  # shared env is inherited by BOTH the build and the run, so build
  # flags go here. Builds default to no debuginfo for speed; opt back
  # in for hung-job gdb dumps and cap jobs to stay within memory:
  CARGO_PROFILE_RELEASE_DEBUG: "1"
  CARGO_BUILD_JOBS: "1"
baseline:
  ref: v45.0.0
  env:
    # per-side env only reaches the benchmark run, not the build
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
changed:
  ref: v46.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 2G
run benchmark sort_tpch10 tpch10
🤖 Benchmark running (GKE): comparing brent/partition-extrema (2cc5bd1) to f9c1e9e (merge-base) using:
- tpch10
- tpch
- sort_tpch10
- clickbench_partitioned
- tpcds
🤖 Benchmark completed (GKE), with resource usage reported for base (merge-base) and branch:
- tpch
- tpch10
- tpcds
- clickbench_partitioned
- sort_tpch10
Which issue does this PR close?
Implements the proposal in #23089.
Rationale for this change
See #23089 for the full design rationale, including the dual-semantics motivation, alternatives considered, and coexistence with `Partitioning::Range`. Short version: range-aware operators (parallel window functions, future dynamic-range repartitioning, range-elimination optimizations) want to ask an `ExecutionPlan` for the lex-min / lex-max of a partition's output along its declared ordering. Today there's no way to ask.
What changes are included in this PR?
Pure addition; zero behavior change in any path that doesn't call the new method.
- `ExecutionPlan::runtime_partition_extrema(&self, partition) -> Result<Option<PartitionExtrema>>`: default `Ok(None)`.
- `PartitionExtrema { kind, min, max, row_count }` and `enum ExtremaKind { Observed, Expanded }`. `Observed` (the only kind any operator in this PR returns) means the reported range literally bounds the partition's data. `Expanded` is reserved for future operators that deliberately route rows outside the reported range as a "halo" for a downstream filter to strip. The dual semantics live on the enum so passthroughs that don't care don't have to `match`.
- `SortExec` override: a per-partition slot is populated inside the sort code path (each `sort_batch_chunked` call folds first/last sorted rows into the slot, zero-copy via `RecordBatch::slice`). Once execution has consumed the input, the slot holds the lex-min / lex-max along the declared ordering.
- `BoundedWindowAggExec` override: passthrough. BWAG extends its input's equivalence properties and appends new window-result columns on the right of the schema, so the leading sort exprs remain stable in the output along the same column indices.
Are these changes tested?
7 unit tests in `datafusion/physical-plan/src/sorts/sort.rs::tests`:
- `test_runtime_partition_extrema_before_execute_is_none`: caller contract; reading without a poll returns `Ok(None)`.
- `test_runtime_partition_extrema_after_full_sort`: two batches, in-memory merge path; extrema match expected lex-min / lex-max with `kind = Observed`.
- `test_runtime_partition_extrema_descending_swaps_min_max`: DESC sort; `min` is the largest value, `max` is the smallest.
- `test_runtime_partition_extrema_per_partition`: two input partitions with `preserve_partitioning=true`; each output partition's extrema track its own range.
- `test_runtime_partition_extrema_default_is_none`: default trait impl returns `Ok(None)` on a non-overriding operator (`EmptyExec`).
Are there any user-facing changes?
New public items in `datafusion::physical_plan`:
- `PartitionExtrema`, `ExtremaKind`.
- `ExecutionPlan::runtime_partition_extrema` with a default `Ok(None)`. Existing custom `ExecutionPlan` implementations are not required to change.
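To make the default-method contract and the observer behavior concrete, here is a minimal std-only sketch. It is not the PR's code: `ExecutionPlanLike`, `LegacyOperator`, `SortObserver`, and `sort_chunk` are hypothetical names, rows are reduced to a single `i64` sort key, and a `Mutex` per partition stands in for whatever slot type the real `SortExec` uses. It shows three things described above: an operator that does not override the method keeps returning `Ok(None)`, each sorted chunk folds its first and last rows into the slot, and a descending sort makes `min` the largest value.

```rust
use std::sync::Mutex;

// Hypothetical stand-ins; the real types live in DataFusion and differ in detail.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum ExtremaKind {
    Observed,
    Expanded,
}

#[derive(Debug, Clone, PartialEq)]
struct PartitionExtrema {
    kind: ExtremaKind,
    min: i64, // first row along the declared ordering (single-column key assumed)
    max: i64, // last row along the declared ordering
    row_count: usize,
}

trait ExecutionPlanLike {
    // The default keeps existing implementations source-compatible.
    fn runtime_partition_extrema(
        &self,
        _partition: usize,
    ) -> Result<Option<PartitionExtrema>, String> {
        Ok(None)
    }
}

// An existing custom operator: no changes needed, it inherits Ok(None).
struct LegacyOperator;
impl ExecutionPlanLike for LegacyOperator {}

// A sort-like operator with one observation slot per output partition.
struct SortObserver {
    descending: bool,
    slots: Vec<Mutex<Option<PartitionExtrema>>>,
}

impl SortObserver {
    fn new(partitions: usize, descending: bool) -> Self {
        let slots = (0..partitions).map(|_| Mutex::new(None)).collect();
        SortObserver { descending, slots }
    }

    // True when `a` sorts strictly before `b` under the declared ordering.
    fn precedes(&self, a: i64, b: i64) -> bool {
        if self.descending { a > b } else { a < b }
    }

    // Sort one chunk, then fold its first and last rows into the slot.
    fn sort_chunk(&self, partition: usize, mut chunk: Vec<i64>) -> Vec<i64> {
        chunk.sort();
        if self.descending {
            chunk.reverse();
        }
        if let (Some(&first), Some(&last)) = (chunk.first(), chunk.last()) {
            let mut slot = self.slots[partition].lock().unwrap();
            let merged = match slot.take() {
                None => PartitionExtrema {
                    kind: ExtremaKind::Observed,
                    min: first,
                    max: last,
                    row_count: chunk.len(),
                },
                Some(prev) => PartitionExtrema {
                    kind: ExtremaKind::Observed,
                    min: if self.precedes(first, prev.min) { first } else { prev.min },
                    max: if self.precedes(prev.max, last) { last } else { prev.max },
                    row_count: prev.row_count + chunk.len(),
                },
            };
            *slot = Some(merged);
        }
        chunk
    }
}

impl ExecutionPlanLike for SortObserver {
    fn runtime_partition_extrema(
        &self,
        partition: usize,
    ) -> Result<Option<PartitionExtrema>, String> {
        match self.slots.get(partition) {
            Some(slot) => Ok(slot.lock().unwrap().clone()),
            None => Err(format!("no partition {}", partition)),
        }
    }
}
```

A caller would feed chunks through `sort_chunk` and read the slot only after the input is consumed; reading earlier returns `Ok(None)` or a partial range, which matches the caller contract the tests describe.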