Skip to content

[PoC] Parallel prefix scan for cumulative-aggregate window functions#426

Draft
avantgardnerio wants to merge 4 commits into
brent/parallel-windowfrom
brent/parallel-window-cumulative
Draft

[PoC] Parallel prefix scan for cumulative-aggregate window functions#426
avantgardnerio wants to merge 4 commits into
brent/parallel-windowfrom
brent/parallel-window-cumulative

Conversation

@avantgardnerio

@avantgardnerio avantgardnerio commented Jun 23, 2026

Copy link
Copy Markdown

Status: PoC, not for merge. Stacked on top of apache#23026's branch. Open as a draft because this is just kicking the tires on how distribution could work in Ballista.

Edit: I think this is actually upstreamable in either parallel or streaming forms.

Based on: https://www.cs.cmu.edu/~guyb/papers/Ble93.pdf

What shape this parallelizes

Cumulative aggregates: ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. No PARTITION BY. Canonical log-analytics shape — "running sum over time."

SELECT seq, SUM(amount) OVER (
    ORDER BY seq
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_sum
FROM events

Today this hits the same Distribution::SinglePartition collapse as the bounded-RANGE case the parent PR addresses. But HaloDropExec's tactic doesn't apply: halo means "give me N units of context past my bucket boundary," and cumulative means "give me every prior row," which is exactly the same as serializing.

What the PoC does instead: parallel prefix scan

Two-stage. Stage 1 is the existing RangeRepartitionExec (sans halo) + per-partition BoundedWindowAggExec with parallel_aware = true — each partition computes its own local cumulative sum starting at zero, in parallel. Stage 2 is a new CarryExec that pipeline-breaks: it gathers all partitions' batches, derives each partition's final cumulative value from the last row of its last batch, computes the prefix sum across finals, and re-emits each partition's batches with that prefix added to the agg column.

Conceptually this mirrors the Ballista inter-stage shuffle model, just intra-node: stage 1's outputs are fully materialized; stage 2 reads stage 1's totals as a small "broadcast" and re-emits with offsets applied.

Plan transformation:

# Today
ProjectionExec
  GlobalLimitExec
    BoundedWindowAggExec(SinglePartition)
      SortPreservingMergeExec [seq]
        SortExec preserve_partitioning=true
          DataSourceExec(N partitions)

# With this PR
SortPreservingMergeExec [seq]
  CarryExec                              # <- new; gathers + offsets
    BoundedWindowAggExec(parallel_aware) # <- runs per partition
      RangeRepartitionExec               # <- no halo
        SortExec preserve_partitioning=true
          DataSourceExec(N partitions)

CarryExec mechanics

"First poll wins" via tokio::sync::OnceCell — no spawned coordinator task, no per-partition oneshot channels. Whichever output partition polls first runs the gather; concurrent and subsequent polls await the same memoized result. Since Carry is pipeline-breaking, downstream can't emit before gather completes anyway, so we lose no parallelism by gathering inline.

The buffered batches are the finals state — partitions[i].last().last_row()[agg_col] is partition i's final. No separate Vec<ScalarValue>. Prefix is just a running scalar accumulated via ScalarValue::add.

Empty partitions contribute the additive identity (zero in the agg type); their prefix equals the running total at that point. "No data anywhere" passes through with a null prefix (unchanged).

TDD via SLT

parallel_window.slt gains four assertions for the cumulative case:

  1. EXPLAIN — locks the target plan shape.
  2. LIMIT 10 result — rows 0..9 sit entirely in partition 0 where carry-in is zero anyway; correct on a passthrough Carry too, so this catches plan-shape regressions, not carry correctness.
  3. count(cumulative_sum) sentinel — proves Carry neither drops nor duplicates rows.
  4. Cross-partition-boundary result — queries seq in {24, 25, 26, 49, 50, 51, 74, 75, 76} which straddle the range-partition cutoffs (~25, 50, 75). This is the real correctness gate: a passthrough Carry returns local-cumsum-restarting-at-0 at each boundary; the real Carry's offsets line up with the serial baseline.

The middle commit (7020f8e) wires the rule branch with a passthrough Carry stub and shows assertion 4 going red; the final commit (468f293) implements the prefix-scan body and flips it green. Useful as a self-contained "first see the wrong thing, then see the right thing" history.

Same as parent: don't merge, don't chase CI green (this branch inherits the parent's failing checks). Discussion only.

Adds ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW case to
parallel_window.slt. ParallelWindow rule rejects this frame today
(parallel_window.rs:146 rejects Rows; parallel_window.rs:158 rejects
UNBOUNDED), so the plan collapses to Distribution::SinglePartition.

Target plan introduces CarryExec — a pipeline-breaking N→N operator
that buffers all input batches, derives each input partition's final
cumulative value from the buffered batches (no separate state), and
re-emits with partition i's rows offset by the prefix sum of prior
finals. Mirrors the Ballista stage-shuffle model intra-node.

EXPLAIN expected text is structural; statistics decoration needs an
update-mode pass once CarryExec lands.
End-to-end plan-shape change visible in the SLT: cumulative ROWS
UNBOUNDED PRECEDING / CURRENT ROW windows now plan as

  CarryExec(BWAG_parallel_aware(RangeRepartitionExec(SortExec)))

instead of the BWAG-SinglePartition / SPM collapse. CarryExec is a
passthrough — output equals input — so the EXPLAIN block, the LIMIT 10
result block (rows 0-9 all sit in partition 0 where carry-in is zero
anyway), and the count sentinel all pass.

Cross-partition-boundary result block goes RED as designed: at the
boundaries (seq=24/49/74) each input partition's local cumulative sum
restarts at zero — the prefix-sum offset that real CarryExec will
apply is missing. Lands in the next commit.

is_candidate_carry checks the frame shape via v.is_null() on the
Preceding bound's ScalarValue rather than matching its concrete type,
so we're robust to whatever datatype UNBOUNDED PRECEDING resolves to.
Replaces the passthrough stub with a poll-driven pipeline-break: the
first output partition to poll triggers a single async gather over all
input partitions; concurrent and subsequent polls await the same memoized
result via tokio::sync::OnceCell. No spawned coordinator task, no per-
partition oneshot channels, no State mutex — work happens on whichever
executor task polls the stream.

The gather drains every input partition into Vec<RecordBatch>, derives
each partition's final cumulative value from the last row of its last
batch (the buffered batches ARE the state), and computes the prefix sum
across finals. Each output stream then re-emits its buffered batches
with `add(agg_col, prefix)` applied.

Error fan-out uses Arc<String> rather than cloning DataFusionError
(which doesn't implement Clone) — the message surfaces identically on
every output partition's stream.

Cross-partition-boundary SLT block flips from RED to GREEN; all four
assertions in parallel_window.slt pass.
// would let us broadcast a single-element array as a Datum, but the
// replicate cost is negligible (one scalar per batch).
let prefix_array = prefix.to_array_of_size(batch.num_rows())?;
let new_agg: ArrayRef = add(&agg.as_ref(), &prefix_array.as_ref())?;

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just add the "prefix" from prior partitions to each row.

if let Some(last) = batches.last() {
let final_i =
ScalarValue::try_from_array(last.column(agg_col), last.num_rows() - 1)?;
running = running.add(&final_i)?;

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calculate the prefix to add

@avantgardnerio

Copy link
Copy Markdown
Author

@2010YOUY01 and @milenkovicm - I can't seem to add you as reviewers, since this is a stacked PR on a fork, but perhaps you will see my mention.

@github-actions github-actions Bot added the core label Jun 30, 2026
@avantgardnerio

avantgardnerio commented Jun 30, 2026

Copy link
Copy Markdown
Author

Bench (16 cores / 32 threads):
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant