diff --git a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py index d288662ad2697..2f889d51205c0 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py @@ -55,12 +55,12 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor default_condition=sensor_def.default_condition, logger=context.log, ) - if evaluation_context.total_keys > MAX_ENTITIES: + if evaluation_context.total_keys > sensor_def.max_entities: raise DagsterInvalidInvocationError( f'AutomationConditionSensorDefinition "{sensor_def.name}" targets {evaluation_context.total_keys} ' - f"assets or checks, which is more than the limit of {MAX_ENTITIES}. Either set `use_user_code_server` to `False`, " - "or split this sensor into multiple AutomationConditionSensorDefinitions with AssetSelections that target fewer " - "assets or checks." + f"assets or checks, which is more than the limit of {sensor_def.max_entities}. Either set `use_user_code_server` to `False`, " + "increase `max_entities`, or split this sensor into multiple AutomationConditionSensorDefinitions with " + "AssetSelections that target fewer assets or checks." ) run_requests, new_cursor, updated_evaluations = evaluation_context.evaluate() @@ -81,6 +81,7 @@ def not_supported(context) -> None: @public @beta_param(param="use_user_code_server") @beta_param(param="default_condition") +@beta_param(param="max_entities") class AutomationConditionSensorDefinition(SensorDefinition, IHasInternalInit): """Targets a set of assets and repeatedly evaluates all the AutomationConditions on all of those assets to determine which to request runs for. @@ -105,11 +106,16 @@ class AutomationConditionSensorDefinition(SensorDefinition, IHasInternalInit): use_user_code_server (bool): (Beta) If set to True, this sensor will be evaluated in the user code server, rather than the AssetDaemon. This enables evaluating custom AutomationCondition subclasses, and ensures that the condition definitions will remain in sync with your user code - version, eliminating version skew. Note: currently a maximum of 500 assets or checks may be - targeted at a time by a sensor that has this value set. + version, eliminating version skew. Note: by default, a maximum of 500 assets or checks may + be targeted at a time by a sensor that has this value set. This limit can be adjusted via + the `max_entities` parameter. default_condition (Optional[AutomationCondition]): (Beta) If provided, this condition will be used for any selected assets or asset checks which do not have an automation condition defined. Requires `use_user_code_server` to be set to `True`. + max_entities (Optional[int]): (Beta) If provided, overrides the maximum number of assets or + checks that may be targeted by this sensor (500 by default). Evaluating large numbers of + entities in a single sensor evaluation can be resource-intensive, so this limit should be + raised with care. Requires `use_user_code_server` to be set to `True`. Examples: .. code-block:: python @@ -159,10 +165,23 @@ def __init__( emit_backfills: bool = True, use_user_code_server: bool = False, default_condition: AutomationCondition | None = None, + max_entities: int | None = None, ): self._use_user_code_server = use_user_code_server check.bool_param(emit_backfills, "allow_backfills") + self._max_entities = check.opt_int_param(max_entities, "max_entities") + check.param_invariant( + self._max_entities is None or self._max_entities > 0, + "max_entities", + "`max_entities` must be a positive integer.", + ) + check.param_invariant( + not (self._max_entities and not self._use_user_code_server), + "max_entities", + "Setting `max_entities` for a non-user-code AutomationConditionSensorDefinition is not supported.", + ) + self._default_condition = check.opt_inst_param( default_condition, "default_condition", AutomationCondition ) @@ -211,6 +230,10 @@ def emit_backfills(self) -> bool: def default_condition(self) -> AutomationCondition | None: return self._default_condition + @property + def max_entities(self) -> int: + return self._max_entities if self._max_entities is not None else MAX_ENTITIES + @property def sensor_type(self) -> SensorType: return SensorType.AUTOMATION if self._use_user_code_server else SensorType.AUTO_MATERIALIZE @@ -229,6 +252,7 @@ def dagster_internal_init( # type: ignore emit_backfills: bool, use_user_code_server: bool, default_condition: AutomationCondition | None, + max_entities: int | None, ) -> "AutomationConditionSensorDefinition": return AutomationConditionSensorDefinition( name=name, @@ -242,6 +266,7 @@ def dagster_internal_init( # type: ignore emit_backfills=emit_backfills, use_user_code_server=use_user_code_server, default_condition=default_condition, + max_entities=max_entities, ) def with_attributes( @@ -266,4 +291,5 @@ def with_attributes( emit_backfills=self._emit_backfills, use_user_code_server=self._use_user_code_server, default_condition=self._default_condition, + max_entities=self._max_entities, ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py b/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py index 3a6821b542e69..8ad9dfc776ead 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py @@ -92,3 +92,73 @@ def test_limits() -> None: repository_def=defs.get_repository_def(), ), ) + + +def test_max_entities_param_validation() -> None: + # only supported alongside use_user_code_server + with pytest.raises(ParameterCheckError, match="non-user-code"): + dg.AutomationConditionSensorDefinition("foo", target="*", max_entities=100) + + # must be positive + with pytest.raises(ParameterCheckError, match="positive"): + dg.AutomationConditionSensorDefinition( + "foo", target="*", use_user_code_server=True, max_entities=0 + ) + + sensor = dg.AutomationConditionSensorDefinition( + "foo", target="*", use_user_code_server=True, max_entities=1000 + ) + assert sensor.max_entities == 1000 + # survives attribute-replacing copies + assert sensor.with_attributes(metadata={}).max_entities == 1000 + + # defaults to the standard limit when unset + sensor = dg.AutomationConditionSensorDefinition("foo", target="*", use_user_code_server=True) + assert sensor.max_entities == 500 + + +def test_max_entities_overrides_limit() -> None: + defs = dg.Definitions( + assets=build_assets( + "test", + layer_configs=[AssetLayerConfig(600)], + automation_condition=AutomationCondition.eager(), + ) + ) + + # under the default limit of 500, 600 targeted assets raises + sensor = dg.AutomationConditionSensorDefinition("foo", target="*", use_user_code_server=True) + with pytest.raises( + dg.DagsterInvalidInvocationError, match="more than the limit of 500" + ): + sensor( + dg.build_sensor_context( + instance=DagsterInstance.ephemeral(), + repository_def=defs.get_repository_def(), + ), + ) + + # raising the limit allows evaluation to proceed + sensor = dg.AutomationConditionSensorDefinition( + "foo", target="*", use_user_code_server=True, max_entities=1000 + ) + sensor( + dg.build_sensor_context( + instance=DagsterInstance.ephemeral(), + repository_def=defs.get_repository_def(), + ), + ) + + # a lowered limit is also respected + sensor = dg.AutomationConditionSensorDefinition( + "foo", target="*", use_user_code_server=True, max_entities=100 + ) + with pytest.raises( + dg.DagsterInvalidInvocationError, match="more than the limit of 100" + ): + sensor( + dg.build_sensor_context( + instance=DagsterInstance.ephemeral(), + repository_def=defs.get_repository_def(), + ), + )