From 1d05985e8aa69038fa4668bc63b50926b1f324ca Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 13 Jun 2026 09:20:32 -0400 Subject: [PATCH 01/21] fix(flotilla): wire min_cpu_per_task into TaskResourceRequest The min_cpu_per_task execution config field had no readers in the distributed scheduler: TaskResourceRequest::num_cpus() returned a hardcoded 1.0 when the plan's ResourceRequest had no num_cpus. This was wired up in #4506 for the legacy ray runner only; #5375 removed that runner along with its lone reader, and the flotilla scheduler was never wired in. Result: setting min_cpu_per_task via daft.set_execution_config or DAFT_MIN_CPU_PER_TASK had zero effect on autoscaler bundle requests. Plumb the value from DaftExecutionConfig (already in scope at SwordfishTaskBuilder::build) into TaskResourceRequest, and use it as the fallback in num_cpus() instead of the literal 1.0. Closes #7123 --- src/daft-distributed/src/scheduling/task.rs | 27 ++++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 1be22465e8e..4f56008f951 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -23,15 +23,23 @@ use crate::{ #[derive(Debug, Clone)] pub(crate) struct TaskResourceRequest { pub resource_request: ResourceRequest, + /// Floor applied when the plan's ResourceRequest does not specify num_cpus. + /// Sourced from DaftExecutionConfig::min_cpu_per_task at task construction time. + min_cpu_per_task: f64, } impl TaskResourceRequest { - pub fn new(resource_request: ResourceRequest) -> Self { - Self { resource_request } + pub fn new(resource_request: ResourceRequest, min_cpu_per_task: f64) -> Self { + Self { + resource_request, + min_cpu_per_task, + } } pub fn num_cpus(&self) -> f64 { - self.resource_request.num_cpus().unwrap_or(1.0) + self.resource_request + .num_cpus() + .unwrap_or(self.min_cpu_per_task) } pub fn num_gpus(&self) -> f64 { @@ -508,7 +516,8 @@ impl SwordfishTaskBuilder { context.insert("plan_fingerprint".to_string(), plan_fingerprint.to_string()); // Extract resource_request from plan - let resource_request = TaskResourceRequest::new(self.plan.resource_request()); + let resource_request = + TaskResourceRequest::new(self.plan.resource_request(), self.config.min_cpu_per_task); // Mark the root of the local plan so the worker's NodeInfo carries // `is_task_root` on the StatSnapshot it ships back. `is_task_leaf` is @@ -680,7 +689,10 @@ pub(super) mod tests { task_name: String::new(), priority: MockTaskPriority { priority: 0 }, scheduling_strategy: SchedulingStrategy::Spread, - resource_request: TaskResourceRequest::new(ResourceRequest::default()), + resource_request: TaskResourceRequest::new( + ResourceRequest::default(), + DaftExecutionConfig::default().min_cpu_per_task, + ), task_result: crate::pipeline_node::MaterializedOutput::new( vec![partition_ref], "".into(), @@ -699,7 +711,10 @@ pub(super) mod tests { } pub fn with_resource_request(mut self, resource_request: ResourceRequest) -> Self { - self.resource_request = TaskResourceRequest::new(resource_request); + self.resource_request = TaskResourceRequest::new( + resource_request, + DaftExecutionConfig::default().min_cpu_per_task, + ); self } From 034ec2bd7e34b4720cd8459417e42cc58af39882 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 13 Jun 2026 12:06:16 -0400 Subject: [PATCH 02/21] test: cover min_cpu_per_task wiring; clarify field doc - Comment said 'Floor' but the implementation is 'Default-when-None' (explicit num_cpus is honored as-is). Reword to match. - Add two unit tests: * num_cpus falls back to min_cpu_per_task when ResourceRequest is empty * explicit num_cpus passes through unchanged --- src/daft-distributed/src/scheduling/task.rs | 31 +++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 4f56008f951..4f327fa94fc 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -23,8 +23,9 @@ use crate::{ #[derive(Debug, Clone)] pub(crate) struct TaskResourceRequest { pub resource_request: ResourceRequest, - /// Floor applied when the plan's ResourceRequest does not specify num_cpus. - /// Sourced from DaftExecutionConfig::min_cpu_per_task at task construction time. + /// Default used by `num_cpus()` when the plan's ResourceRequest leaves + /// `num_cpus` unset. Sourced from `DaftExecutionConfig::min_cpu_per_task` + /// at task construction. Explicit `num_cpus` values pass through unchanged. min_cpu_per_task: f64, } @@ -599,6 +600,32 @@ pub(super) mod tests { use super::*; use crate::utils::channel::OneshotSender; + #[test] + fn num_cpus_uses_min_cpu_per_task_when_unset() { + // Plan has no explicit num_cpus -> fall back to the configured min. + let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); + assert_eq!(r.num_cpus(), 0.1); + + // Default config still produces the historical 0.5 floor when unset. + let r = TaskResourceRequest::new( + ResourceRequest::default(), + DaftExecutionConfig::default().min_cpu_per_task, + ); + assert_eq!(r.num_cpus(), 0.5); + } + + #[test] + fn num_cpus_respects_explicit_request() { + // Explicit num_cpus on the plan is honored even when below the config min. + let explicit = ResourceRequest::try_new_internal(Some(2.0), None, None).unwrap(); + let r = TaskResourceRequest::new(explicit, 0.1); + assert_eq!(r.num_cpus(), 2.0); + + let explicit = ResourceRequest::try_new_internal(Some(0.25), None, None).unwrap(); + let r = TaskResourceRequest::new(explicit, 0.5); + assert_eq!(r.num_cpus(), 0.25); + } + #[derive(Debug)] pub struct MockPartition { num_rows: usize, From a0f07e5256b152395c271cb913dd5e05bf7a0760 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 13 Jun 2026 21:33:55 -0400 Subject: [PATCH 03/21] fix(flotilla): keep min_cpu_per_task default at 1.0 (no-regression) Before this PR, the flotilla scheduler fell back to a hardcoded 1.0 CPU when ResourceRequest had no num_cpus; the configured min_cpu_per_task default of 0.5 was inert. Wiring the field through (1d05985e8) without changing the default would silently halve the per-task CPU floor for every existing user, which Greptile's review flagged as a behaviour change risk on capacity-tuned clusters (e.g. KubeRay packs 2x more tasks before scale-out, increasing OOM risk). Move the default to 1.0 so the no-explicit-num_cpus path matches the pre-wiring behaviour exactly. Users who want a smaller floor set it explicitly via daft.set_execution_config(min_cpu_per_task=...) or DAFT_MIN_CPU_PER_TASK, which is the original purpose of the knob. --- daft/context.py | 2 +- src/common/daft-config/src/lib.rs | 6 +++--- src/daft-distributed/src/scheduling/task.rs | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/daft/context.py b/daft/context.py index 8d53a8e4a11..f93e7fd9197 100644 --- a/daft/context.py +++ b/daft/context.py @@ -285,7 +285,7 @@ def set_execution_config( pre_shuffle_merge_partition_threshold: Number of partitions threshold to enable pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200. scantask_max_parallel: Set the max parallelism for running scan tasks simultaneously. Currently, this only works for Native Runner. If set to 0, all available CPUs will be used. Defaults to 8. native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`. - min_cpu_per_task: Minimum CPU per task in the Ray runner. Defaults to 0.5. + min_cpu_per_task: Minimum CPU per task in the Ray runner. Defaults to 1.0. actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 120 seconds. maintain_order: Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet) won't respect this flag and will always keep maintain_order as false, and propagate to child operators. It's useful to set this to False for running df.collect() when no ordering is required. enable_dynamic_batching: Whether to enable dynamic batching. Defaults to False. diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 53c61bf79b5..7ea90ff4843 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -192,7 +192,7 @@ impl Default for DaftExecutionConfig { pre_shuffle_merge_partition_threshold: 200, scantask_max_parallel: 8, native_parquet_writer: true, - min_cpu_per_task: 0.5, + min_cpu_per_task: 1.0, actor_udf_ready_timeout: 120, maintain_order: true, enable_dynamic_batching: false, @@ -513,7 +513,7 @@ mod tests { // ENV_DAFT_MIN_CPU_PER_TASK { let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 0.5); + assert_eq!(cfg.min_cpu_per_task, 1.0); unsafe { std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "0.1"); @@ -525,7 +525,7 @@ mod tests { std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "invalid"); } let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 0.5); + assert_eq!(cfg.min_cpu_per_task, 1.0); unsafe { std::env::remove_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK); diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 4f327fa94fc..f20d4378634 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -606,12 +606,13 @@ pub(super) mod tests { let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); assert_eq!(r.num_cpus(), 0.1); - // Default config still produces the historical 0.5 floor when unset. + // The default config preserves the pre-fix hardcoded fallback of 1.0, + // so no plan without an explicit num_cpus changes behavior. let r = TaskResourceRequest::new( ResourceRequest::default(), DaftExecutionConfig::default().min_cpu_per_task, ); - assert_eq!(r.num_cpus(), 0.5); + assert_eq!(r.num_cpus(), 1.0); } #[test] From 2b7ce3138aa5241b9e32b54bc5aa511987203a23 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 15 Jun 2026 00:14:30 -0400 Subject: [PATCH 04/21] test(flotilla): reuse builder's min_cpu_per_task in with_resource_request MockTaskBuilder.with_resource_request was constructing a fresh DaftExecutionConfig::default() to fetch the min_cpu_per_task fallback, which silently reset any non-default value already set on the builder. Reuse self.resource_request.min_cpu_per_task instead so the fallback threads through chained .with_* calls correctly. Test-helper-only; no production behavior change. Addresses greptile P2 review comment. --- src/daft-distributed/src/scheduling/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index f20d4378634..0e86169857b 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -741,7 +741,7 @@ pub(super) mod tests { pub fn with_resource_request(mut self, resource_request: ResourceRequest) -> Self { self.resource_request = TaskResourceRequest::new( resource_request, - DaftExecutionConfig::default().min_cpu_per_task, + self.resource_request.min_cpu_per_task, ); self } From ab1e67abc5777563b68701c8884740a580186b99 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Tue, 16 Jun 2026 23:42:43 -0700 Subject: [PATCH 05/21] fix(flotilla): apply min_cpu_per_task as floor and reject non-positive values --- daft/context.py | 2 +- src/common/daft-config/src/lib.rs | 16 ++++++++++++- src/common/daft-config/src/python.rs | 5 +++++ src/daft-distributed/src/scheduling/task.rs | 25 +++++++++------------ 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/daft/context.py b/daft/context.py index f93e7fd9197..ee884cf74b7 100644 --- a/daft/context.py +++ b/daft/context.py @@ -285,7 +285,7 @@ def set_execution_config( pre_shuffle_merge_partition_threshold: Number of partitions threshold to enable pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200. scantask_max_parallel: Set the max parallelism for running scan tasks simultaneously. Currently, this only works for Native Runner. If set to 0, all available CPUs will be used. Defaults to 8. native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`. - min_cpu_per_task: Minimum CPU per task in the Ray runner. Defaults to 1.0. + min_cpu_per_task: Floor on CPU allocation per task; plans with smaller explicit num_cpus are raised to this. Used by the flotilla scheduler for autoscaler bundle requests. Must be > 0. Defaults to 1.0. actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 120 seconds. maintain_order: Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet) won't respect this flag and will always keep maintain_order as false, and propagate to child operators. It's useful to set this to False for running df.collect() when no ordering is required. enable_dynamic_batching: Whether to enable dynamic batching. Defaults to False. diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 7ea90ff4843..d4a8977c77a 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -241,7 +241,9 @@ impl DaftExecutionConfig { if let Some(val) = parse_number_from_env(Self::ENV_DAFT_MIN_CPU_PER_TASK, cfg.min_cpu_per_task) { - cfg.min_cpu_per_task = val; + if val > 0.0 { + cfg.min_cpu_per_task = val; + } } if let Some(val) = parse_number_from_env( @@ -527,6 +529,18 @@ mod tests { let cfg = DaftExecutionConfig::from_env(); assert_eq!(cfg.min_cpu_per_task, 1.0); + unsafe { + std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "0"); + } + let cfg = DaftExecutionConfig::from_env(); + assert_eq!(cfg.min_cpu_per_task, 1.0); + + unsafe { + std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "-0.5"); + } + let cfg = DaftExecutionConfig::from_env(); + assert_eq!(cfg.min_cpu_per_task, 1.0); + unsafe { std::env::remove_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK); } diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index 3f4218e69f5..661eb244112 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -269,6 +269,11 @@ impl PyDaftExecutionConfig { } if let Some(min_cpu_per_task) = min_cpu_per_task { + if min_cpu_per_task <= 0.0 { + return Err(PyErr::new::(format!( + "min_cpu_per_task must be > 0, got {min_cpu_per_task}" + ))); + } config.min_cpu_per_task = min_cpu_per_task; } diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 0e86169857b..49a6a082e5a 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -23,9 +23,7 @@ use crate::{ #[derive(Debug, Clone)] pub(crate) struct TaskResourceRequest { pub resource_request: ResourceRequest, - /// Default used by `num_cpus()` when the plan's ResourceRequest leaves - /// `num_cpus` unset. Sourced from `DaftExecutionConfig::min_cpu_per_task` - /// at task construction. Explicit `num_cpus` values pass through unchanged. + /// Floor applied by `num_cpus()`. Sourced from `DaftExecutionConfig::min_cpu_per_task`. min_cpu_per_task: f64, } @@ -40,7 +38,7 @@ impl TaskResourceRequest { pub fn num_cpus(&self) -> f64 { self.resource_request .num_cpus() - .unwrap_or(self.min_cpu_per_task) + .map_or(self.min_cpu_per_task, |v| v.max(self.min_cpu_per_task)) } pub fn num_gpus(&self) -> f64 { @@ -602,12 +600,9 @@ pub(super) mod tests { #[test] fn num_cpus_uses_min_cpu_per_task_when_unset() { - // Plan has no explicit num_cpus -> fall back to the configured min. let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); assert_eq!(r.num_cpus(), 0.1); - // The default config preserves the pre-fix hardcoded fallback of 1.0, - // so no plan without an explicit num_cpus changes behavior. let r = TaskResourceRequest::new( ResourceRequest::default(), DaftExecutionConfig::default().min_cpu_per_task, @@ -616,15 +611,17 @@ pub(super) mod tests { } #[test] - fn num_cpus_respects_explicit_request() { - // Explicit num_cpus on the plan is honored even when below the config min. + fn num_cpus_passes_through_when_above_floor() { let explicit = ResourceRequest::try_new_internal(Some(2.0), None, None).unwrap(); - let r = TaskResourceRequest::new(explicit, 0.1); + let r = TaskResourceRequest::new(explicit, 0.5); assert_eq!(r.num_cpus(), 2.0); + } + #[test] + fn num_cpus_raises_explicit_to_floor() { let explicit = ResourceRequest::try_new_internal(Some(0.25), None, None).unwrap(); let r = TaskResourceRequest::new(explicit, 0.5); - assert_eq!(r.num_cpus(), 0.25); + assert_eq!(r.num_cpus(), 0.5); } #[derive(Debug)] @@ -739,10 +736,8 @@ pub(super) mod tests { } pub fn with_resource_request(mut self, resource_request: ResourceRequest) -> Self { - self.resource_request = TaskResourceRequest::new( - resource_request, - self.resource_request.min_cpu_per_task, - ); + self.resource_request = + TaskResourceRequest::new(resource_request, self.resource_request.min_cpu_per_task); self } From 48a4d2aa8f28ac82c8aa4eaaf8f96c5e764cae11 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 17 Jun 2026 00:05:14 -0700 Subject: [PATCH 06/21] fix(clippy): collapse let-chain in min_cpu_per_task env guard --- src/common/daft-config/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index d4a8977c77a..0dcee1d823f 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -240,10 +240,9 @@ impl DaftExecutionConfig { if let Some(val) = parse_number_from_env(Self::ENV_DAFT_MIN_CPU_PER_TASK, cfg.min_cpu_per_task) + && val > 0.0 { - if val > 0.0 { - cfg.min_cpu_per_task = val; - } + cfg.min_cpu_per_task = val; } if let Some(val) = parse_number_from_env( From c36c458ee53ff68ed0e65097913cb10b8cb1df58 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 19 Jun 2026 09:21:44 -0700 Subject: [PATCH 07/21] fix(plan): leave num_cpus unset so min_cpu_per_task can lower task CPU --- src/daft-distributed/src/scheduling/task.rs | 11 ++++------- src/daft-local-plan/src/plan.rs | 10 +++++++++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 49a6a082e5a..4185d2ecc3d 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -23,7 +23,7 @@ use crate::{ #[derive(Debug, Clone)] pub(crate) struct TaskResourceRequest { pub resource_request: ResourceRequest, - /// Floor applied by `num_cpus()`. Sourced from `DaftExecutionConfig::min_cpu_per_task`. + /// Fallback for `num_cpus()` when the plan leaves it unset. min_cpu_per_task: f64, } @@ -38,7 +38,7 @@ impl TaskResourceRequest { pub fn num_cpus(&self) -> f64 { self.resource_request .num_cpus() - .map_or(self.min_cpu_per_task, |v| v.max(self.min_cpu_per_task)) + .unwrap_or(self.min_cpu_per_task) } pub fn num_gpus(&self) -> f64 { @@ -611,17 +611,14 @@ pub(super) mod tests { } #[test] - fn num_cpus_passes_through_when_above_floor() { + fn num_cpus_honors_explicit_request() { let explicit = ResourceRequest::try_new_internal(Some(2.0), None, None).unwrap(); let r = TaskResourceRequest::new(explicit, 0.5); assert_eq!(r.num_cpus(), 2.0); - } - #[test] - fn num_cpus_raises_explicit_to_floor() { let explicit = ResourceRequest::try_new_internal(Some(0.25), None, None).unwrap(); let r = TaskResourceRequest::new(explicit, 0.5); - assert_eq!(r.num_cpus(), 0.5); + assert_eq!(r.num_cpus(), 0.25); } #[derive(Debug)] diff --git a/src/daft-local-plan/src/plan.rs b/src/daft-local-plan/src/plan.rs index d3ca66b2180..10415fe1ae1 100644 --- a/src/daft-local-plan/src/plan.rs +++ b/src/daft-local-plan/src/plan.rs @@ -1231,7 +1231,8 @@ impl LocalPhysicalPlan { } pub fn resource_request(self: &Arc) -> ResourceRequest { - let mut base = ResourceRequest::default_cpu(); + // Leave num_cpus unset so task layer can fall back to min_cpu_per_task. + let mut base = ResourceRequest::default(); self.apply(|plan| match plan.as_ref() { Self::UDFProject(UDFProject { expr, @@ -2511,6 +2512,13 @@ mod task_topology_tests { ) } + #[test] + fn resource_request_leaves_num_cpus_unset_for_plain_plans() { + let plan = limit(scan(), 5); + let rr = plan.resource_request(); + assert!(rr.num_cpus().is_none()); + } + /// Constructors set `is_task_leaf` based on the node's children — leaves /// (no children) get `true`, others get `false` — regardless of how the /// caller-supplied `LocalNodeContext` was initialised. From cd6ea1fa3850e6dd4384b5f2e83aa885a3e1eaa7 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 00:32:18 -0700 Subject: [PATCH 08/21] docs: align min_cpu_per_task docstring with fallback (not floor) semantics --- daft/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daft/context.py b/daft/context.py index ee884cf74b7..c801cdd12b9 100644 --- a/daft/context.py +++ b/daft/context.py @@ -285,7 +285,7 @@ def set_execution_config( pre_shuffle_merge_partition_threshold: Number of partitions threshold to enable pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200. scantask_max_parallel: Set the max parallelism for running scan tasks simultaneously. Currently, this only works for Native Runner. If set to 0, all available CPUs will be used. Defaults to 8. native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`. - min_cpu_per_task: Floor on CPU allocation per task; plans with smaller explicit num_cpus are raised to this. Used by the flotilla scheduler for autoscaler bundle requests. Must be > 0. Defaults to 1.0. + min_cpu_per_task: Default CPU per task when a plan does not specify num_cpus. Explicit num_cpus on a plan passes through unchanged. Used by the flotilla scheduler for autoscaler bundle requests. Must be > 0. Defaults to 1.0. actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 120 seconds. maintain_order: Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet) won't respect this flag and will always keep maintain_order as false, and propagate to child operators. It's useful to set this to False for running df.collect() when no ordering is required. enable_dynamic_batching: Whether to enable dynamic batching. Defaults to False. From 710c932669ea5aaff2aa961931e025b5681f7652 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 00:45:19 -0700 Subject: [PATCH 09/21] fix(autoscale): use TaskResourceRequest wrappers so min_cpu_per_task fallback applies --- src/daft-distributed/src/python/ray/worker_manager.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index 4ac2c616ef7..68d030c65e4 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -272,9 +272,10 @@ impl WorkerManager for RayWorkerManager { let mut surpassed = false; let mut selected_bundles = Vec::new(); for bundle in &bundles { - cpu_sum += bundle.resource_request.num_cpus().unwrap_or(0.0); - gpu_sum += bundle.resource_request.num_gpus().unwrap_or(0.0); - memory_sum += bundle.resource_request.memory_bytes().unwrap_or(0); + // Use wrapper methods so the min_cpu_per_task fallback applies. + cpu_sum += bundle.num_cpus(); + gpu_sum += bundle.num_gpus(); + memory_sum += bundle.memory_bytes(); selected_bundles.push(bundle); if cpu_sum > high_water_mark_cpus || gpu_sum > high_water_mark_gpus From f0ee20cfaa76f9528a2ff006f22cb67d8505d2f0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 07:33:44 -0700 Subject: [PATCH 10/21] fix(flotilla): send fractional CPU to Ray autoscaler and reject non-finite min_cpu_per_task Two follow-ups from review of the min_cpu_per_task wiring: - The Ray autoscaler bundle still rounded CPU up via `num_cpus().ceil()`, so a configured `min_cpu_per_task=0.1` reached `request_resources` as `{"CPU": 1}` (issue #7123). Add `TaskResourceRequest::autoscale_bundle` which keeps CPU fractional (GPU/memory stay integer, zero values dropped) and build the Ray bundle as a `PyDict` with a float CPU. Update the `try_autoscale` type hint accordingly. - `min_cpu_per_task` validation let NaN/inf through (`nan <= 0` and `inf <= 0` are both false). Use `is_finite() && > 0` on both the Python setter and the env path. Add Rust unit tests for the fractional bundle and env validation, and a Python test covering acceptance of fractional values and rejection of 0/-0.5/nan/inf. --- daft/runners/flotilla.py | 2 +- src/common/daft-config/src/lib.rs | 24 +++++++++- src/common/daft-config/src/python.rs | 4 +- .../src/python/ray/worker_manager.rs | 35 ++++++++------- src/daft-distributed/src/scheduling/task.rs | 44 +++++++++++++++++++ tests/test_context.py | 11 +++++ 6 files changed, 98 insertions(+), 22 deletions(-) diff --git a/daft/runners/flotilla.py b/daft/runners/flotilla.py index 05e75a312b4..30f568469f8 100644 --- a/daft/runners/flotilla.py +++ b/daft/runners/flotilla.py @@ -566,7 +566,7 @@ def start_ray_workers( return handles -def try_autoscale(bundles: list[dict[str, int]]) -> None: +def try_autoscale(bundles: list[dict[str, float]]) -> None: from ray.autoscaler.sdk import request_resources request_resources( diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 0dcee1d823f..28fdc04fb08 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -240,9 +240,17 @@ impl DaftExecutionConfig { if let Some(val) = parse_number_from_env(Self::ENV_DAFT_MIN_CPU_PER_TASK, cfg.min_cpu_per_task) - && val > 0.0 { - cfg.min_cpu_per_task = val; + if val.is_finite() && val > 0.0 { + cfg.min_cpu_per_task = val; + } else { + eprintln!( + "Invalid {} value: {}, must be a finite number > 0, using default {}", + Self::ENV_DAFT_MIN_CPU_PER_TASK, + val, + cfg.min_cpu_per_task + ); + } } if let Some(val) = parse_number_from_env( @@ -540,6 +548,18 @@ mod tests { let cfg = DaftExecutionConfig::from_env(); assert_eq!(cfg.min_cpu_per_task, 1.0); + unsafe { + std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "nan"); + } + let cfg = DaftExecutionConfig::from_env(); + assert_eq!(cfg.min_cpu_per_task, 1.0); + + unsafe { + std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "inf"); + } + let cfg = DaftExecutionConfig::from_env(); + assert_eq!(cfg.min_cpu_per_task, 1.0); + unsafe { std::env::remove_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK); } diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index 661eb244112..5af6c04a2db 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -269,9 +269,9 @@ impl PyDaftExecutionConfig { } if let Some(min_cpu_per_task) = min_cpu_per_task { - if min_cpu_per_task <= 0.0 { + if !min_cpu_per_task.is_finite() || min_cpu_per_task <= 0.0 { return Err(PyErr::new::(format!( - "min_cpu_per_task must be > 0, got {min_cpu_per_task}" + "min_cpu_per_task must be a finite number > 0, got {min_cpu_per_task}" ))); } config.min_cpu_per_task = min_cpu_per_task; diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index 68d030c65e4..323f2ff2ba1 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -296,24 +296,25 @@ impl WorkerManager for RayWorkerManager { // 5. Send the selected bundles to Ray's autoscaler via request_resources(). // Strip zero-valued GPU/memory keys so Ray doesn't interpret them as a demand // for zero-resource bundles on specialized nodes. - let python_bundles = selected_bundles - .iter() - .map(|bundle| { - let mut dict = HashMap::new(); - dict.insert("CPU", bundle.num_cpus().ceil() as i64); - let gpu = bundle.num_gpus().ceil() as i64; - if gpu > 0 { - dict.insert("GPU", gpu); - } - let memory = bundle.memory_bytes() as i64; - if memory > 0 { - dict.insert("memory", memory); - } - dict - }) - .collect::>(); - Python::attach(|py| -> DaftResult<()> { + let python_bundles = selected_bundles + .iter() + .map(|bundle| { + let b = bundle.autoscale_bundle(); + let dict = pyo3::types::PyDict::new(py); + // Keep CPU fractional — Ray supports fractional CPU and + // rounding up would defeat min_cpu_per_task (issue #7123). + dict.set_item("CPU", b.cpu)?; + if let Some(gpu) = b.gpu { + dict.set_item("GPU", gpu)?; + } + if let Some(memory) = b.memory { + dict.set_item("memory", memory)?; + } + Ok::<_, PyErr>(dict) + }) + .collect::, _>>()?; + let flotilla_module = py.import(pyo3::intern!(py, "daft.runners.flotilla"))?; flotilla_module.call_method1(pyo3::intern!(py, "try_autoscale"), (python_bundles,))?; Ok(()) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 4185d2ecc3d..8c1eac9c12b 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -48,6 +48,32 @@ impl TaskResourceRequest { pub fn memory_bytes(&self) -> usize { self.resource_request.memory_bytes().unwrap_or(0) } + + /// Resource amounts for one Ray autoscaler bundle. + /// + /// CPU is kept fractional: Ray supports fractional CPU requests, so a + /// configured `min_cpu_per_task` of e.g. 0.1 must reach the autoscaler as + /// 0.1 rather than being rounded up to 1 (see issue #7123). GPU and memory + /// are integers, and zero values are dropped so Ray does not interpret a + /// bundle as a demand for zero-resource nodes. + pub fn autoscale_bundle(&self) -> AutoscaleBundle { + let gpu = self.num_gpus().ceil() as i64; + let memory = self.memory_bytes() as i64; + AutoscaleBundle { + cpu: self.num_cpus(), + gpu: (gpu > 0).then_some(gpu), + memory: (memory > 0).then_some(memory), + } + } +} + +/// Resource amounts for a single Ray autoscaler bundle. See +/// [`TaskResourceRequest::autoscale_bundle`]. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct AutoscaleBundle { + pub cpu: f64, + pub gpu: Option, + pub memory: Option, } pub(crate) type TaskID = u32; @@ -621,6 +647,24 @@ pub(super) mod tests { assert_eq!(r.num_cpus(), 0.25); } + #[test] + fn autoscale_bundle_keeps_fractional_cpu() { + // min_cpu_per_task=0.1 must reach the autoscaler as 0.1, not ceil'd to 1 (issue #7123). + let b = TaskResourceRequest::new(ResourceRequest::default(), 0.1).autoscale_bundle(); + assert_eq!(b.cpu, 0.1); + assert_eq!(b.gpu, None); + assert_eq!(b.memory, None); + } + + #[test] + fn autoscale_bundle_rounds_gpu_and_keeps_memory() { + let rr = ResourceRequest::try_new_internal(Some(0.1), Some(1.0), Some(1024)).unwrap(); + let b = TaskResourceRequest::new(rr, 1.0).autoscale_bundle(); + assert_eq!(b.cpu, 0.1); + assert_eq!(b.gpu, Some(1)); + assert_eq!(b.memory, Some(1024)); + } + #[derive(Debug)] pub struct MockPartition { num_rows: usize, diff --git a/tests/test_context.py b/tests/test_context.py index 971f017e6dd..2affde04a2f 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -375,3 +375,14 @@ def test_set_scantask_max_parallelism_greater_than_partition_num(): df = daft.range(start=0, end=1024, partitions=10) df.explain(show_all=True, file=str_io) assert "Num Parallel Scan Tasks = 17" in str_io.getvalue().strip() + + +def test_min_cpu_per_task_accepts_fractional(): + with daft.execution_config_ctx(min_cpu_per_task=0.1): + assert daft.context.get_context().daft_execution_config.min_cpu_per_task == 0.1 + + +@pytest.mark.parametrize("value", [0.0, -0.5, float("nan"), float("inf")]) +def test_min_cpu_per_task_rejects_invalid(value): + with pytest.raises(ValueError, match="min_cpu_per_task"): + daft.set_execution_config(min_cpu_per_task=value) From ad2f52879d4fa1c4b2112a13fbf09bf1b805c376 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 08:03:13 -0700 Subject: [PATCH 11/21] fix(flotilla): aggregate fractional CPU into integer Ray autoscaler bundles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ray's request_resources (<= 2.55; Daft pins ray==2.55.1) rejects non-integer bundle values: `isinstance(bundle[key], int)` raises `TypeError: each bundle key should be str and value as int.`. Sending a float CPU therefore crashes at runtime — even a whole 1.0, since pyo3 emits a Python float. Replace the per-task float bundle with `aggregate_ray_bundles`: CPU-only tasks have their fractional CPU summed and emitted as ceil(sum) unit {"CPU": 1} bundles, so N tasks at 0.1 CPU request ceil(0.1*N) CPUs instead of N (issue #7123), while never sending a non-integer value. Tasks carrying GPU or memory keep an individual bundle (CPU rounded up) since those resources pin placement to a node. Revert the try_autoscale type hint to dict[str, int]. Replace the fractional bundle unit tests with aggregation tests. --- daft/runners/flotilla.py | 2 +- .../src/python/ray/worker_manager.rs | 23 ++-- src/daft-distributed/src/scheduling/task.rs | 122 +++++++++++++----- 3 files changed, 103 insertions(+), 44 deletions(-) diff --git a/daft/runners/flotilla.py b/daft/runners/flotilla.py index 30f568469f8..05e75a312b4 100644 --- a/daft/runners/flotilla.py +++ b/daft/runners/flotilla.py @@ -566,7 +566,7 @@ def start_ray_workers( return handles -def try_autoscale(bundles: list[dict[str, float]]) -> None: +def try_autoscale(bundles: list[dict[str, int]]) -> None: from ray.autoscaler.sdk import request_resources request_resources( diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index 323f2ff2ba1..c2f106852a5 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -11,7 +11,7 @@ use pyo3::prelude::*; use super::{task::RayTaskResultHandle, worker::RaySwordfishWorker}; use crate::scheduling::{ scheduler::WorkerSnapshot, - task::{SwordfishTask, TaskContext, TaskResourceRequest}, + task::{SwordfishTask, TaskContext, TaskResourceRequest, aggregate_ray_bundles}, worker::{Worker, WorkerId, WorkerManager}, }; @@ -294,21 +294,24 @@ impl WorkerManager for RayWorkerManager { } // 5. Send the selected bundles to Ray's autoscaler via request_resources(). - // Strip zero-valued GPU/memory keys so Ray doesn't interpret them as a demand - // for zero-resource bundles on specialized nodes. + // Ray (as of 2.55) rejects non-integer bundle values, so a fractional + // min_cpu_per_task cannot be sent per task directly. aggregate_ray_bundles + // sums CPU-only fractional demand into ceil(sum) unit {CPU:1} bundles so + // the autoscaler is not asked for one full CPU per task (issue #7123), + // while GPU/memory tasks keep their own (placement-pinning) bundle. + // Zero-valued GPU/memory keys are omitted so Ray doesn't interpret them as + // a demand for zero-resource bundles on specialized nodes. + let ray_bundles = aggregate_ray_bundles(&selected_bundles); Python::attach(|py| -> DaftResult<()> { - let python_bundles = selected_bundles + let python_bundles = ray_bundles .iter() .map(|bundle| { - let b = bundle.autoscale_bundle(); let dict = pyo3::types::PyDict::new(py); - // Keep CPU fractional — Ray supports fractional CPU and - // rounding up would defeat min_cpu_per_task (issue #7123). - dict.set_item("CPU", b.cpu)?; - if let Some(gpu) = b.gpu { + dict.set_item("CPU", bundle.cpu)?; + if let Some(gpu) = bundle.gpu { dict.set_item("GPU", gpu)?; } - if let Some(memory) = b.memory { + if let Some(memory) = bundle.memory { dict.set_item("memory", memory)?; } Ok::<_, PyErr>(dict) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 8c1eac9c12b..57c8d0f2b2b 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -48,34 +48,54 @@ impl TaskResourceRequest { pub fn memory_bytes(&self) -> usize { self.resource_request.memory_bytes().unwrap_or(0) } - - /// Resource amounts for one Ray autoscaler bundle. - /// - /// CPU is kept fractional: Ray supports fractional CPU requests, so a - /// configured `min_cpu_per_task` of e.g. 0.1 must reach the autoscaler as - /// 0.1 rather than being rounded up to 1 (see issue #7123). GPU and memory - /// are integers, and zero values are dropped so Ray does not interpret a - /// bundle as a demand for zero-resource nodes. - pub fn autoscale_bundle(&self) -> AutoscaleBundle { - let gpu = self.num_gpus().ceil() as i64; - let memory = self.memory_bytes() as i64; - AutoscaleBundle { - cpu: self.num_cpus(), - gpu: (gpu > 0).then_some(gpu), - memory: (memory > 0).then_some(memory), - } - } } -/// Resource amounts for a single Ray autoscaler bundle. See -/// [`TaskResourceRequest::autoscale_bundle`]. +/// An integer-valued resource bundle for Ray's autoscaler `request_resources`, +/// which (as of Ray 2.55) rejects non-integer bundle values. #[derive(Debug, Clone, PartialEq)] -pub(crate) struct AutoscaleBundle { - pub cpu: f64, +pub(crate) struct RayBundle { + pub cpu: i64, pub gpu: Option, pub memory: Option, } +/// Aggregate per-task resource requests into integer Ray autoscaler bundles. +/// +/// Ray's `request_resources` requires integer bundle values, so a fractional +/// `min_cpu_per_task` (e.g. 0.1) cannot be expressed per task directly. CPU-only +/// tasks therefore have their fractional CPU summed and emitted as `ceil(sum)` +/// unit `{"CPU": 1}` bundles — so N tasks at 0.1 CPU request `ceil(0.1 * N)` CPUs +/// rather than N (issue #7123). Tasks carrying GPU or memory keep an individual +/// bundle (CPU rounded up) because those resources pin placement and must stay +/// co-located on a single node. +pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec { + let mut cpu_only_sum = 0.0; + let mut bundles = Vec::new(); + for request in requests { + let gpu = request.num_gpus().ceil() as i64; + let memory = request.memory_bytes() as i64; + let gpu = (gpu > 0).then_some(gpu); + let memory = (memory > 0).then_some(memory); + if gpu.is_some() || memory.is_some() { + bundles.push(RayBundle { + cpu: request.num_cpus().ceil() as i64, + gpu, + memory, + }); + } else { + cpu_only_sum += request.num_cpus(); + } + } + for _ in 0..(cpu_only_sum.ceil() as i64) { + bundles.push(RayBundle { + cpu: 1, + gpu: None, + memory: None, + }); + } + bundles +} + pub(crate) type TaskID = u32; pub(crate) type TaskName = String; @@ -648,21 +668,57 @@ pub(super) mod tests { } #[test] - fn autoscale_bundle_keeps_fractional_cpu() { - // min_cpu_per_task=0.1 must reach the autoscaler as 0.1, not ceil'd to 1 (issue #7123). - let b = TaskResourceRequest::new(ResourceRequest::default(), 0.1).autoscale_bundle(); - assert_eq!(b.cpu, 0.1); - assert_eq!(b.gpu, None); - assert_eq!(b.memory, None); + fn aggregate_ray_bundles_packs_fractional_cpu() { + // 4 CPU-only tasks at 0.25 -> one {CPU:1} bundle, not 4 (issue #7123). + let r = TaskResourceRequest::new(ResourceRequest::default(), 0.25); + let refs = vec![&r, &r, &r, &r]; + assert_eq!( + aggregate_ray_bundles(&refs), + vec![RayBundle { + cpu: 1, + gpu: None, + memory: None + }] + ); + } + + #[test] + fn aggregate_ray_bundles_rounds_partial_cpu_up() { + // 0.1 * 5 = 0.5 -> ceil -> 1 bundle. + let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); + let refs = vec![&r, &r, &r, &r, &r]; + assert_eq!(aggregate_ray_bundles(&refs).len(), 1); } #[test] - fn autoscale_bundle_rounds_gpu_and_keeps_memory() { - let rr = ResourceRequest::try_new_internal(Some(0.1), Some(1.0), Some(1024)).unwrap(); - let b = TaskResourceRequest::new(rr, 1.0).autoscale_bundle(); - assert_eq!(b.cpu, 0.1); - assert_eq!(b.gpu, Some(1)); - assert_eq!(b.memory, Some(1024)); + fn aggregate_ray_bundles_keeps_gpu_and_memory_bundles() { + let cpu_only = TaskResourceRequest::new(ResourceRequest::default(), 0.5); + let gpu = TaskResourceRequest::new( + ResourceRequest::try_new_internal(Some(0.1), Some(1.0), None).unwrap(), + 1.0, + ); + let mem = TaskResourceRequest::new( + ResourceRequest::try_new_internal(Some(0.1), None, Some(1024)).unwrap(), + 1.0, + ); + let refs = vec![&cpu_only, &gpu, &mem]; + let bundles = aggregate_ray_bundles(&refs); + assert_eq!(bundles.len(), 3); + assert!(bundles.contains(&RayBundle { + cpu: 1, + gpu: Some(1), + memory: None + })); + assert!(bundles.contains(&RayBundle { + cpu: 1, + gpu: None, + memory: Some(1024) + })); + assert!(bundles.contains(&RayBundle { + cpu: 1, + gpu: None, + memory: None + })); } #[derive(Debug)] From 4620acb07c25386699946aa5ba91aab0adae68fa Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 09:02:10 -0700 Subject: [PATCH 12/21] fix(flotilla): keep multi-CPU tasks as individual autoscaler bundles aggregate_ray_bundles packed every CPU-only task into unit {"CPU": 1} bundles. That is wrong for a task requesting num_cpus >= 1: a 4-CPU task runs on one worker, so splitting it into 4 spread bundles lets the autoscaler provision 4 single-CPU nodes and leaves the task unschedulable. It also turned CPU magnitude into the loop count, so a huge or non-finite explicit num_cpus (inf as i64 == i64::MAX) could hang/OOM, and a NaN poisoned the running sum and zeroed the batch's CPU request. Only pack sub-1.0 CPU-only tasks now; tasks with GPU, memory, or num_cpus >= 1 keep an individual bundle (CPU rounded up to at least 1). Non-finite / non-positive CPU contributes nothing. The packed sum is now bounded by task count, so the loop can no longer blow up. --- src/daft-distributed/src/scheduling/task.rs | 64 +++++++++++++++++---- 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 57c8d0f2b2b..b028063ed9b 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -62,31 +62,38 @@ pub(crate) struct RayBundle { /// Aggregate per-task resource requests into integer Ray autoscaler bundles. /// /// Ray's `request_resources` requires integer bundle values, so a fractional -/// `min_cpu_per_task` (e.g. 0.1) cannot be expressed per task directly. CPU-only -/// tasks therefore have their fractional CPU summed and emitted as `ceil(sum)` -/// unit `{"CPU": 1}` bundles — so N tasks at 0.1 CPU request `ceil(0.1 * N)` CPUs -/// rather than N (issue #7123). Tasks carrying GPU or memory keep an individual -/// bundle (CPU rounded up) because those resources pin placement and must stay -/// co-located on a single node. +/// `min_cpu_per_task` (e.g. 0.1) cannot be expressed per task directly. Only +/// sub-CPU, CPU-only tasks are packed: their fractional CPU is summed and emitted +/// as `ceil(sum)` unit `{"CPU": 1}` bundles — so N tasks at 0.1 CPU request +/// `ceil(0.1 * N)` CPUs rather than N (issue #7123). +/// +/// A task keeps its own individual bundle (CPU rounded up to at least 1) when it +/// requests GPU, memory, or `num_cpus >= 1`: those resources pin the task to a +/// single node, so splitting them across unit bundles could make the task +/// unschedulable. Non-finite / non-positive CPU values contribute nothing (the +/// config is validated upstream, but an explicit plan request is not). pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec { - let mut cpu_only_sum = 0.0; + let mut fractional_cpu_sum = 0.0; let mut bundles = Vec::new(); for request in requests { let gpu = request.num_gpus().ceil() as i64; let memory = request.memory_bytes() as i64; let gpu = (gpu > 0).then_some(gpu); let memory = (memory > 0).then_some(memory); - if gpu.is_some() || memory.is_some() { + let cpus = request.num_cpus(); + if gpu.is_some() || memory.is_some() || cpus > 1.0 { bundles.push(RayBundle { - cpu: request.num_cpus().ceil() as i64, + cpu: (cpus.ceil() as i64).max(1), gpu, memory, }); - } else { - cpu_only_sum += request.num_cpus(); + } else if cpus > 0.0 { + // `cpus > 0.0` is false for NaN, so a poisoned value is dropped + // rather than zeroing the whole batch's CPU request. + fractional_cpu_sum += cpus; } } - for _ in 0..(cpu_only_sum.ceil() as i64) { + for _ in 0..(fractional_cpu_sum.ceil() as i64) { bundles.push(RayBundle { cpu: 1, gpu: None, @@ -690,6 +697,39 @@ pub(super) mod tests { assert_eq!(aggregate_ray_bundles(&refs).len(), 1); } + #[test] + fn aggregate_ray_bundles_keeps_multi_cpu_tasks_individual() { + // A task needing 4 CPUs must stay one bundle, not 4 spread unit bundles, + // or the autoscaler could provision 4 single-CPU nodes and leave it + // unschedulable. + let multi = TaskResourceRequest::new( + ResourceRequest::try_new_internal(Some(4.0), None, None).unwrap(), + 1.0, + ); + let frac = TaskResourceRequest::new(ResourceRequest::default(), 0.5); + let refs = vec![&multi, &frac, &frac]; + let bundles = aggregate_ray_bundles(&refs); + assert!(bundles.contains(&RayBundle { + cpu: 4, + gpu: None, + memory: None + })); + // 0.5 + 0.5 = 1.0 -> one unit bundle. + assert_eq!( + bundles + .iter() + .filter(|b| **b + == RayBundle { + cpu: 1, + gpu: None, + memory: None + }) + .count(), + 1 + ); + assert_eq!(bundles.len(), 2); + } + #[test] fn aggregate_ray_bundles_keeps_gpu_and_memory_bundles() { let cpu_only = TaskResourceRequest::new(ResourceRequest::default(), 0.5); From 530337200a7afc72d48292242694e14ee16e9615 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 11:10:39 -0700 Subject: [PATCH 13/21] fix(flotilla): track post-aggregation request as autoscaler high-water mark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The high-water mark recorded the fractional cpu_sum, but the request actually sent to Ray is the integer-aggregated bundle total. With min_cpu_per_task=0.1 the mark grew ~0.1 per cycle while ceil() only bumped the real CPU request every ~10 cycles, so scale-up for many pending tasks stalled for ~1/min_cpu_per_task cycles (≈50s at the default 5s interval) per extra CPU. Record the aggregated integer bundle totals (what Ray actually receives) as the mark instead. Because each cycle selects bundles until the fractional cpu_sum exceeds the integer mark, ceil() now bumps by at least one CPU every cycle, restoring the intended one-unit-per-cycle ramp while still never requesting less than before. Convergence is unchanged: once pending demand can no longer exceed the mark, the cycle is skipped. Verified: cargo test -p daft-distributed --lib (8 task tests pass), cargo check/clippy -p daft-distributed --features python clean. --- .../src/python/ray/worker_manager.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index c2f106852a5..cffb4290209 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -328,10 +328,20 @@ impl WorkerManager for RayWorkerManager { state.pending_release_blacklist.clear(); state.last_refresh = None; - // 6. Record this request as the new high-water mark so the next cycle will - // request exactly one bundle more, and so we never send a smaller request. - state.max_resources_requested = - ResourceRequest::try_new_internal(Some(cpu_sum), Some(gpu_sum), Some(memory_sum))?; + // 6. Record what we actually requested from Ray — the aggregated integer + // bundle totals, not the fractional cpu_sum — as the new high-water mark. + // Tracking the post-aggregation request keeps the mark in the same units + // Ray sees, so each cycle escalates the real request by at least one CPU + // rather than stalling for ~1/min_cpu_per_task cycles before the ceil + // bumps. We never send a smaller request than before. + let requested_cpus: i64 = ray_bundles.iter().map(|b| b.cpu).sum(); + let requested_gpus: i64 = ray_bundles.iter().filter_map(|b| b.gpu).sum(); + let requested_memory: i64 = ray_bundles.iter().filter_map(|b| b.memory).sum(); + state.max_resources_requested = ResourceRequest::try_new_internal( + Some(requested_cpus as f64), + Some(requested_gpus as f64), + Some(requested_memory as usize), + )?; state.last_autoscale_request_time = Some(Instant::now()); Ok(()) From eb091a36efeeab7c8ec8aeb1385388cd81dc7ac7 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 19:46:49 -0700 Subject: [PATCH 14/21] fix(resource-request): reject non-finite and negative num_cpus/num_gpus try_new_internal only checked num_gpus for negativity, so an explicit num_cpus = inf/NaN (or negative) flowed through. Downstream that became a bundle of {"CPU": i64::MAX} (inf as i64 saturates) in the autoscaler. Require both num_cpus and num_gpus to be finite and nonnegative; this also catches a NaN num_gpus that the existing >1-must-be-integer check missed. --- src/common/resource-request/src/lib.rs | 36 ++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/common/resource-request/src/lib.rs b/src/common/resource-request/src/lib.rs index e055bc73489..bf6e144ac87 100644 --- a/src/common/resource-request/src/lib.rs +++ b/src/common/resource-request/src/lib.rs @@ -30,10 +30,19 @@ impl ResourceRequest { num_gpus: Option, memory_bytes: Option, ) -> DaftResult { + if let Some(num_cpus) = num_cpus + && !(num_cpus.is_finite() && num_cpus >= 0.0) + { + return Err(DaftError::ValueError(format!( + "ResourceRequest num_cpus must be a finite, nonnegative number, got {}", + num_cpus + ))); + } + if let Some(num_gpus) = num_gpus { - if num_gpus < 0.0 { + if !(num_gpus.is_finite() && num_gpus >= 0.0) { return Err(DaftError::ValueError(format!( - "ResourceRequest num_gpus must be nonnegative, got {}", + "ResourceRequest num_gpus must be a finite, nonnegative number, got {}", num_gpus ))); } @@ -275,3 +284,26 @@ pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn rejects_non_finite_or_negative_cpus() { + for bad in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY, -0.5] { + assert!(ResourceRequest::try_new_internal(Some(bad), None, None).is_err()); + } + assert!(ResourceRequest::try_new_internal(Some(0.0), None, None).is_ok()); + assert!(ResourceRequest::try_new_internal(Some(0.25), None, None).is_ok()); + } + + #[test] + fn rejects_non_finite_or_negative_gpus() { + for bad in [f64::NAN, f64::INFINITY, -1.0] { + assert!(ResourceRequest::try_new_internal(None, Some(bad), None).is_err()); + } + assert!(ResourceRequest::try_new_internal(None, Some(0.5), None).is_ok()); + assert!(ResourceRequest::try_new_internal(None, Some(2.0), None).is_ok()); + } +} From d6f11a17cccec55b97e28daaff719a49bbdec120 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 19:46:49 -0700 Subject: [PATCH 15/21] fix(flotilla): pack fractional GPU into integer autoscaler bundles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit aggregate_ray_bundles only packed fractional CPU; fractional-GPU tasks (Daft supports num_gpus<1) were each emitted as a full {"GPU":1} bundle. Combined with the high-water mark comparing raw fractional gpu_sum against the integer mark, 11 tasks at 0.1 GPU would request 11 GPUs instead of ceil(1.1)=2 — likely tripping "requested bundles exceed max capacity, autoscaler refuses all". Sum sub-GPU demand into ceil(sum) {"CPU":1,"GPU":1} bundles (their small CPU rides on the GPU node, not double-counted); memory/multi-unit tasks still keep an individual placement-pinned bundle. Extract the select-then-aggregate ramp into a pure next_autoscale_request() in task.rs so it's unit-testable outside the python-gated worker manager; add tests for whole-unit CPU/GPU escalation, GPU packing, and the skip-below-mark case. Tighten comments. Verified: cargo test -p daft-distributed --lib (78 pass) and -p common-resource-request; cargo check/clippy --features python clean. --- .../src/python/ray/worker_manager.rs | 61 +++------ src/daft-distributed/src/scheduling/task.rs | 117 ++++++++++++++---- 2 files changed, 112 insertions(+), 66 deletions(-) diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index cffb4290209..17dc0f7c995 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -11,7 +11,7 @@ use pyo3::prelude::*; use super::{task::RayTaskResultHandle, worker::RaySwordfishWorker}; use crate::scheduling::{ scheduler::WorkerSnapshot, - task::{SwordfishTask, TaskContext, TaskResourceRequest, aggregate_ray_bundles}, + task::{SwordfishTask, TaskContext, TaskResourceRequest, next_autoscale_request}, worker::{Worker, WorkerId, WorkerManager}, }; @@ -262,46 +262,20 @@ impl WorkerManager for RayWorkerManager { .unwrap_or(0) .max(cluster_memory_bytes); - // 3. Accumulate bundles one at a time until the running total surpasses the - // high-water mark in any resource dimension (CPU, GPU, or memory). This ensures - // each cycle's request is exactly one bundle larger than the previous max — - // gradual enough to avoid exceeding an unknown cluster capacity limit. - let mut cpu_sum = 0.0; - let mut gpu_sum = 0.0; - let mut memory_sum = 0; - let mut surpassed = false; - let mut selected_bundles = Vec::new(); - for bundle in &bundles { - // Use wrapper methods so the min_cpu_per_task fallback applies. - cpu_sum += bundle.num_cpus(); - gpu_sum += bundle.num_gpus(); - memory_sum += bundle.memory_bytes(); - selected_bundles.push(bundle); - if cpu_sum > high_water_mark_cpus - || gpu_sum > high_water_mark_gpus - || memory_sum > high_water_mark_memory - { - surpassed = true; - break; - } - } - - // 4. If we went through all pending bundles without surpassing the high-water mark, - // the remaining demand is smaller than what we previously requested. Skip this - // cycle — Ray still holds our previous (larger) request, so no downscale occurs. - if !surpassed { + // 3. Select the next request: one bundle larger than the previous high-water + // mark, so the cluster ramps up gradually. None means demand hasn't grown + // past the last request, so skip — Ray still holds that (larger) request. + let Some(ray_bundles) = next_autoscale_request( + &bundles, + high_water_mark_cpus, + high_water_mark_gpus, + high_water_mark_memory, + ) else { return Ok(()); - } + }; - // 5. Send the selected bundles to Ray's autoscaler via request_resources(). - // Ray (as of 2.55) rejects non-integer bundle values, so a fractional - // min_cpu_per_task cannot be sent per task directly. aggregate_ray_bundles - // sums CPU-only fractional demand into ceil(sum) unit {CPU:1} bundles so - // the autoscaler is not asked for one full CPU per task (issue #7123), - // while GPU/memory tasks keep their own (placement-pinning) bundle. - // Zero-valued GPU/memory keys are omitted so Ray doesn't interpret them as - // a demand for zero-resource bundles on specialized nodes. - let ray_bundles = aggregate_ray_bundles(&selected_bundles); + // 4. Send the bundles to Ray's autoscaler. Zero-valued GPU/memory keys are + // omitted so Ray doesn't demand zero-resource bundles on specialized nodes. Python::attach(|py| -> DaftResult<()> { let python_bundles = ray_bundles .iter() @@ -328,12 +302,9 @@ impl WorkerManager for RayWorkerManager { state.pending_release_blacklist.clear(); state.last_refresh = None; - // 6. Record what we actually requested from Ray — the aggregated integer - // bundle totals, not the fractional cpu_sum — as the new high-water mark. - // Tracking the post-aggregation request keeps the mark in the same units - // Ray sees, so each cycle escalates the real request by at least one CPU - // rather than stalling for ~1/min_cpu_per_task cycles before the ceil - // bumps. We never send a smaller request than before. + // 5. Record the new high-water mark as the aggregated integer totals we just + // requested — same units Ray sees, so each cycle escalates by a whole CPU/GPU + // instead of stalling for ~1/min_cpu_per_task cycles before the ceil bumps. let requested_cpus: i64 = ray_bundles.iter().map(|b| b.cpu).sum(); let requested_gpus: i64 = ray_bundles.iter().filter_map(|b| b.gpu).sum(); let requested_memory: i64 = ray_bundles.iter().filter_map(|b| b.memory).sum(); diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index b028063ed9b..89e6c39856c 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -59,37 +59,37 @@ pub(crate) struct RayBundle { pub memory: Option, } -/// Aggregate per-task resource requests into integer Ray autoscaler bundles. +/// Aggregate per-task requests into integer Ray autoscaler bundles (Ray's +/// `request_resources` rejects non-integer values). /// -/// Ray's `request_resources` requires integer bundle values, so a fractional -/// `min_cpu_per_task` (e.g. 0.1) cannot be expressed per task directly. Only -/// sub-CPU, CPU-only tasks are packed: their fractional CPU is summed and emitted -/// as `ceil(sum)` unit `{"CPU": 1}` bundles — so N tasks at 0.1 CPU request -/// `ceil(0.1 * N)` CPUs rather than N (issue #7123). -/// -/// A task keeps its own individual bundle (CPU rounded up to at least 1) when it -/// requests GPU, memory, or `num_cpus >= 1`: those resources pin the task to a -/// single node, so splitting them across unit bundles could make the task -/// unschedulable. Non-finite / non-positive CPU values contribute nothing (the -/// config is validated upstream, but an explicit plan request is not). +/// Sub-unit demand is packed: sub-GPU tasks sum into `ceil(sum)` `{CPU:1,GPU:1}` +/// bundles, sub-CPU CPU-only tasks into `ceil(sum)` `{CPU:1}` bundles — so N tasks +/// at 0.1 request `ceil(0.1*N)` units, not N (issue #7123). Tasks with memory or a +/// whole unit (`> 1`) keep an individual, placement-pinned bundle. Non-positive +/// values are dropped; non-finite is rejected by `try_new_internal`. pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec { let mut fractional_cpu_sum = 0.0; + let mut fractional_gpu_sum = 0.0; let mut bundles = Vec::new(); for request in requests { - let gpu = request.num_gpus().ceil() as i64; - let memory = request.memory_bytes() as i64; - let gpu = (gpu > 0).then_some(gpu); - let memory = (memory > 0).then_some(memory); let cpus = request.num_cpus(); - if gpu.is_some() || memory.is_some() || cpus > 1.0 { + let gpus = request.num_gpus(); + let memory = request.memory_bytes() as i64; + if memory > 0 || cpus > 1.0 || gpus > 1.0 { + let gpu = gpus.ceil() as i64; bundles.push(RayBundle { cpu: (cpus.ceil() as i64).max(1), - gpu, - memory, + gpu: (gpu > 0).then_some(gpu), + memory: (memory > 0).then_some(memory), }); + } else if gpus > 0.0 { + // Sub-GPU task: pack into shared whole-GPU bundles. Its small CPU is + // covered by the GPU node, so it is not also counted as CPU demand. + fractional_gpu_sum += gpus; } else if cpus > 0.0 { - // `cpus > 0.0` is false for NaN, so a poisoned value is dropped - // rather than zeroing the whole batch's CPU request. + // Sub-CPU, CPU-only task: pack into shared whole-CPU bundles. + // `> 0.0` is false for NaN, so a poisoned value is dropped rather + // than zeroing the batch's request. fractional_cpu_sum += cpus; } } @@ -100,9 +100,45 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec Option> { + let mut cpu_sum = 0.0; + let mut gpu_sum = 0.0; + let mut memory_sum = 0usize; + let mut selected: Vec<&TaskResourceRequest> = Vec::new(); + for request in pending { + cpu_sum += request.num_cpus(); + gpu_sum += request.num_gpus(); + memory_sum += request.memory_bytes(); + selected.push(request); + if cpu_sum > high_water_cpus + || gpu_sum > high_water_gpus + || memory_sum > high_water_memory + { + return Some(aggregate_ray_bundles(&selected)); + } + } + None +} + pub(crate) type TaskID = u32; pub(crate) type TaskName = String; @@ -761,6 +797,45 @@ pub(super) mod tests { })); } + #[test] + fn aggregate_ray_bundles_packs_fractional_gpu() { + // 11 tasks at 0.1 GPU -> ceil(1.1) = 2 GPU bundles, not 11. + let rr = ResourceRequest::try_new_internal(Some(0.1), Some(0.1), None).unwrap(); + let g = TaskResourceRequest::new(rr, 1.0); + let refs: Vec<_> = (0..11).map(|_| &g).collect(); + let bundles = aggregate_ray_bundles(&refs); + let total_gpu: i64 = bundles.iter().filter_map(|b| b.gpu).sum(); + assert_eq!(total_gpu, 2); + assert!(bundles.iter().all(|b| b.gpu == Some(1) && b.cpu == 1)); + } + + #[test] + fn next_autoscale_request_escalates_by_whole_cpu() { + // mark=1.0 -> select 11 tasks (cpu_sum>1.0) -> ceil(1.1) = 2 CPUs. + let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); + let pending: Vec<_> = (0..50).map(|_| r.clone()).collect(); + let bundles = next_autoscale_request(&pending, 1.0, 0.0, 0).unwrap(); + let total_cpu: i64 = bundles.iter().map(|b| b.cpu).sum(); + assert_eq!(total_cpu, 2); + } + + #[test] + fn next_autoscale_request_packs_fractional_gpu() { + let rr = ResourceRequest::try_new_internal(Some(0.1), Some(0.1), None).unwrap(); + let g = TaskResourceRequest::new(rr, 1.0); + let pending: Vec<_> = (0..30).map(|_| g.clone()).collect(); + let bundles = next_autoscale_request(&pending, 100.0, 1.0, 0).unwrap(); + let total_gpu: i64 = bundles.iter().filter_map(|b| b.gpu).sum(); + assert_eq!(total_gpu, 2); + } + + #[test] + fn next_autoscale_request_skips_below_mark() { + let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); + let pending = vec![r.clone(), r]; + assert!(next_autoscale_request(&pending, 1.0, 0.0, 0).is_none()); + } + #[derive(Debug)] pub struct MockPartition { num_rows: usize, From 467f0b72941dfcebd2772ad3cd1340d3188dbffc Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 20:30:01 -0700 Subject: [PATCH 16/21] fix(flotilla): carry GPU tasks' CPU into packed autoscaler bundles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Packing sub-GPU tasks dropped their CPU demand entirely: a task with num_gpus<1 and the min_cpu_per_task fallback (e.g. @daft.cls(gpus=0.5) -> num_cpus=None -> 1.0 CPU) only contributed to fractional_gpu_sum, so the {CPU:1,GPU:1} bundles under-counted CPU. Worse, next_autoscale_request selects on raw cpu_sum while the high-water mark records the post-aggregation totals, so two such tasks at mark 1/1 kept re-requesting {CPU:1,GPU:1} without the mark ever growing — the cluster could stall and never provision the second CPU, leaving tasks unschedulable. Accumulate the packed tasks' CPU (gpu_cpu_sum) and spread it across the GPU bundles as ceil(gpu_cpu_sum / gpu_bundles). The request now reflects real CPU demand, so the recorded mark grows whenever selection triggers and the ramp converges. Add a regression test for the gpus=0.5 + fallback-CPU stall. Verified: cargo test -p daft-distributed --lib (13 task tests) and cargo check/clippy --features python clean. --- src/daft-distributed/src/scheduling/task.rs | 50 ++++++++++++++++----- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 89e6c39856c..fbeb7c6a303 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -62,14 +62,17 @@ pub(crate) struct RayBundle { /// Aggregate per-task requests into integer Ray autoscaler bundles (Ray's /// `request_resources` rejects non-integer values). /// -/// Sub-unit demand is packed: sub-GPU tasks sum into `ceil(sum)` `{CPU:1,GPU:1}` +/// Sub-unit demand is packed: sub-GPU tasks sum into `ceil(sum)` `{CPU,GPU:1}` /// bundles, sub-CPU CPU-only tasks into `ceil(sum)` `{CPU:1}` bundles — so N tasks -/// at 0.1 request `ceil(0.1*N)` units, not N (issue #7123). Tasks with memory or a -/// whole unit (`> 1`) keep an individual, placement-pinned bundle. Non-positive -/// values are dropped; non-finite is rejected by `try_new_internal`. +/// at 0.1 request `ceil(0.1*N)` units, not N (issue #7123). The packed tasks' CPU +/// is carried on the GPU bundles (spread across them) so their co-located CPU is +/// provisioned, not dropped. Tasks with memory or a whole unit (`> 1`) keep an +/// individual, placement-pinned bundle. Non-positive values are dropped; non-finite +/// is rejected by `try_new_internal`. pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec { let mut fractional_cpu_sum = 0.0; let mut fractional_gpu_sum = 0.0; + let mut gpu_cpu_sum = 0.0; let mut bundles = Vec::new(); for request in requests { let cpus = request.num_cpus(); @@ -83,9 +86,12 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec 0).then_some(memory), }); } else if gpus > 0.0 { - // Sub-GPU task: pack into shared whole-GPU bundles. Its small CPU is - // covered by the GPU node, so it is not also counted as CPU demand. + // Sub-GPU task: pack into shared whole-GPU bundles, carrying its CPU so + // the CPU it needs co-located with the GPU is still provisioned. fractional_gpu_sum += gpus; + if cpus > 0.0 { + gpu_cpu_sum += cpus; + } } else if cpus > 0.0 { // Sub-CPU, CPU-only task: pack into shared whole-CPU bundles. // `> 0.0` is false for NaN, so a poisoned value is dropped rather @@ -100,12 +106,16 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec 0 { + let cpu_per_bundle = (gpu_cpu_sum / gpu_bundles as f64).ceil().max(1.0) as i64; + for _ in 0..gpu_bundles { + bundles.push(RayBundle { + cpu: cpu_per_bundle, + gpu: Some(1), + memory: None, + }); + } } bundles } @@ -836,6 +846,22 @@ pub(super) mod tests { assert!(next_autoscale_request(&pending, 1.0, 0.0, 0).is_none()); } + #[test] + fn next_autoscale_request_grows_for_fractional_gpu_with_cpu_fallback() { + // @daft.cls(gpus=0.5): num_cpus=None -> fallback 1.0 CPU, num_gpus=0.5. + // At mark 1 CPU / 1 GPU the request must actually grow, not restate 1/1. + let rr = ResourceRequest::try_new_internal(None, Some(0.5), None).unwrap(); + let t = TaskResourceRequest::new(rr, 1.0); + let pending = vec![t.clone(), t]; + let bundles = next_autoscale_request(&pending, 1.0, 1.0, 0).unwrap(); + let total_cpu: i64 = bundles.iter().map(|b| b.cpu).sum(); + let total_gpu: i64 = bundles.iter().filter_map(|b| b.gpu).sum(); + assert!( + total_cpu > 1 || total_gpu > 1, + "request must grow past 1/1, got {total_cpu}/{total_gpu}" + ); + } + #[derive(Debug)] pub struct MockPartition { num_rows: usize, From 5f25d9138ab0b1a56c72baa9c88d9a17748e5c9c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 20:45:26 -0700 Subject: [PATCH 17/21] refactor(daft-config): share min_cpu_per_task validity rule The "finite and > 0" check for min_cpu_per_task was duplicated in the env path and the Python setter, which could drift. Extract it into a single DaftExecutionConfig::is_valid_min_cpu_per_task and call it from both. --- src/common/daft-config/src/lib.rs | 7 ++++++- src/common/daft-config/src/python.rs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 28fdc04fb08..8af27026925 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -209,6 +209,11 @@ impl DaftExecutionConfig { const ENV_DAFT_SCANTASK_MAX_PARALLEL: &'static str = "DAFT_SCANTASK_MAX_PARALLEL"; const ENV_DAFT_NATIVE_PARQUET_WRITER: &'static str = "DAFT_NATIVE_PARQUET_WRITER"; const ENV_DAFT_MIN_CPU_PER_TASK: &'static str = "DAFT_MIN_CPU_PER_TASK"; + + /// Single validity rule (finite, positive) shared by the env and Python setter. + pub(crate) fn is_valid_min_cpu_per_task(value: f64) -> bool { + value.is_finite() && value > 0.0 + } const ENV_DAFT_ACTOR_UDF_READY_TIMEOUT: &'static str = "DAFT_ACTOR_UDF_READY_TIMEOUT"; const ENV_PARQUET_INFLATION_FACTOR: &'static str = "DAFT_PARQUET_INFLATION_FACTOR"; const ENV_CSV_INFLATION_FACTOR: &'static str = "DAFT_CSV_INFLATION_FACTOR"; @@ -241,7 +246,7 @@ impl DaftExecutionConfig { if let Some(val) = parse_number_from_env(Self::ENV_DAFT_MIN_CPU_PER_TASK, cfg.min_cpu_per_task) { - if val.is_finite() && val > 0.0 { + if Self::is_valid_min_cpu_per_task(val) { cfg.min_cpu_per_task = val; } else { eprintln!( diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index 5af6c04a2db..0f03beac037 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -269,7 +269,7 @@ impl PyDaftExecutionConfig { } if let Some(min_cpu_per_task) = min_cpu_per_task { - if !min_cpu_per_task.is_finite() || min_cpu_per_task <= 0.0 { + if !DaftExecutionConfig::is_valid_min_cpu_per_task(min_cpu_per_task) { return Err(PyErr::new::(format!( "min_cpu_per_task must be a finite number > 0, got {min_cpu_per_task}" ))); From 953b6be1491626d37b333eafac5a7581f0e0af58 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 21:07:44 -0700 Subject: [PATCH 18/21] fix(flotilla): emit unit GPU autoscaler bundles, not oversized shapes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Carrying GPU tasks' CPU as ceil(gpu_cpu_sum / gpu_bundles) produced bundles like {CPU:2, GPU:1}. As a single Ray request_resources shape that fits no standard 1-CPU/1-GPU node, so the autoscaler can't scale up — and the value is recorded as the high-water mark, stalling further attempts. Emit unit {CPU:1, GPU:1} bundles instead (a sub-GPU task's cpu and gpu are each <= 1, so one always fits a standard GPU node), with the count covering both dimensions: ceil(max(gpu_sum, gpu_cpu_sum)). Two 1-CPU/0.5-GPU tasks now request two {CPU:1,GPU:1} shapes (2 CPU / 2 GPU) rather than one unschedulable {CPU:2}. Assert the schedulable shape in the regression test. --- src/daft-distributed/src/scheduling/task.rs | 46 ++++++++++----------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index fbeb7c6a303..86b2e483fad 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -59,16 +59,12 @@ pub(crate) struct RayBundle { pub memory: Option, } -/// Aggregate per-task requests into integer Ray autoscaler bundles (Ray's -/// `request_resources` rejects non-integer values). -/// -/// Sub-unit demand is packed: sub-GPU tasks sum into `ceil(sum)` `{CPU,GPU:1}` -/// bundles, sub-CPU CPU-only tasks into `ceil(sum)` `{CPU:1}` bundles — so N tasks -/// at 0.1 request `ceil(0.1*N)` units, not N (issue #7123). The packed tasks' CPU -/// is carried on the GPU bundles (spread across them) so their co-located CPU is -/// provisioned, not dropped. Tasks with memory or a whole unit (`> 1`) keep an -/// individual, placement-pinned bundle. Non-positive values are dropped; non-finite -/// is rejected by `try_new_internal`. +/// Aggregate per-task requests into integer Ray bundles (Ray rejects non-integer +/// values). Sub-unit demand packs into unit bundles — `{CPU:1}` for CPU-only, +/// `{CPU:1,GPU:1}` for GPU — so N tasks at 0.1 request `ceil(0.1*N)` units, not N +/// (issue #7123); the GPU count covers both GPU and the tasks' co-located CPU. +/// Unit shapes stay schedulable; `{CPU:N,GPU:1}` might fit no node. Memory / +/// whole-unit (`> 1`) tasks keep an individual bundle. pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec { let mut fractional_cpu_sum = 0.0; let mut fractional_gpu_sum = 0.0; @@ -86,16 +82,13 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec 0).then_some(memory), }); } else if gpus > 0.0 { - // Sub-GPU task: pack into shared whole-GPU bundles, carrying its CPU so - // the CPU it needs co-located with the GPU is still provisioned. + // Sub-GPU: track GPU and its co-located CPU for the bundle count. fractional_gpu_sum += gpus; if cpus > 0.0 { gpu_cpu_sum += cpus; } } else if cpus > 0.0 { - // Sub-CPU, CPU-only task: pack into shared whole-CPU bundles. - // `> 0.0` is false for NaN, so a poisoned value is dropped rather - // than zeroing the batch's request. + // `> 0.0` is false for NaN, dropping a poisoned value. fractional_cpu_sum += cpus; } } @@ -106,16 +99,12 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec 0 { - let cpu_per_bundle = (gpu_cpu_sum / gpu_bundles as f64).ceil().max(1.0) as i64; - for _ in 0..gpu_bundles { - bundles.push(RayBundle { - cpu: cpu_per_bundle, - gpu: Some(1), - memory: None, - }); - } + for _ in 0..fractional_gpu_sum.max(gpu_cpu_sum).ceil() as i64 { + bundles.push(RayBundle { + cpu: 1, + gpu: Some(1), + memory: None, + }); } bundles } @@ -860,6 +849,13 @@ pub(super) mod tests { total_cpu > 1 || total_gpu > 1, "request must grow past 1/1, got {total_cpu}/{total_gpu}" ); + // Each bundle must fit a standard 1-CPU/1-GPU node. + assert!( + bundles + .iter() + .all(|b| b.cpu <= 1 && b.gpu.unwrap_or(0) <= 1), + "bundles must be single-node schedulable, got {bundles:?}" + ); } #[derive(Debug)] From 1483cee0aa633f6ed39502a85fafcc1960aefc1c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 20 Jun 2026 23:11:48 -0700 Subject: [PATCH 19/21] docs(flotilla): tighten autoscaler bundle comments --- .../src/python/ray/worker_manager.rs | 5 ++--- src/daft-distributed/src/scheduling/task.rs | 15 +++++---------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index 17dc0f7c995..e1ff415a641 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -302,9 +302,8 @@ impl WorkerManager for RayWorkerManager { state.pending_release_blacklist.clear(); state.last_refresh = None; - // 5. Record the new high-water mark as the aggregated integer totals we just - // requested — same units Ray sees, so each cycle escalates by a whole CPU/GPU - // instead of stalling for ~1/min_cpu_per_task cycles before the ceil bumps. + // 5. Record the new high-water mark as the aggregated integer totals just + // requested — same units Ray sees, so each cycle escalates by a whole unit. let requested_cpus: i64 = ray_bundles.iter().map(|b| b.cpu).sum(); let requested_gpus: i64 = ray_bundles.iter().filter_map(|b| b.gpu).sum(); let requested_memory: i64 = ray_bundles.iter().filter_map(|b| b.memory).sum(); diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 86b2e483fad..3d74ebc18f5 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -50,8 +50,7 @@ impl TaskResourceRequest { } } -/// An integer-valued resource bundle for Ray's autoscaler `request_resources`, -/// which (as of Ray 2.55) rejects non-integer bundle values. +/// Integer-valued bundle for Ray's `request_resources` (rejects non-integer values). #[derive(Debug, Clone, PartialEq)] pub(crate) struct RayBundle { pub cpu: i64, @@ -60,11 +59,9 @@ pub(crate) struct RayBundle { } /// Aggregate per-task requests into integer Ray bundles (Ray rejects non-integer -/// values). Sub-unit demand packs into unit bundles — `{CPU:1}` for CPU-only, -/// `{CPU:1,GPU:1}` for GPU — so N tasks at 0.1 request `ceil(0.1*N)` units, not N -/// (issue #7123); the GPU count covers both GPU and the tasks' co-located CPU. -/// Unit shapes stay schedulable; `{CPU:N,GPU:1}` might fit no node. Memory / -/// whole-unit (`> 1`) tasks keep an individual bundle. +/// values). Sub-unit demand packs into unit `{CPU:1}` / `{CPU:1,GPU:1}` bundles — +/// N tasks at 0.1 request `ceil(0.1*N)` units, not N (issue #7123); unit shapes +/// stay single-node schedulable. Memory / whole-unit (`> 1`) tasks stay individual. pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec { let mut fractional_cpu_sum = 0.0; let mut fractional_gpu_sum = 0.0; @@ -110,9 +107,7 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec Date: Sat, 20 Jun 2026 23:22:07 -0700 Subject: [PATCH 20/21] fix(flotilla): honor explicit num_cpus=0 in autoscaler bundles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit aggregate_ray_bundles forced CPU >= 1 (via .max(1) on individual bundles and a fixed CPU:1 on every GPU bundle), so a task that explicitly sets num_cpus=0 still requested a CPU — breaking "explicit num_cpus passes through unchanged" and over-requesting CPU for GPU-only / memory-only workloads. Drop the .max(1); give GPU bundles CPU only when the packed tasks actually need it (gpu_cpu_sum > 0); and omit the CPU key from the Ray bundle dict when it is zero. Add a test for num_cpus=0 GPU-only and memory-only tasks. --- .../src/python/ray/worker_manager.rs | 4 ++- src/daft-distributed/src/scheduling/task.rs | 35 ++++++++++++++++--- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index e1ff415a641..b8b5e8ff5f2 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -281,7 +281,9 @@ impl WorkerManager for RayWorkerManager { .iter() .map(|bundle| { let dict = pyo3::types::PyDict::new(py); - dict.set_item("CPU", bundle.cpu)?; + if bundle.cpu > 0 { + dict.set_item("CPU", bundle.cpu)?; + } if let Some(gpu) = bundle.gpu { dict.set_item("GPU", gpu)?; } diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 3d74ebc18f5..7973b1339af 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -74,7 +74,7 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec 0 || cpus > 1.0 || gpus > 1.0 { let gpu = gpus.ceil() as i64; bundles.push(RayBundle { - cpu: (cpus.ceil() as i64).max(1), + cpu: cpus.ceil() as i64, gpu: (gpu > 0).then_some(gpu), memory: (memory > 0).then_some(memory), }); @@ -96,9 +96,10 @@ pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec 0.0); for _ in 0..fractional_gpu_sum.max(gpu_cpu_sum).ceil() as i64 { bundles.push(RayBundle { - cpu: 1, + cpu: gpu_bundle_cpu, gpu: Some(1), memory: None, }); @@ -123,9 +124,7 @@ pub(crate) fn next_autoscale_request( gpu_sum += request.num_gpus(); memory_sum += request.memory_bytes(); selected.push(request); - if cpu_sum > high_water_cpus - || gpu_sum > high_water_gpus - || memory_sum > high_water_memory + if cpu_sum > high_water_cpus || gpu_sum > high_water_gpus || memory_sum > high_water_memory { return Some(aggregate_ray_bundles(&selected)); } @@ -803,6 +802,32 @@ pub(super) mod tests { assert!(bundles.iter().all(|b| b.gpu == Some(1) && b.cpu == 1)); } + #[test] + fn aggregate_ray_bundles_respects_explicit_zero_cpu() { + let gpu = TaskResourceRequest::new( + ResourceRequest::try_new_internal(Some(0.0), Some(0.5), None).unwrap(), + 1.0, + ); + assert!( + aggregate_ray_bundles(&[&gpu, &gpu]) + .iter() + .all(|b| b.cpu == 0 && b.gpu == Some(1)) + ); + + let mem = TaskResourceRequest::new( + ResourceRequest::try_new_internal(Some(0.0), None, Some(1024)).unwrap(), + 1.0, + ); + assert_eq!( + aggregate_ray_bundles(&[&mem]), + vec![RayBundle { + cpu: 0, + gpu: None, + memory: Some(1024) + }] + ); + } + #[test] fn next_autoscale_request_escalates_by_whole_cpu() { // mark=1.0 -> select 11 tasks (cpu_sum>1.0) -> ceil(1.1) = 2 CPUs. From 5f9a8a6f510a94fca8d2313886de3d7a2787394d Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 24 Jun 2026 01:17:44 -0700 Subject: [PATCH 21/21] deprecate min_cpu_per_task execution config --- daft/context.py | 12 +- src/common/daft-config/src/lib.rs | 50 +-- src/common/daft-config/src/python.rs | 5 - src/common/resource-request/src/lib.rs | 36 +-- .../src/python/ray/worker_manager.rs | 93 +++--- src/daft-distributed/src/scheduling/task.rs | 304 +----------------- src/daft-local-plan/src/plan.rs | 10 +- tests/test_context.py | 13 +- 8 files changed, 85 insertions(+), 438 deletions(-) diff --git a/daft/context.py b/daft/context.py index c801cdd12b9..e7e88c5e6f2 100644 --- a/daft/context.py +++ b/daft/context.py @@ -3,6 +3,7 @@ import contextlib import logging import threading +import warnings from dataclasses import dataclass from typing import TYPE_CHECKING, Any, ClassVar @@ -285,7 +286,8 @@ def set_execution_config( pre_shuffle_merge_partition_threshold: Number of partitions threshold to enable pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200. scantask_max_parallel: Set the max parallelism for running scan tasks simultaneously. Currently, this only works for Native Runner. If set to 0, all available CPUs will be used. Defaults to 8. native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`. - min_cpu_per_task: Default CPU per task when a plan does not specify num_cpus. Explicit num_cpus on a plan passes through unchanged. Used by the flotilla scheduler for autoscaler bundle requests. Must be > 0. Defaults to 1.0. + min_cpu_per_task: Deprecated. This was used by the old Ray runner and has no effect on + Flotilla scheduling. It will be removed in the next minor version. actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 120 seconds. maintain_order: Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet) won't respect this flag and will always keep maintain_order as false, and propagate to child operators. It's useful to set this to False for running df.collect() when no ordering is required. enable_dynamic_batching: Whether to enable dynamic batching. Defaults to False. @@ -295,6 +297,14 @@ def set_execution_config( enable_multi_glob_path_tasks: Whether to create multiple glob path tasks in Ray Runner to achieve parallel glob. Defaults to False. """ # Replace values in the DaftExecutionConfig with user-specified overrides + if min_cpu_per_task is not None: + warnings.warn( + "`min_cpu_per_task` is deprecated and has no effect on Flotilla scheduling. " + "It will be removed in the next minor version.", + DeprecationWarning, + stacklevel=2, + ) + ctx = get_context() with ctx._lock: old_daft_execution_config = ctx._ctx._daft_execution_config if config is None else config diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 8af27026925..93563f8a0f6 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -192,7 +192,7 @@ impl Default for DaftExecutionConfig { pre_shuffle_merge_partition_threshold: 200, scantask_max_parallel: 8, native_parquet_writer: true, - min_cpu_per_task: 1.0, + min_cpu_per_task: 0.5, actor_udf_ready_timeout: 120, maintain_order: true, enable_dynamic_batching: false, @@ -209,11 +209,6 @@ impl DaftExecutionConfig { const ENV_DAFT_SCANTASK_MAX_PARALLEL: &'static str = "DAFT_SCANTASK_MAX_PARALLEL"; const ENV_DAFT_NATIVE_PARQUET_WRITER: &'static str = "DAFT_NATIVE_PARQUET_WRITER"; const ENV_DAFT_MIN_CPU_PER_TASK: &'static str = "DAFT_MIN_CPU_PER_TASK"; - - /// Single validity rule (finite, positive) shared by the env and Python setter. - pub(crate) fn is_valid_min_cpu_per_task(value: f64) -> bool { - value.is_finite() && value > 0.0 - } const ENV_DAFT_ACTOR_UDF_READY_TIMEOUT: &'static str = "DAFT_ACTOR_UDF_READY_TIMEOUT"; const ENV_PARQUET_INFLATION_FACTOR: &'static str = "DAFT_PARQUET_INFLATION_FACTOR"; const ENV_CSV_INFLATION_FACTOR: &'static str = "DAFT_CSV_INFLATION_FACTOR"; @@ -246,16 +241,11 @@ impl DaftExecutionConfig { if let Some(val) = parse_number_from_env(Self::ENV_DAFT_MIN_CPU_PER_TASK, cfg.min_cpu_per_task) { - if Self::is_valid_min_cpu_per_task(val) { - cfg.min_cpu_per_task = val; - } else { - eprintln!( - "Invalid {} value: {}, must be a finite number > 0, using default {}", - Self::ENV_DAFT_MIN_CPU_PER_TASK, - val, - cfg.min_cpu_per_task - ); - } + eprintln!( + "{} is deprecated and has no effect on Flotilla scheduling. It will be removed in the next minor version.", + Self::ENV_DAFT_MIN_CPU_PER_TASK + ); + cfg.min_cpu_per_task = val; } if let Some(val) = parse_number_from_env( @@ -527,7 +517,7 @@ mod tests { // ENV_DAFT_MIN_CPU_PER_TASK { let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 1.0); + assert_eq!(cfg.min_cpu_per_task, 0.5); unsafe { std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "0.1"); @@ -539,31 +529,7 @@ mod tests { std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "invalid"); } let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 1.0); - - unsafe { - std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "0"); - } - let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 1.0); - - unsafe { - std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "-0.5"); - } - let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 1.0); - - unsafe { - std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "nan"); - } - let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 1.0); - - unsafe { - std::env::set_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK, "inf"); - } - let cfg = DaftExecutionConfig::from_env(); - assert_eq!(cfg.min_cpu_per_task, 1.0); + assert_eq!(cfg.min_cpu_per_task, 0.5); unsafe { std::env::remove_var(DaftExecutionConfig::ENV_DAFT_MIN_CPU_PER_TASK); diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index 0f03beac037..3f4218e69f5 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -269,11 +269,6 @@ impl PyDaftExecutionConfig { } if let Some(min_cpu_per_task) = min_cpu_per_task { - if !DaftExecutionConfig::is_valid_min_cpu_per_task(min_cpu_per_task) { - return Err(PyErr::new::(format!( - "min_cpu_per_task must be a finite number > 0, got {min_cpu_per_task}" - ))); - } config.min_cpu_per_task = min_cpu_per_task; } diff --git a/src/common/resource-request/src/lib.rs b/src/common/resource-request/src/lib.rs index bf6e144ac87..e055bc73489 100644 --- a/src/common/resource-request/src/lib.rs +++ b/src/common/resource-request/src/lib.rs @@ -30,19 +30,10 @@ impl ResourceRequest { num_gpus: Option, memory_bytes: Option, ) -> DaftResult { - if let Some(num_cpus) = num_cpus - && !(num_cpus.is_finite() && num_cpus >= 0.0) - { - return Err(DaftError::ValueError(format!( - "ResourceRequest num_cpus must be a finite, nonnegative number, got {}", - num_cpus - ))); - } - if let Some(num_gpus) = num_gpus { - if !(num_gpus.is_finite() && num_gpus >= 0.0) { + if num_gpus < 0.0 { return Err(DaftError::ValueError(format!( - "ResourceRequest num_gpus must be a finite, nonnegative number, got {}", + "ResourceRequest num_gpus must be nonnegative, got {}", num_gpus ))); } @@ -284,26 +275,3 @@ pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; Ok(()) } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn rejects_non_finite_or_negative_cpus() { - for bad in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY, -0.5] { - assert!(ResourceRequest::try_new_internal(Some(bad), None, None).is_err()); - } - assert!(ResourceRequest::try_new_internal(Some(0.0), None, None).is_ok()); - assert!(ResourceRequest::try_new_internal(Some(0.25), None, None).is_ok()); - } - - #[test] - fn rejects_non_finite_or_negative_gpus() { - for bad in [f64::NAN, f64::INFINITY, -1.0] { - assert!(ResourceRequest::try_new_internal(None, Some(bad), None).is_err()); - } - assert!(ResourceRequest::try_new_internal(None, Some(0.5), None).is_ok()); - assert!(ResourceRequest::try_new_internal(None, Some(2.0), None).is_ok()); - } -} diff --git a/src/daft-distributed/src/python/ray/worker_manager.rs b/src/daft-distributed/src/python/ray/worker_manager.rs index b8b5e8ff5f2..4ac2c616ef7 100644 --- a/src/daft-distributed/src/python/ray/worker_manager.rs +++ b/src/daft-distributed/src/python/ray/worker_manager.rs @@ -11,7 +11,7 @@ use pyo3::prelude::*; use super::{task::RayTaskResultHandle, worker::RaySwordfishWorker}; use crate::scheduling::{ scheduler::WorkerSnapshot, - task::{SwordfishTask, TaskContext, TaskResourceRequest, next_autoscale_request}, + task::{SwordfishTask, TaskContext, TaskResourceRequest}, worker::{Worker, WorkerId, WorkerManager}, }; @@ -262,38 +262,57 @@ impl WorkerManager for RayWorkerManager { .unwrap_or(0) .max(cluster_memory_bytes); - // 3. Select the next request: one bundle larger than the previous high-water - // mark, so the cluster ramps up gradually. None means demand hasn't grown - // past the last request, so skip — Ray still holds that (larger) request. - let Some(ray_bundles) = next_autoscale_request( - &bundles, - high_water_mark_cpus, - high_water_mark_gpus, - high_water_mark_memory, - ) else { + // 3. Accumulate bundles one at a time until the running total surpasses the + // high-water mark in any resource dimension (CPU, GPU, or memory). This ensures + // each cycle's request is exactly one bundle larger than the previous max — + // gradual enough to avoid exceeding an unknown cluster capacity limit. + let mut cpu_sum = 0.0; + let mut gpu_sum = 0.0; + let mut memory_sum = 0; + let mut surpassed = false; + let mut selected_bundles = Vec::new(); + for bundle in &bundles { + cpu_sum += bundle.resource_request.num_cpus().unwrap_or(0.0); + gpu_sum += bundle.resource_request.num_gpus().unwrap_or(0.0); + memory_sum += bundle.resource_request.memory_bytes().unwrap_or(0); + selected_bundles.push(bundle); + if cpu_sum > high_water_mark_cpus + || gpu_sum > high_water_mark_gpus + || memory_sum > high_water_mark_memory + { + surpassed = true; + break; + } + } + + // 4. If we went through all pending bundles without surpassing the high-water mark, + // the remaining demand is smaller than what we previously requested. Skip this + // cycle — Ray still holds our previous (larger) request, so no downscale occurs. + if !surpassed { return Ok(()); - }; + } - // 4. Send the bundles to Ray's autoscaler. Zero-valued GPU/memory keys are - // omitted so Ray doesn't demand zero-resource bundles on specialized nodes. - Python::attach(|py| -> DaftResult<()> { - let python_bundles = ray_bundles - .iter() - .map(|bundle| { - let dict = pyo3::types::PyDict::new(py); - if bundle.cpu > 0 { - dict.set_item("CPU", bundle.cpu)?; - } - if let Some(gpu) = bundle.gpu { - dict.set_item("GPU", gpu)?; - } - if let Some(memory) = bundle.memory { - dict.set_item("memory", memory)?; - } - Ok::<_, PyErr>(dict) - }) - .collect::, _>>()?; + // 5. Send the selected bundles to Ray's autoscaler via request_resources(). + // Strip zero-valued GPU/memory keys so Ray doesn't interpret them as a demand + // for zero-resource bundles on specialized nodes. + let python_bundles = selected_bundles + .iter() + .map(|bundle| { + let mut dict = HashMap::new(); + dict.insert("CPU", bundle.num_cpus().ceil() as i64); + let gpu = bundle.num_gpus().ceil() as i64; + if gpu > 0 { + dict.insert("GPU", gpu); + } + let memory = bundle.memory_bytes() as i64; + if memory > 0 { + dict.insert("memory", memory); + } + dict + }) + .collect::>(); + Python::attach(|py| -> DaftResult<()> { let flotilla_module = py.import(pyo3::intern!(py, "daft.runners.flotilla"))?; flotilla_module.call_method1(pyo3::intern!(py, "try_autoscale"), (python_bundles,))?; Ok(()) @@ -304,16 +323,10 @@ impl WorkerManager for RayWorkerManager { state.pending_release_blacklist.clear(); state.last_refresh = None; - // 5. Record the new high-water mark as the aggregated integer totals just - // requested — same units Ray sees, so each cycle escalates by a whole unit. - let requested_cpus: i64 = ray_bundles.iter().map(|b| b.cpu).sum(); - let requested_gpus: i64 = ray_bundles.iter().filter_map(|b| b.gpu).sum(); - let requested_memory: i64 = ray_bundles.iter().filter_map(|b| b.memory).sum(); - state.max_resources_requested = ResourceRequest::try_new_internal( - Some(requested_cpus as f64), - Some(requested_gpus as f64), - Some(requested_memory as usize), - )?; + // 6. Record this request as the new high-water mark so the next cycle will + // request exactly one bundle more, and so we never send a smaller request. + state.max_resources_requested = + ResourceRequest::try_new_internal(Some(cpu_sum), Some(gpu_sum), Some(memory_sum))?; state.last_autoscale_request_time = Some(Instant::now()); Ok(()) diff --git a/src/daft-distributed/src/scheduling/task.rs b/src/daft-distributed/src/scheduling/task.rs index 7973b1339af..1be22465e8e 100644 --- a/src/daft-distributed/src/scheduling/task.rs +++ b/src/daft-distributed/src/scheduling/task.rs @@ -23,22 +23,15 @@ use crate::{ #[derive(Debug, Clone)] pub(crate) struct TaskResourceRequest { pub resource_request: ResourceRequest, - /// Fallback for `num_cpus()` when the plan leaves it unset. - min_cpu_per_task: f64, } impl TaskResourceRequest { - pub fn new(resource_request: ResourceRequest, min_cpu_per_task: f64) -> Self { - Self { - resource_request, - min_cpu_per_task, - } + pub fn new(resource_request: ResourceRequest) -> Self { + Self { resource_request } } pub fn num_cpus(&self) -> f64 { - self.resource_request - .num_cpus() - .unwrap_or(self.min_cpu_per_task) + self.resource_request.num_cpus().unwrap_or(1.0) } pub fn num_gpus(&self) -> f64 { @@ -50,88 +43,6 @@ impl TaskResourceRequest { } } -/// Integer-valued bundle for Ray's `request_resources` (rejects non-integer values). -#[derive(Debug, Clone, PartialEq)] -pub(crate) struct RayBundle { - pub cpu: i64, - pub gpu: Option, - pub memory: Option, -} - -/// Aggregate per-task requests into integer Ray bundles (Ray rejects non-integer -/// values). Sub-unit demand packs into unit `{CPU:1}` / `{CPU:1,GPU:1}` bundles — -/// N tasks at 0.1 request `ceil(0.1*N)` units, not N (issue #7123); unit shapes -/// stay single-node schedulable. Memory / whole-unit (`> 1`) tasks stay individual. -pub(crate) fn aggregate_ray_bundles(requests: &[&TaskResourceRequest]) -> Vec { - let mut fractional_cpu_sum = 0.0; - let mut fractional_gpu_sum = 0.0; - let mut gpu_cpu_sum = 0.0; - let mut bundles = Vec::new(); - for request in requests { - let cpus = request.num_cpus(); - let gpus = request.num_gpus(); - let memory = request.memory_bytes() as i64; - if memory > 0 || cpus > 1.0 || gpus > 1.0 { - let gpu = gpus.ceil() as i64; - bundles.push(RayBundle { - cpu: cpus.ceil() as i64, - gpu: (gpu > 0).then_some(gpu), - memory: (memory > 0).then_some(memory), - }); - } else if gpus > 0.0 { - // Sub-GPU: track GPU and its co-located CPU for the bundle count. - fractional_gpu_sum += gpus; - if cpus > 0.0 { - gpu_cpu_sum += cpus; - } - } else if cpus > 0.0 { - // `> 0.0` is false for NaN, dropping a poisoned value. - fractional_cpu_sum += cpus; - } - } - for _ in 0..(fractional_cpu_sum.ceil() as i64) { - bundles.push(RayBundle { - cpu: 1, - gpu: None, - memory: None, - }); - } - let gpu_bundle_cpu = i64::from(gpu_cpu_sum > 0.0); - for _ in 0..fractional_gpu_sum.max(gpu_cpu_sum).ceil() as i64 { - bundles.push(RayBundle { - cpu: gpu_bundle_cpu, - gpu: Some(1), - memory: None, - }); - } - bundles -} - -/// Minimal prefix of `pending` whose fractional demand exceeds the high-water mark, -/// aggregated into integer bundles; `None` if it hasn't grown (skip the cycle). -pub(crate) fn next_autoscale_request( - pending: &[TaskResourceRequest], - high_water_cpus: f64, - high_water_gpus: f64, - high_water_memory: usize, -) -> Option> { - let mut cpu_sum = 0.0; - let mut gpu_sum = 0.0; - let mut memory_sum = 0usize; - let mut selected: Vec<&TaskResourceRequest> = Vec::new(); - for request in pending { - cpu_sum += request.num_cpus(); - gpu_sum += request.num_gpus(); - memory_sum += request.memory_bytes(); - selected.push(request); - if cpu_sum > high_water_cpus || gpu_sum > high_water_gpus || memory_sum > high_water_memory - { - return Some(aggregate_ray_bundles(&selected)); - } - } - None -} - pub(crate) type TaskID = u32; pub(crate) type TaskName = String; @@ -597,8 +508,7 @@ impl SwordfishTaskBuilder { context.insert("plan_fingerprint".to_string(), plan_fingerprint.to_string()); // Extract resource_request from plan - let resource_request = - TaskResourceRequest::new(self.plan.resource_request(), self.config.min_cpu_per_task); + let resource_request = TaskResourceRequest::new(self.plan.resource_request()); // Mark the root of the local plan so the worker's NodeInfo carries // `is_task_root` on the StatSnapshot it ships back. `is_task_leaf` is @@ -680,204 +590,6 @@ pub(super) mod tests { use super::*; use crate::utils::channel::OneshotSender; - #[test] - fn num_cpus_uses_min_cpu_per_task_when_unset() { - let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); - assert_eq!(r.num_cpus(), 0.1); - - let r = TaskResourceRequest::new( - ResourceRequest::default(), - DaftExecutionConfig::default().min_cpu_per_task, - ); - assert_eq!(r.num_cpus(), 1.0); - } - - #[test] - fn num_cpus_honors_explicit_request() { - let explicit = ResourceRequest::try_new_internal(Some(2.0), None, None).unwrap(); - let r = TaskResourceRequest::new(explicit, 0.5); - assert_eq!(r.num_cpus(), 2.0); - - let explicit = ResourceRequest::try_new_internal(Some(0.25), None, None).unwrap(); - let r = TaskResourceRequest::new(explicit, 0.5); - assert_eq!(r.num_cpus(), 0.25); - } - - #[test] - fn aggregate_ray_bundles_packs_fractional_cpu() { - // 4 CPU-only tasks at 0.25 -> one {CPU:1} bundle, not 4 (issue #7123). - let r = TaskResourceRequest::new(ResourceRequest::default(), 0.25); - let refs = vec![&r, &r, &r, &r]; - assert_eq!( - aggregate_ray_bundles(&refs), - vec![RayBundle { - cpu: 1, - gpu: None, - memory: None - }] - ); - } - - #[test] - fn aggregate_ray_bundles_rounds_partial_cpu_up() { - // 0.1 * 5 = 0.5 -> ceil -> 1 bundle. - let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); - let refs = vec![&r, &r, &r, &r, &r]; - assert_eq!(aggregate_ray_bundles(&refs).len(), 1); - } - - #[test] - fn aggregate_ray_bundles_keeps_multi_cpu_tasks_individual() { - // A task needing 4 CPUs must stay one bundle, not 4 spread unit bundles, - // or the autoscaler could provision 4 single-CPU nodes and leave it - // unschedulable. - let multi = TaskResourceRequest::new( - ResourceRequest::try_new_internal(Some(4.0), None, None).unwrap(), - 1.0, - ); - let frac = TaskResourceRequest::new(ResourceRequest::default(), 0.5); - let refs = vec![&multi, &frac, &frac]; - let bundles = aggregate_ray_bundles(&refs); - assert!(bundles.contains(&RayBundle { - cpu: 4, - gpu: None, - memory: None - })); - // 0.5 + 0.5 = 1.0 -> one unit bundle. - assert_eq!( - bundles - .iter() - .filter(|b| **b - == RayBundle { - cpu: 1, - gpu: None, - memory: None - }) - .count(), - 1 - ); - assert_eq!(bundles.len(), 2); - } - - #[test] - fn aggregate_ray_bundles_keeps_gpu_and_memory_bundles() { - let cpu_only = TaskResourceRequest::new(ResourceRequest::default(), 0.5); - let gpu = TaskResourceRequest::new( - ResourceRequest::try_new_internal(Some(0.1), Some(1.0), None).unwrap(), - 1.0, - ); - let mem = TaskResourceRequest::new( - ResourceRequest::try_new_internal(Some(0.1), None, Some(1024)).unwrap(), - 1.0, - ); - let refs = vec![&cpu_only, &gpu, &mem]; - let bundles = aggregate_ray_bundles(&refs); - assert_eq!(bundles.len(), 3); - assert!(bundles.contains(&RayBundle { - cpu: 1, - gpu: Some(1), - memory: None - })); - assert!(bundles.contains(&RayBundle { - cpu: 1, - gpu: None, - memory: Some(1024) - })); - assert!(bundles.contains(&RayBundle { - cpu: 1, - gpu: None, - memory: None - })); - } - - #[test] - fn aggregate_ray_bundles_packs_fractional_gpu() { - // 11 tasks at 0.1 GPU -> ceil(1.1) = 2 GPU bundles, not 11. - let rr = ResourceRequest::try_new_internal(Some(0.1), Some(0.1), None).unwrap(); - let g = TaskResourceRequest::new(rr, 1.0); - let refs: Vec<_> = (0..11).map(|_| &g).collect(); - let bundles = aggregate_ray_bundles(&refs); - let total_gpu: i64 = bundles.iter().filter_map(|b| b.gpu).sum(); - assert_eq!(total_gpu, 2); - assert!(bundles.iter().all(|b| b.gpu == Some(1) && b.cpu == 1)); - } - - #[test] - fn aggregate_ray_bundles_respects_explicit_zero_cpu() { - let gpu = TaskResourceRequest::new( - ResourceRequest::try_new_internal(Some(0.0), Some(0.5), None).unwrap(), - 1.0, - ); - assert!( - aggregate_ray_bundles(&[&gpu, &gpu]) - .iter() - .all(|b| b.cpu == 0 && b.gpu == Some(1)) - ); - - let mem = TaskResourceRequest::new( - ResourceRequest::try_new_internal(Some(0.0), None, Some(1024)).unwrap(), - 1.0, - ); - assert_eq!( - aggregate_ray_bundles(&[&mem]), - vec![RayBundle { - cpu: 0, - gpu: None, - memory: Some(1024) - }] - ); - } - - #[test] - fn next_autoscale_request_escalates_by_whole_cpu() { - // mark=1.0 -> select 11 tasks (cpu_sum>1.0) -> ceil(1.1) = 2 CPUs. - let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); - let pending: Vec<_> = (0..50).map(|_| r.clone()).collect(); - let bundles = next_autoscale_request(&pending, 1.0, 0.0, 0).unwrap(); - let total_cpu: i64 = bundles.iter().map(|b| b.cpu).sum(); - assert_eq!(total_cpu, 2); - } - - #[test] - fn next_autoscale_request_packs_fractional_gpu() { - let rr = ResourceRequest::try_new_internal(Some(0.1), Some(0.1), None).unwrap(); - let g = TaskResourceRequest::new(rr, 1.0); - let pending: Vec<_> = (0..30).map(|_| g.clone()).collect(); - let bundles = next_autoscale_request(&pending, 100.0, 1.0, 0).unwrap(); - let total_gpu: i64 = bundles.iter().filter_map(|b| b.gpu).sum(); - assert_eq!(total_gpu, 2); - } - - #[test] - fn next_autoscale_request_skips_below_mark() { - let r = TaskResourceRequest::new(ResourceRequest::default(), 0.1); - let pending = vec![r.clone(), r]; - assert!(next_autoscale_request(&pending, 1.0, 0.0, 0).is_none()); - } - - #[test] - fn next_autoscale_request_grows_for_fractional_gpu_with_cpu_fallback() { - // @daft.cls(gpus=0.5): num_cpus=None -> fallback 1.0 CPU, num_gpus=0.5. - // At mark 1 CPU / 1 GPU the request must actually grow, not restate 1/1. - let rr = ResourceRequest::try_new_internal(None, Some(0.5), None).unwrap(); - let t = TaskResourceRequest::new(rr, 1.0); - let pending = vec![t.clone(), t]; - let bundles = next_autoscale_request(&pending, 1.0, 1.0, 0).unwrap(); - let total_cpu: i64 = bundles.iter().map(|b| b.cpu).sum(); - let total_gpu: i64 = bundles.iter().filter_map(|b| b.gpu).sum(); - assert!( - total_cpu > 1 || total_gpu > 1, - "request must grow past 1/1, got {total_cpu}/{total_gpu}" - ); - // Each bundle must fit a standard 1-CPU/1-GPU node. - assert!( - bundles - .iter() - .all(|b| b.cpu <= 1 && b.gpu.unwrap_or(0) <= 1), - "bundles must be single-node schedulable, got {bundles:?}" - ); - } - #[derive(Debug)] pub struct MockPartition { num_rows: usize, @@ -968,10 +680,7 @@ pub(super) mod tests { task_name: String::new(), priority: MockTaskPriority { priority: 0 }, scheduling_strategy: SchedulingStrategy::Spread, - resource_request: TaskResourceRequest::new( - ResourceRequest::default(), - DaftExecutionConfig::default().min_cpu_per_task, - ), + resource_request: TaskResourceRequest::new(ResourceRequest::default()), task_result: crate::pipeline_node::MaterializedOutput::new( vec![partition_ref], "".into(), @@ -990,8 +699,7 @@ pub(super) mod tests { } pub fn with_resource_request(mut self, resource_request: ResourceRequest) -> Self { - self.resource_request = - TaskResourceRequest::new(resource_request, self.resource_request.min_cpu_per_task); + self.resource_request = TaskResourceRequest::new(resource_request); self } diff --git a/src/daft-local-plan/src/plan.rs b/src/daft-local-plan/src/plan.rs index 10415fe1ae1..d3ca66b2180 100644 --- a/src/daft-local-plan/src/plan.rs +++ b/src/daft-local-plan/src/plan.rs @@ -1231,8 +1231,7 @@ impl LocalPhysicalPlan { } pub fn resource_request(self: &Arc) -> ResourceRequest { - // Leave num_cpus unset so task layer can fall back to min_cpu_per_task. - let mut base = ResourceRequest::default(); + let mut base = ResourceRequest::default_cpu(); self.apply(|plan| match plan.as_ref() { Self::UDFProject(UDFProject { expr, @@ -2512,13 +2511,6 @@ mod task_topology_tests { ) } - #[test] - fn resource_request_leaves_num_cpus_unset_for_plain_plans() { - let plan = limit(scan(), 5); - let rr = plan.resource_request(); - assert!(rr.num_cpus().is_none()); - } - /// Constructors set `is_task_leaf` based on the node's children — leaves /// (no children) get `true`, others get `false` — regardless of how the /// caller-supplied `LocalNodeContext` was initialised. diff --git a/tests/test_context.py b/tests/test_context.py index 2affde04a2f..72621137fe2 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -377,12 +377,7 @@ def test_set_scantask_max_parallelism_greater_than_partition_num(): assert "Num Parallel Scan Tasks = 17" in str_io.getvalue().strip() -def test_min_cpu_per_task_accepts_fractional(): - with daft.execution_config_ctx(min_cpu_per_task=0.1): - assert daft.context.get_context().daft_execution_config.min_cpu_per_task == 0.1 - - -@pytest.mark.parametrize("value", [0.0, -0.5, float("nan"), float("inf")]) -def test_min_cpu_per_task_rejects_invalid(value): - with pytest.raises(ValueError, match="min_cpu_per_task"): - daft.set_execution_config(min_cpu_per_task=value) +def test_min_cpu_per_task_is_deprecated(): + with pytest.warns(DeprecationWarning, match="min_cpu_per_task"): + with daft.execution_config_ctx(min_cpu_per_task=0.1): + assert daft.context.get_context().daft_execution_config.min_cpu_per_task == 0.1