chore(config): deprecate min_cpu_per_task #7125

Open
XiaoHongbo-Hope wants to merge 22 commits into Eventual-Inc:main from XiaoHongbo-Hope:fix/min-cpu-per-task-wiring

@XiaoHongbo-Hope XiaoHongbo-Hope commented Jun 13, 2026

Summary

Deprecates min_cpu_per_task / DAFT_MIN_CPU_PER_TASK instead of wiring it into Flotilla scheduling, following the discussion in #7123. The option is preserved for compatibility, but explicit Python usage and env-var usage now emit deprecation warnings.

Changes

  • Mark min_cpu_per_task as deprecated in the Python execution config docs.
  • Emit a DeprecationWarning when min_cpu_per_task is passed through Python config APIs.
  • Emit a stderr warning when DAFT_MIN_CPU_PER_TASK is set, while preserving existing parsed config behavior.
  • Add coverage that verifies explicit min_cpu_per_task usage warns and still preserves the configured value.
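The warn-and-preserve behavior in the last bullet can be sketched as follows. This is a minimal illustration, not Daft's implementation: `set_execution_config_sketch` is a hypothetical stand-in that returns a plain dict, and the message text is only indicative.

```python
import warnings


def set_execution_config_sketch(min_cpu_per_task=None, **other_options):
    """Hypothetical config setter (assumption): warns on the deprecated option."""
    config = dict(other_options)
    if min_cpu_per_task is not None:
        warnings.warn(
            "`min_cpu_per_task` is deprecated and has no effect on Flotilla scheduling.",
            DeprecationWarning,
            stacklevel=2,
        )
        # The value is still stored, so existing callers keep working.
        config["min_cpu_per_task"] = min_cpu_per_task
    return config


def warns_and_preserves(value):
    """Return (stored value, whether a DeprecationWarning was emitted)."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        config = set_execution_config_sketch(min_cpu_per_task=value)
    warned = any(issubclass(w.category, DeprecationWarning) for w in caught)
    return config["min_cpu_per_task"], warned
```

A test along these lines checks both halves of the contract at once: the warning fires and the configured value survives.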

Testing

  • git diff --check
  • ruff format --check daft/context.py tests/test_context.py
  • rustup run stable rustfmt --edition 2024 --check src/common/daft-config/src/lib.rs
  • python -m py_compile daft/context.py tests/test_context.py
  • cargo test -p common-daft-config --lib
  • python -m pytest tests/test_context.py::test_min_cpu_per_task_is_deprecated -q (blocked locally because the native daft.daft extension is not built: ImportError: cannot import name 'build_type' from 'daft.daft')

Notes

Relates to #7123.

@XiaoHongbo-Hope XiaoHongbo-Hope requested a review from a team as a code owner June 13, 2026 13:14
@github-actions github-actions Bot added the fix label Jun 13, 2026
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as draft June 13, 2026 13:15
greptile-apps Bot commented Jun 13, 2026

Greptile Summary

This PR fixes min_cpu_per_task (DAFT_MIN_CPU_PER_TASK) being a no-op in the flotilla scheduler by wiring it as the fallback in TaskResourceRequest::num_cpus() and leaving num_cpus unset in LocalPhysicalPlan::resource_request(). It also fixes fractional CPU/GPU demand being sent as N individual bundles to Ray's autoscaler (issue #7123) by aggregating sub-unit requests into ceil(sum) integer bundles, and adds input validation for min_cpu_per_task and ResourceRequest fields.

  • Wiring: TaskResourceRequest now stores min_cpu_per_task and uses it as the num_cpus fallback; LocalPhysicalPlan::resource_request() changed from default_cpu() (hardcoded 1.0 CPU) to default() (unset) so the fallback applies to plain plans; default bumped from 0.5 → 1.0 to preserve existing observable behavior.
  • Bundle aggregation: New aggregate_ray_bundles packs fractional CPU-only tasks into {CPU:1} bundles and sub-GPU tasks into {CPU:1,GPU:1} bundles, while whole-unit or memory tasks remain individual; next_autoscale_request selects the minimal prefix that exceeds the high-water mark and delegates to the aggregator.
  • Validation: Non-finite/non-positive min_cpu_per_task values are rejected at both the Python setter and env-var parse sites; ResourceRequest now rejects non-finite/negative num_cpus to match the existing num_gpus guard.

Confidence Score: 5/5

Safe to merge. The wiring fix is self-contained and the default change from 0.5 to 1.0 preserves the previous hardcoded fallback behavior.

The three distinct changes (config wiring, autoscaler bundle aggregation, input validation) are each independently tested, the high-water mark comparison against integer bundle totals is correct, and DistributedActorPoolProject tasks retain their explicit 1.0-CPU request through the max() path so the plan-layer change is backward-compatible.

No files require special attention beyond what previous review threads already identified.

Important Files Changed

| Filename | Overview |
|---|---|
| src/daft-distributed/src/scheduling/task.rs | Major additions: RayBundle, aggregate_ray_bundles, and next_autoscale_request implement fractional-to-integer bundle aggregation for Ray's autoscaler; TaskResourceRequest gains a min_cpu_per_task fallback field. Logic is correct and thoroughly tested. |
| src/daft-distributed/src/python/ray/worker_manager.rs | Autoscale path refactored to delegate bundle selection to next_autoscale_request; high-water mark now stored as integer bundle totals for consistent escalation. |
| src/daft-local-plan/src/plan.rs | Switched base resource request from default_cpu() to default() so the task layer min_cpu_per_task fallback applies; DistributedActorPoolProject still forces 1.0 CPU via the max() call. |
| src/common/daft-config/src/lib.rs | Default min_cpu_per_task bumped from 0.5 to 1.0; is_valid_min_cpu_per_task shared validation helper added; env-var path now rejects invalid values with a diagnostic message. |
| src/common/resource-request/src/lib.rs | Added validation for non-finite/negative num_cpus mirroring the existing num_gpus guard; unit tests added. |
| src/common/daft-config/src/python.rs | Python setter for min_cpu_per_task now raises ValueError for invalid inputs via the shared is_valid_min_cpu_per_task helper. |
| daft/context.py | Docstring updated to reflect new semantics and validation requirement. |
| tests/test_context.py | New tests cover fractional acceptance and rejection of zero/negative/NaN/inf values for min_cpu_per_task. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[LocalPhysicalPlan::resource_request] -->|num_cpus = None| B[ResourceRequest default]
    B --> C[TaskResourceRequest::new with min_cpu_per_task]
    C --> D{num_cpus set in plan?}
    D -- Yes --> E[Use explicit value]
    D -- No --> F[Fallback: min_cpu_per_task default 1.0]
    G[pending TaskResourceRequests] --> H[next_autoscale_request accumulate raw sums]
    H -->|raw sum > high-water mark| I[aggregate_ray_bundles]
    I --> J{task type}
    J -->|memory>0 OR cpus>1 OR gpus>1| K[Individual RayBundle ceil each dimension]
    J -->|sub-GPU: 0 < gpus <= 1| L[Pack into GPU pool]
    J -->|CPU-only: 0 < cpus <= 1| M[Pack into CPU pool]
    L --> N[ceil max fractional_gpu_sum gpu_cpu_sum RayBundle cpu=0or1 gpu=1]
    M --> O[ceil fractional_cpu_sum RayBundle cpu=1 gpu=None]
    K --> P[Ray request_resources integer bundles only]
    N --> P
    O --> P
    P --> Q[Update high-water mark with integer bundle totals]

Reviews (4): Last reviewed commit: "fix(flotilla): honor explicit num_cpus=0..."

Comment on lines 39 to 43
  pub fn num_cpus(&self) -> f64 {
-     self.resource_request.num_cpus().unwrap_or(1.0)
+     self.resource_request
+         .num_cpus()
+         .unwrap_or(self.min_cpu_per_task)
  }

P1 Silent default-CPU reduction from 1.0 → 0.5

The old fallback was the literal 1.0; DaftExecutionConfig::default().min_cpu_per_task is 0.5. Any deployment that relied on the previous implicit 1.0-CPU bundle request will now submit 0.5-CPU bundles without any config change. On an autoscaler that uses CPU headroom to decide when to launch new nodes, halving the per-task CPU request could double the number of tasks packed per node before a scale-out fires, increasing the likelihood of OOM or throttled execution. The PR description notes this trade-off, but the change in default observable behaviour is worth confirming with the team before merging.

Comment on lines 713 to 718
  pub fn with_resource_request(mut self, resource_request: ResourceRequest) -> Self {
-     self.resource_request = TaskResourceRequest::new(resource_request);
+     self.resource_request = TaskResourceRequest::new(
+         resource_request,
+         DaftExecutionConfig::default().min_cpu_per_task,
+     );
      self

P2 with_resource_request ignores the builder's own config

MockTaskBuilder::with_resource_request constructs a fresh DaftExecutionConfig::default() to obtain the min_cpu_per_task fallback instead of reusing whatever min_cpu_per_task the calling test may have injected into self.resource_request already. Any test that constructs a MockTask with a non-default min_cpu_per_task and later calls with_resource_request will silently reset the fallback to 0.5. This won't break current tests (none set a custom value), but it makes the helper subtly inconsistent and could mask future test regressions. Consider storing the min_cpu_per_task on the builder and reusing it here.

The min_cpu_per_task execution config field had no readers in the
distributed scheduler: TaskResourceRequest::num_cpus() returned a
hardcoded 1.0 when the plan's ResourceRequest had no num_cpus.

This was wired up in Eventual-Inc#4506 for the legacy ray runner only; Eventual-Inc#5375
removed that runner along with its lone reader, and the flotilla
scheduler was never wired in. Result: setting min_cpu_per_task via
daft.set_execution_config or DAFT_MIN_CPU_PER_TASK had zero effect
on autoscaler bundle requests.

Plumb the value from DaftExecutionConfig (already in scope at
SwordfishTaskBuilder::build) into TaskResourceRequest, and use it
as the fallback in num_cpus() instead of the literal 1.0.

Closes Eventual-Inc#7123
@XiaoHongbo-Hope XiaoHongbo-Hope force-pushed the fix/min-cpu-per-task-wiring branch from ca66e01 to 1d05985 Compare June 13, 2026 13:20
- Comment said 'Floor' but the implementation is 'Default-when-None'
  (explicit num_cpus is honored as-is). Reword to match.
- Add two unit tests:
  * num_cpus falls back to min_cpu_per_task when ResourceRequest is empty
  * explicit num_cpus passes through unchanged
Before this PR, the flotilla scheduler fell back to a hardcoded 1.0
CPU when ResourceRequest had no num_cpus; the configured
min_cpu_per_task default of 0.5 was inert.

Wiring the field through (1d05985) without changing the default
would silently halve the per-task CPU floor for every existing user,
which Greptile's review flagged as a behaviour change risk on
capacity-tuned clusters (e.g. KubeRay packs 2x more tasks before
scale-out, increasing OOM risk).

Move the default to 1.0 so the no-explicit-num_cpus path matches the
pre-wiring behaviour exactly. Users who want a smaller floor set it
explicitly via daft.set_execution_config(min_cpu_per_task=...) or
DAFT_MIN_CPU_PER_TASK, which is the original purpose of the knob.
…uest

MockTaskBuilder.with_resource_request was constructing a fresh
DaftExecutionConfig::default() to fetch the min_cpu_per_task fallback,
which silently reset any non-default value already set on the builder.
Reuse self.resource_request.min_cpu_per_task instead so the fallback
threads through chained .with_* calls correctly.

Test-helper-only; no production behavior change.

Addresses greptile P2 review comment.
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as ready for review June 19, 2026 15:07
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as draft June 19, 2026 15:26
…inite min_cpu_per_task

Two follow-ups from review of the min_cpu_per_task wiring:

- The Ray autoscaler bundle still rounded CPU up via `num_cpus().ceil()`,
  so a configured `min_cpu_per_task=0.1` reached `request_resources` as
  `{"CPU": 1}` (issue Eventual-Inc#7123). Add `TaskResourceRequest::autoscale_bundle`
  which keeps CPU fractional (GPU/memory stay integer, zero values dropped)
  and build the Ray bundle as a `PyDict` with a float CPU. Update the
  `try_autoscale` type hint accordingly.

- `min_cpu_per_task` validation let NaN/inf through (`nan <= 0` and
  `inf <= 0` are both false). Use `is_finite() && > 0` on both the Python
  setter and the env path.

Add Rust unit tests for the fractional bundle and env validation, and a
Python test covering acceptance of fractional values and rejection of
0/-0.5/nan/inf.
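
The validation gap described in this commit message can be reproduced in a few lines. This is a Python rendering for illustration only (the change itself lives in the Rust config code and the Python setter); `naive_is_valid` is a hypothetical name for the old-style check.

```python
import math


def naive_is_valid(value: float) -> bool:
    # Old-style check: reject only when `value <= 0`.
    # NaN and inf both compare false against `<= 0`, so they slip through.
    return not (value <= 0)


def is_valid_min_cpu_per_task(value: float) -> bool:
    # Stricter check from the commit message: finite and strictly positive.
    return math.isfinite(value) and value > 0
```

With the stricter check, 0, -0.5, NaN, and inf are all rejected, while a fractional value such as 0.1 is still accepted.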
…undles

Ray's request_resources (<= 2.55; Daft pins ray==2.55.1) rejects non-integer
bundle values: `isinstance(bundle[key], int)` raises `TypeError: each bundle
key should be str and value as int.`. Sending a float CPU therefore crashes at
runtime — even a whole 1.0, since pyo3 emits a Python float.

Replace the per-task float bundle with `aggregate_ray_bundles`: CPU-only tasks
have their fractional CPU summed and emitted as ceil(sum) unit {"CPU": 1}
bundles, so N tasks at 0.1 CPU request ceil(0.1*N) CPUs instead of N
(issue Eventual-Inc#7123), while never sending a non-integer value. Tasks carrying GPU or
memory keep an individual bundle (CPU rounded up) since those resources pin
placement to a node.

Revert the try_autoscale type hint to dict[str, int]. Replace the fractional
bundle unit tests with aggregation tests.
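
The integer-only constraint is easy to see in isolation. The sketch below mirrors the check quoted in the commit message; it is not Ray's source code, and `check_bundle` is a hypothetical name.

```python
def check_bundle(bundle: dict) -> None:
    """Mirror of the quoted check (assumption): str keys, int values only."""
    for key, value in bundle.items():
        if not (isinstance(key, str) and isinstance(value, int)):
            raise TypeError("each bundle key should be str and value as int.")


def accepts(bundle: dict) -> bool:
    """Convenience wrapper: True if the bundle passes the check."""
    try:
        check_bundle(bundle)
    except TypeError:
        return False
    return True
```

Note that `{"CPU": 1.0}` fails even though the value is whole, because a Python float is not an `int`.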
aggregate_ray_bundles packed every CPU-only task into unit {"CPU": 1}
bundles. That is wrong for a task requesting num_cpus >= 1: a 4-CPU task
runs on one worker, so splitting it into 4 spread bundles lets the
autoscaler provision 4 single-CPU nodes and leaves the task unschedulable.
It also turned CPU magnitude into the loop count, so a huge or non-finite
explicit num_cpus (inf as i64 == i64::MAX) could hang/OOM, and a NaN
poisoned the running sum and zeroed the batch's CPU request.

Only pack sub-1.0 CPU-only tasks now; tasks with GPU, memory, or
num_cpus >= 1 keep an individual bundle (CPU rounded up to at least 1).
Non-finite / non-positive CPU contributes nothing. The packed sum is now
bounded by task count, so the loop can no longer blow up.
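
A simplified Python sketch of the packing rule at this point in the history, restricted to CPU-only tasks. The function name and the list-of-floats input are assumptions for illustration; the actual code is Rust and also handles GPU and memory.

```python
import math


def aggregate_cpu_bundles(cpu_requests):
    """Pack sub-1.0 CPU requests into unit bundles; keep larger ones whole."""
    bundles = []
    fractional_sum = 0.0
    for cpus in cpu_requests:
        if not math.isfinite(cpus) or cpus <= 0:
            continue  # non-finite / non-positive CPU contributes nothing
        if cpus < 1.0:
            fractional_sum += cpus  # packed together below
        else:
            # A task needing >= 1 CPU runs on one worker, so it keeps its
            # own bundle with the CPU count rounded up.
            bundles.append({"CPU": math.ceil(cpus)})
    # The packed sum is bounded by the number of tasks, so this loop is too.
    bundles.extend({"CPU": 1} for _ in range(math.ceil(fractional_sum)))
    return bundles
```

For example, five tasks at 0.25 CPU sum to 1.25 and yield two `{"CPU": 1}` bundles, while a single 4-CPU task yields one `{"CPU": 4}` bundle. Floating-point summation of values such as 0.1 can land slightly above or below a whole number, so the exact count near a boundary depends on rounding.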
…r mark

The high-water mark recorded the fractional cpu_sum, but the request actually
sent to Ray is the integer-aggregated bundle total. With min_cpu_per_task=0.1
the mark grew ~0.1 per cycle while ceil() only bumped the real CPU request
every ~10 cycles, so scale-up for many pending tasks stalled for
~1/min_cpu_per_task cycles (≈50s at the default 5s interval) per extra CPU.

Record the aggregated integer bundle totals (what Ray actually receives) as the
mark instead. Because each cycle selects bundles until the fractional cpu_sum
exceeds the integer mark, ceil() now bumps by at least one CPU every cycle,
restoring the intended one-unit-per-cycle ramp while still never requesting
less than before. Convergence is unchanged: once pending demand can no longer
exceed the mark, the cycle is skipped.

Verified: cargo test -p daft-distributed --lib (8 task tests pass),
cargo check/clippy -p daft-distributed --features python clean.
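
The difference between the two marks can be simulated with a small model. This is a sketch under simplifying assumptions: CPU-only tasks, a fixed pending list, and a hypothetical `ramp` function that returns the CPU total requested in each cycle until the cycle would be skipped.

```python
import math


def ramp(pending_cpus, integer_mark):
    """Simulate autoscale cycles; return the CPU total requested per cycle."""
    mark = 0.0
    requests = []
    while True:
        running = 0.0
        exceeded = False
        # Select the shortest prefix whose fractional sum exceeds the mark.
        for cpus in pending_cpus:
            running += cpus
            if running > mark:
                exceeded = True
                break
        if not exceeded:
            return requests  # demand no longer exceeds the mark: skip
        requested = math.ceil(running)  # what is actually sent as bundles
        requests.append(requested)
        # Old behaviour recorded the fractional sum; the fix records the
        # integer total that was actually requested.
        mark = requested if integer_mark else running
```

With eight pending tasks at 0.25 CPU, `ramp([0.25] * 8, integer_mark=True)` returns `[1, 2]`, whereas `integer_mark=False` takes eight cycles to reach the same total.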
try_new_internal only checked num_gpus for negativity, so an explicit
num_cpus = inf/NaN (or negative) flowed through. Downstream that became a
bundle of {"CPU": i64::MAX} (inf as i64 saturates) in the autoscaler. Require
both num_cpus and num_gpus to be finite and nonnegative; this also catches a
NaN num_gpus that the existing >1-must-be-integer check missed.
aggregate_ray_bundles only packed fractional CPU; fractional-GPU tasks (Daft
supports num_gpus<1) were each emitted as a full {"GPU":1} bundle. Combined
with the high-water mark comparing raw fractional gpu_sum against the integer
mark, 11 tasks at 0.1 GPU would request 11 GPUs instead of ceil(1.1)=2 —
likely tripping "requested bundles exceed max capacity, autoscaler refuses
all". Sum sub-GPU demand into ceil(sum) {"CPU":1,"GPU":1} bundles (their small
CPU rides on the GPU node, not double-counted); memory/multi-unit tasks still
keep an individual placement-pinned bundle.

Extract the select-then-aggregate ramp into a pure next_autoscale_request() in
task.rs so it's unit-testable outside the python-gated worker manager; add
tests for whole-unit CPU/GPU escalation, GPU packing, and the skip-below-mark
case. Tighten comments.

Verified: cargo test -p daft-distributed --lib (78 pass) and
-p common-resource-request; cargo check/clippy --features python clean.
Packing sub-GPU tasks dropped their CPU demand entirely: a task with
num_gpus<1 and the min_cpu_per_task fallback (e.g. @daft.cls(gpus=0.5) ->
num_cpus=None -> 1.0 CPU) only contributed to fractional_gpu_sum, so the
{CPU:1,GPU:1} bundles under-counted CPU. Worse, next_autoscale_request selects
on raw cpu_sum while the high-water mark records the post-aggregation totals,
so two such tasks at mark 1/1 kept re-requesting {CPU:1,GPU:1} without the mark
ever growing — the cluster could stall and never provision the second CPU,
leaving tasks unschedulable.

Accumulate the packed tasks' CPU (gpu_cpu_sum) and spread it across the GPU
bundles as ceil(gpu_cpu_sum / gpu_bundles). The request now reflects real CPU
demand, so the recorded mark grows whenever selection triggers and the ramp
converges. Add a regression test for the gpus=0.5 + fallback-CPU stall.

Verified: cargo test -p daft-distributed --lib (13 task tests) and
cargo check/clippy --features python clean.
The "finite and > 0" check for min_cpu_per_task was duplicated in the env path
and the Python setter, which could drift. Extract it into a single
DaftExecutionConfig::is_valid_min_cpu_per_task and call it from both.
Carrying GPU tasks' CPU as ceil(gpu_cpu_sum / gpu_bundles) produced bundles
like {CPU:2, GPU:1}. As a single Ray request_resources shape that fits no
standard 1-CPU/1-GPU node, so the autoscaler can't scale up — and the value is
recorded as the high-water mark, stalling further attempts.

Emit unit {CPU:1, GPU:1} bundles instead (a sub-GPU task's cpu and gpu are each
<= 1, so one always fits a standard GPU node), with the count covering both
dimensions: ceil(max(gpu_sum, gpu_cpu_sum)). Two 1-CPU/0.5-GPU tasks now request
two {CPU:1,GPU:1} shapes (2 CPU / 2 GPU) rather than one unschedulable {CPU:2}.
Assert the schedulable shape in the regression test.
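
The bundle count rule from this commit message, `ceil(max(gpu_sum, gpu_cpu_sum))`, can be sketched in Python. The function name and the `(num_cpus, num_gpus)` tuple input are assumptions for illustration, and the sketch covers only sub-GPU tasks whose CPU and GPU are each at most 1.

```python
import math


def pack_sub_gpu_tasks(tasks):
    """tasks: list of (num_cpus, num_gpus) pairs, each value in [0, 1]."""
    gpu_sum = sum(gpus for _, gpus in tasks)
    gpu_cpu_sum = sum(cpus for cpus, _ in tasks)
    # The count must cover whichever dimension needs more unit bundles.
    count = math.ceil(max(gpu_sum, gpu_cpu_sum))
    # Every bundle keeps the unit shape so it fits a standard GPU node.
    return [{"CPU": 1, "GPU": 1} for _ in range(count)]
```

Two tasks at 1 CPU / 0.5 GPU give `gpu_sum = 1.0` and `gpu_cpu_sum = 2.0`, so the result is two `{"CPU": 1, "GPU": 1}` bundles rather than a single `{"CPU": 2, "GPU": 1}`.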
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as ready for review June 21, 2026 05:58
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as draft June 21, 2026 06:03
aggregate_ray_bundles forced CPU >= 1 (via .max(1) on individual bundles and a
fixed CPU:1 on every GPU bundle), so a task that explicitly sets num_cpus=0
still requested a CPU — breaking "explicit num_cpus passes through unchanged"
and over-requesting CPU for GPU-only / memory-only workloads.

Drop the .max(1); give GPU bundles CPU only when the packed tasks actually need
it (gpu_cpu_sum > 0); and omit the CPU key from the Ray bundle dict when it is
zero. Add a test for num_cpus=0 GPU-only and memory-only tasks.
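
Dropping zero-valued keys when building the bundle dict might look like the sketch below. The helper name and the exact key names are assumptions; only the rule (omit a resource whose integer amount is zero) comes from the commit message.

```python
def to_ray_bundle(cpu: int, gpu: int, memory: int) -> dict:
    """Build a bundle dict, omitting any resource whose amount is zero."""
    candidates = {"CPU": cpu, "GPU": gpu, "memory": memory}
    return {name: amount for name, amount in candidates.items() if amount > 0}
```

A GPU-only task with an explicit `num_cpus=0` then produces `{"GPU": 1}` with no CPU entry at all.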
@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as ready for review June 21, 2026 06:49
@XiaoHongbo-Hope XiaoHongbo-Hope changed the title from "fix(flotilla): wire min_cpu_per_task into TaskResourceRequest" to "chore(config): deprecate min_cpu_per_task" Jun 24, 2026
@XiaoHongbo-Hope

Updated this PR based on the feedback in #7123.

Instead of wiring min_cpu_per_task into Flotilla scheduling, this now deprecates the Python argument and DAFT_MIN_CPU_PER_TASK env var while preserving the existing config behavior for compatibility.

I also updated the PR title/description to match the new scope and included the local verification results there.

@github-actions github-actions Bot added chore and removed fix labels Jun 24, 2026
Comment thread daft/context.py
"`min_cpu_per_task` is deprecated and has no effect on Flotilla scheduling. "
"It will be removed in the next minor version.",
DeprecationWarning,
stacklevel=2,

stacklevel=2 only surfaces this warning for direct set_execution_config(...) calls. When users call execution_config_ctx(min_cpu_per_task=...), the warning is attributed to daft/context.py instead of user code, so the default DeprecationWarning filter hides it. The new test catches the warning with pytest.warns, but misses this user-visible behavior. Could we warn from the outer context-manager path or adjust the stacklevel/helper so both public APIs point at the caller?
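
One way to address this, sketched under assumptions: route both public entry points through a shared helper that takes the stack level explicitly, so each warning is attributed to the caller's line. All names below are hypothetical, and `config_ctx_sketch` is a plain function rather than a real context manager (a `contextlib`-based one adds frames of its own, which would change the required level).

```python
import warnings

_MESSAGE = "`min_cpu_per_task` is deprecated and has no effect on Flotilla scheduling."


def _warn_if_deprecated(min_cpu_per_task, stacklevel):
    """Shared helper: the caller states how many frames to skip."""
    if min_cpu_per_task is not None:
        warnings.warn(_MESSAGE, DeprecationWarning, stacklevel=stacklevel)


def set_config_sketch(min_cpu_per_task=None):
    # Frames: _warn_if_deprecated (1) -> set_config_sketch (2) -> user code (3).
    _warn_if_deprecated(min_cpu_per_task, stacklevel=3)
    return {"min_cpu_per_task": min_cpu_per_task}


def config_ctx_sketch(min_cpu_per_task=None):
    # Warn here instead of delegating to set_config_sketch, so the frame
    # count to user code is the same as for the direct call.
    _warn_if_deprecated(min_cpu_per_task, stacklevel=3)
    return {"min_cpu_per_task": min_cpu_per_task}
```

A test can then assert on the recorded warning's `lineno` or `filename`, not only on the fact that a warning was raised, which is the gap the comment points out in a `pytest.warns`-only check.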
