diff --git a/daft/context.py b/daft/context.py index 8d53a8e4a11..e7e88c5e6f2 100644 --- a/daft/context.py +++ b/daft/context.py @@ -3,6 +3,7 @@ import contextlib import logging import threading +import warnings from dataclasses import dataclass from typing import TYPE_CHECKING, Any, ClassVar @@ -285,7 +286,8 @@ def set_execution_config( pre_shuffle_merge_partition_threshold: Number of partitions threshold to enable pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200. scantask_max_parallel: Set the max parallelism for running scan tasks simultaneously. Currently, this only works for Native Runner. If set to 0, all available CPUs will be used. Defaults to 8. native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`. - min_cpu_per_task: Minimum CPU per task in the Ray runner. Defaults to 0.5. + min_cpu_per_task: Deprecated. This was used by the old Ray runner and has no effect on + Flotilla scheduling. It will be removed in the next minor version. actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 120 seconds. maintain_order: Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet) won't respect this flag and will always keep maintain_order as false, and propagate to child operators. It's useful to set this to False for running df.collect() when no ordering is required. enable_dynamic_batching: Whether to enable dynamic batching. Defaults to False. @@ -295,6 +297,14 @@ def set_execution_config( enable_multi_glob_path_tasks: Whether to create multiple glob path tasks in Ray Runner to achieve parallel glob. Defaults to False. """ # Replace values in the DaftExecutionConfig with user-specified overrides + if min_cpu_per_task is not None: + warnings.warn( + "`min_cpu_per_task` is deprecated and has no effect on Flotilla scheduling. " + "It will be removed in the next minor version.", + DeprecationWarning, + stacklevel=2, + ) + ctx = get_context() with ctx._lock: old_daft_execution_config = ctx._ctx._daft_execution_config if config is None else config diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 53c61bf79b5..93563f8a0f6 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -241,6 +241,10 @@ impl DaftExecutionConfig { if let Some(val) = parse_number_from_env(Self::ENV_DAFT_MIN_CPU_PER_TASK, cfg.min_cpu_per_task) { + eprintln!( + "{} is deprecated and has no effect on Flotilla scheduling. It will be removed in the next minor version.", + Self::ENV_DAFT_MIN_CPU_PER_TASK + ); cfg.min_cpu_per_task = val; } diff --git a/tests/test_context.py b/tests/test_context.py index 971f017e6dd..72621137fe2 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -375,3 +375,9 @@ def test_set_scantask_max_parallelism_greater_than_partition_num(): df = daft.range(start=0, end=1024, partitions=10) df.explain(show_all=True, file=str_io) assert "Num Parallel Scan Tasks = 17" in str_io.getvalue().strip() + + +def test_min_cpu_per_task_is_deprecated(): + with pytest.warns(DeprecationWarning, match="min_cpu_per_task"): + with daft.execution_config_ctx(min_cpu_per_task=0.1): + assert daft.context.get_context().daft_execution_config.min_cpu_per_task == 0.1