Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ harness = false
name = "preserve_file_partitioning"
required-features = ["parquet"]

[[bench]]
harness = false
name = "parallel_window_scaling"

[[bench]]
harness = false
name = "reset_plan_states"
187 changes: 187 additions & 0 deletions datafusion/core/benches/parallel_window_scaling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Scaling sweep for the `ParallelWindow` optimizer rule.
//!
//! For each value of `target_partitions`, runs a bounded RANGE-frame
//! window query (no `PARTITION BY` — the shape #23197 calls out as
//! single-core-bottlenecked) twice: once with the `ParallelWindow`
//! rule filtered out of the physical optimizer chain (the baseline),
//! and once with the default chain (the rule fires). Emits a CSV row
//! per iteration to stdout. Pipe to
//! `scripts/plot_parallel_window_scaling.py` for a throughput-vs-cores
//! chart.
//!
//! Run:
//! cargo bench --bench parallel_window_scaling \
//! > parallel_window_scaling.csv

use arrow::array::{Float64Array, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::MemTable;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::prelude::{SessionConfig, SessionContext};
use rand::SeedableRng;
use rand::rngs::StdRng;
use rand_distr::{Distribution, Uniform};
use std::hint::black_box;
use std::sync::Arc;
use std::time::Instant;
use tokio::runtime::Runtime;

/// Weak-scaling design: rows scale linearly with cores so total work is
/// proportional to the parallelism budget. The PoC line in the chart
/// should stay roughly flat (constant rows per core ⇒ constant
/// wall-clock if scaling is linear), while the baseline line grows
/// linearly with cores (no parallelization ⇒ wall-clock tracks total
/// rows).
const ROWS_PER_CORE: usize = 500_000;
const BATCH_SIZE: usize = 8 * 1024;
const HALO_RANGE: i64 = 100;
const ITERATIONS: usize = 3;
/// The "cores" axis: each value sets the table partition count, the
/// `target_partitions` budget, and (via `ROWS_PER_CORE`) total rows.
const CORE_SETTINGS: &[usize] = &[1, 2, 4, 8, 16, 32];

/// Schema of the benchmark table: a non-nullable `Int64` column `ts`
/// followed by a non-nullable `Float64` column `v`.
fn schema() -> SchemaRef {
    let fields = vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("v", DataType::Float64, false),
    ];
    Arc::new(Schema::new(fields))
}

/// Build `num_partitions` partitions of `(ts, v)` rows, each holding
/// `ROWS_PER_CORE` rows split into batches of at most `BATCH_SIZE` rows.
///
/// `ts` is monotonically increasing within each partition AND between
/// partitions (partition `i` covers
/// `ts ∈ [i * ROWS_PER_CORE, (i + 1) * ROWS_PER_CORE)`). `v` is sampled
/// from `Uniform::new(0.0, 1.0)` with a fixed-seed RNG.
///
/// The ordering is a property of the data only: `make_ctx` deliberately
/// does NOT declare it on the `MemTable` (see the note there), so the sort
/// is not elided from the plans; it runs over already-sorted input.
///
/// # Panics
///
/// Panics if the uniform distribution or a `RecordBatch` cannot be built.
fn make_partitions(num_partitions: usize) -> Vec<Vec<RecordBatch>> {
    let mut rng = StdRng::seed_from_u64(0xC0FFEE_C0FFEE);
    let v_dist = Uniform::new(0.0f64, 1.0).unwrap();
    let schema = schema();
    // Every partition holds the same number of batches, so size each Vec once.
    let batches_per_partition = ROWS_PER_CORE.div_ceil(BATCH_SIZE);
    (0..num_partitions)
        .map(|part| {
            let mut batches = Vec::with_capacity(batches_per_partition);
            let mut next_ts = (part * ROWS_PER_CORE) as i64;
            let mut remaining = ROWS_PER_CORE;
            while remaining > 0 {
                let len = remaining.min(BATCH_SIZE);
                let end_ts = next_ts + len as i64;
                let ts: Vec<i64> = (next_ts..end_ts).collect();
                // Sample strictly in row order so the seeded sequence is
                // consumed exactly as before.
                let v: Vec<f64> = (0..len).map(|_| v_dist.sample(&mut rng)).collect();
                batches.push(
                    RecordBatch::try_new(
                        Arc::clone(&schema),
                        vec![
                            Arc::new(Int64Array::from(ts)),
                            Arc::new(Float64Array::from(v)),
                        ],
                    )
                    .unwrap(),
                );
                next_ts = end_ts;
                remaining -= len;
            }
            batches
        })
        .collect()
}

/// Build a `SessionContext` with `data` registered as table `t`.
///
/// `target_partitions` and `BATCH_SIZE` are applied to the session config.
/// When `with_parallel_window` is `false` (the baseline), the default
/// physical optimizer chain is installed with every rule named
/// `"ParallelWindow"` filtered out; otherwise the default chain is left
/// untouched.
///
/// # Panics
///
/// Panics if the table cannot be built or registered, or if the baseline is
/// requested but the default chain contains no rule named
/// `"ParallelWindow"` — without that check the "baseline" would silently
/// measure the same plan as the PoC run.
fn make_ctx(
    data: &[Vec<RecordBatch>],
    target_partitions: usize,
    with_parallel_window: bool,
) -> SessionContext {
    // Data is monotonic per partition (see `make_partitions`), so SortExec
    // runs cheaply, but we deliberately don't declare `with_sort_order` here:
    // the PoC's `RangeRepartitionExec` sources its global extremes from the
    // SortExec it sits above, and elision would break that. Sort cost is
    // already amortized across `target_partitions` input partitions.
    let table = MemTable::try_new(schema(), data.to_vec()).unwrap();
    let config = SessionConfig::new()
        .with_target_partitions(target_partitions)
        .with_batch_size(BATCH_SIZE);

    let mut builder = SessionStateBuilder::new()
        .with_default_features()
        .with_config(config);
    if !with_parallel_window {
        let default_rules = PhysicalOptimizer::new().rules;
        let default_len = default_rules.len();
        let rules: Vec<_> = default_rules
            .into_iter()
            .filter(|r| r.name() != "ParallelWindow")
            .collect();
        // Guard against a renamed/missing rule: a no-op filter would make
        // the baseline indistinguishable from the PoC measurement.
        assert!(
            rules.len() < default_len,
            "no physical optimizer rule named \"ParallelWindow\" was found; \
             the baseline would not differ from the PoC run"
        );
        builder = builder.with_physical_optimizer_rules(rules);
    }
    let state = builder.build();
    let ctx = SessionContext::new_with_state(state);
    ctx.register_table("t", Arc::new(table)).unwrap();
    ctx
}

/// Plan and execute `sql` on `ctx`, blocking on `rt`, and return the total
/// number of rows across all collected batches.
///
/// The batches are passed through `black_box` before being dropped.
/// Panics if planning or execution fails.
fn run_once(ctx: &SessionContext, rt: &Runtime, sql: &str) -> usize {
    let batches = rt.block_on(async {
        let frame = ctx.sql(sql).await.unwrap();
        frame.collect().await.unwrap()
    });
    let mut rows = 0usize;
    for batch in &batches {
        rows += batch.num_rows();
    }
    black_box(batches);
    rows
}

/// Run the sweep: for every entry of `CORE_SETTINGS`, build the data once,
/// then time the window query with the `ParallelWindow` rule filtered out
/// and with the default chain. Each timed iteration emits one CSV row on
/// stdout and one human-readable line on stderr.
fn main() {
    let runtime = Runtime::new().unwrap();
    // All five aggregates share one OVER clause, so BWAG evaluates them in a
    // single operator with five accumulators: roughly 5x the per-row work of
    // a lone `SUM`, which tilts the cost toward BWAG rather than the
    // (pre-sorted) input scan.
    let query = format!(
        "SELECT \
         SUM(v) OVER w, \
         AVG(v) OVER w, \
         MIN(v) OVER w, \
         MAX(v) OVER w, \
         COUNT(*) OVER w \
         FROM t \
         WINDOW w AS \
         (ORDER BY ts RANGE BETWEEN {HALO_RANGE} PRECEDING AND CURRENT ROW)"
    );

    println!("cores,with_poc,iter,seconds,rows");
    for cores in CORE_SETTINGS.iter().copied() {
        let partitions = make_partitions(cores);
        for with_poc in [false, true] {
            let ctx = make_ctx(&partitions, cores, with_poc);
            // Untimed warmup run; its row count is the reference that every
            // timed iteration must reproduce.
            let warmup_rows = run_once(&ctx, &runtime, &query);
            for iter in 0..ITERATIONS {
                let started = Instant::now();
                let rows = run_once(&ctx, &runtime, &query);
                let secs = started.elapsed().as_secs_f64();
                assert_eq!(
                    rows, warmup_rows,
                    "row count drifted: warmup={warmup_rows} run={rows}"
                );
                println!("{cores},{with_poc},{iter},{secs:.6},{rows}");
                eprintln!(
                    "cores={cores:>2} poc={with_poc:<5} iter={iter} \
                     secs={secs:>6.3} rows={rows}"
                );
            }
        }
    }
}
152 changes: 152 additions & 0 deletions datafusion/core/benches/scripts/plot_parallel_window_scaling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Plot the weak-scaling sweep emitted by `parallel_window_scaling.rs`.

The bench emits one CSV row per iteration with header
`cores,with_poc,iter,seconds,rows`. Rows scale linearly with cores, so
under perfect scaling the PoC's wall-clock stays constant and its
throughput grows linearly with cores; the baseline's wall-clock grows
linearly with cores (no parallelism ⇒ pure serial work) and its
throughput stays flat at single-core capacity.

Usage:
cargo bench --bench parallel_window_scaling > scaling.csv
python3 plot_parallel_window_scaling.py scaling.csv [--out file.png]
"""

import argparse
import csv
import statistics
import sys
from collections import defaultdict

import matplotlib

matplotlib.use("Agg") # headless PNG only
import matplotlib.pyplot as plt


def read_rows(source):
    """Parse the CSV emitted by the bench.

    `source` is an iterable of text lines (an open file or `sys.stdin`)
    whose header row names the columns `cores`, `with_poc`, `seconds` and
    `rows`.

    Returns `(seconds, rows_for)`:
      * `seconds` maps `(cores, with_poc)` to the list of wall-clock
        samples, in input order;
      * `rows_for` maps `cores` to the row count of the last record read
        for that core count.

    Raises `ValueError` when `with_poc` is neither `true` nor `false`
    (case-insensitive, surrounding whitespace ignored) or when a numeric
    field does not parse.
    """
    reader = csv.DictReader(source)
    seconds = defaultdict(list)
    rows_for = {}
    for row in reader:
        cores = int(row["cores"])
        flag = row["with_poc"].strip().lower()
        # Reject unknown spellings instead of silently filing them under
        # the baseline series.
        if flag not in ("true", "false"):
            raise ValueError(
                f"line {reader.line_num}: with_poc must be 'true' or "
                f"'false', got {row['with_poc']!r}"
            )
        with_poc = flag == "true"
        secs = float(row["seconds"])
        n_rows = int(row["rows"])
        seconds[(cores, with_poc)].append(secs)
        rows_for[cores] = n_rows
    return seconds, rows_for


def medians(seconds):
    """Collapse each `(cores, with_poc)` sample list to its median.

    Returns a `defaultdict` keyed by `with_poc` whose values are dicts
    mapping `cores` to the median of that configuration's samples.
    """
    by_mode = defaultdict(dict)
    for key, samples in seconds.items():
        cores, with_poc = key
        per_cores = by_mode[with_poc]
        per_cores[cores] = statistics.median(samples)
    return by_mode


def main():
    """Read the bench CSV, then render wall-clock and throughput charts.

    Command line: a positional CSV path (`-` reads stdin) and an optional
    `--out` PNG path. Samples are reduced to per-configuration medians;
    the left panel plots wall-clock vs cores, the right panel plots
    throughput (million rows / second) vs cores, each with a dashed
    reference line. Exits with a usage error when the input lacks either
    the baseline or the PoC series.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("csv", help="CSV path; use '-' for stdin")
    ap.add_argument(
        "--out",
        default="parallel_window_scaling.png",
        help="output PNG path (default: parallel_window_scaling.png)",
    )
    args = ap.parse_args()

    if args.csv == "-":
        seconds, rows_for = read_rows(sys.stdin)
    else:
        # The context manager closes the file even if parsing raises.
        with open(args.csv, newline="") as source:
            seconds, rows_for = read_rows(source)

    by_poc = medians(seconds)
    baseline = by_poc[False]
    parallel = by_poc[True]
    # Both series are indexed at [0] below for the reference lines; fail
    # with a readable message rather than an IndexError.
    if not baseline or not parallel:
        ap.error(
            "input must contain rows for both with_poc=false and with_poc=true"
        )
    cores = sorted(set(baseline) | set(parallel))

    fig, (ax_time, ax_throughput) = plt.subplots(
        1, 2, figsize=(13, 5), constrained_layout=True
    )

    # Wall-clock vs cores (weak scaling: rows grow with cores).
    bx = sorted(baseline)
    by = [baseline[c] for c in bx]
    px = sorted(parallel)
    py = [parallel[c] for c in px]
    ideal_baseline = [by[0] * (c / bx[0]) for c in bx]
    ax_time.plot(
        bx,
        ideal_baseline,
        linestyle="--",
        color="grey",
        label="ideal serial (y = x · t₁)",
    )
    ax_time.plot(bx, by, marker="o", color="C0", label="ParallelWindow off (baseline)")
    ax_time.plot(px, py, marker="s", color="C1", label="ParallelWindow on (this PR)")
    ax_time.set_xscale("log", base=2)
    ax_time.set_xticks(cores)
    ax_time.set_xticklabels([str(c) for c in cores])
    ax_time.set_xlabel("cores (= target_partitions = input partitions)")
    ax_time.set_ylabel("wall-clock (seconds)")
    ax_time.set_title("Wall-clock vs cores (weak scaling)")
    ax_time.grid(True, which="both", alpha=0.3)
    ax_time.legend(loc="upper left")

    # Throughput vs cores: under linear scaling PoC follows y = x · t1
    # (matching the dashed reference).
    bt = [rows_for[c] / baseline[c] / 1e6 for c in bx]
    pt = [rows_for[c] / parallel[c] / 1e6 for c in px]
    ideal_parallel = [pt[0] * (c / px[0]) for c in px]
    ax_throughput.plot(
        px,
        ideal_parallel,
        linestyle="--",
        color="grey",
        label="ideal linear scaling (y = x · t₁)",
    )
    ax_throughput.plot(
        bx, bt, marker="o", color="C0", label="ParallelWindow off (baseline)"
    )
    ax_throughput.plot(
        px, pt, marker="s", color="C1", label="ParallelWindow on (this PR)"
    )
    ax_throughput.set_xscale("log", base=2)
    ax_throughput.set_yscale("log", base=2)
    ax_throughput.set_xticks(cores)
    ax_throughput.set_xticklabels([str(c) for c in cores])
    ax_throughput.set_xlabel("cores (= target_partitions = input partitions)")
    ax_throughput.set_ylabel("throughput (million rows / second, log₂)")
    ax_throughput.set_title("Throughput vs cores")
    ax_throughput.grid(True, which="both", alpha=0.3)
    ax_throughput.legend(loc="upper left")

    # Pair the smallest core count with ITS row count; the first value in
    # `rows_for` follows CSV order, which need not be ascending.
    rows_per_core = rows_for[cores[0]] // cores[0]
    fig.suptitle(
        f"ParallelWindow weak-scaling — {rows_per_core:,} rows per core, "
        f"5 window aggregates over RANGE BETWEEN 100 PRECEDING AND CURRENT ROW"
    )
    fig.savefig(args.out, dpi=120)
    print(f"wrote {args.out}", file=sys.stderr)


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-pruning = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod limit_pushdown_past_window;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
pub mod parallel_window;
pub mod projection_pushdown;
pub use datafusion_pruning as pruning;
pub mod hash_join_buffering;
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::join_selection::JoinSelection;
use crate::limit_pushdown::LimitPushdown;
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
use crate::output_requirements::OutputRequirements;
use crate::parallel_window::ParallelWindow;
use crate::projection_pushdown::ProjectionPushdown;
use crate::sanity_checker::SanityCheckPlan;
use crate::topk_aggregation::TopKAggregation;
Expand Down Expand Up @@ -187,6 +188,13 @@ impl PhysicalOptimizer {
// [`EnsureRequirements`](crate::ensure_requirements) for the per-phase
// breakdown, and <https://github.com/apache/datafusion/issues/21973>
// for the original failure mode.
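// NOTE(review): the comment lines directly above appear to document
// `EnsureRequirements`, which is registered right after this rule —
// consider moving this block above that comment so each comment sits
// next to the rule it describes.
//
// Re-shape no-PARTITION-BY RANGE-frame windows into a parallel
// form: SortExec(preserve_partitioning) + RangeRepartitionExec
// + parallel-aware BWAG. Runs *before* EnsureRequirements so
// we own the distribution decision — otherwise EnsureRequirements
// would satisfy BWAG's SinglePartition requirement by inserting
// an SPM that collapses the parallelism we're trying to create.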
Arc::new(ParallelWindow::new()),
Arc::new(EnsureRequirements::new()),
// The CombinePartialFinalAggregate rule should be applied after distribution enforcement
Arc::new(CombinePartialFinalAggregate::new()),
Expand Down
Loading