Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor
default_condition=sensor_def.default_condition,
logger=context.log,
)
if evaluation_context.total_keys > MAX_ENTITIES:
if evaluation_context.total_keys > sensor_def.max_entities:
raise DagsterInvalidInvocationError(
f'AutomationConditionSensorDefinition "{sensor_def.name}" targets {evaluation_context.total_keys} '
f"assets or checks, which is more than the limit of {MAX_ENTITIES}. Either set `use_user_code_server` to `False`, "
"or split this sensor into multiple AutomationConditionSensorDefinitions with AssetSelections that target fewer "
"assets or checks."
f"assets or checks, which is more than the limit of {sensor_def.max_entities}. Either set `use_user_code_server` to `False`, "
"increase `max_entities`, or split this sensor into multiple AutomationConditionSensorDefinitions with "
"AssetSelections that target fewer assets or checks."
)

run_requests, new_cursor, updated_evaluations = evaluation_context.evaluate()
Expand All @@ -81,6 +81,7 @@ def not_supported(context) -> None:
@public
@beta_param(param="use_user_code_server")
@beta_param(param="default_condition")
@beta_param(param="max_entities")
class AutomationConditionSensorDefinition(SensorDefinition, IHasInternalInit):
"""Targets a set of assets and repeatedly evaluates all the AutomationConditions on all of
those assets to determine which to request runs for.
Expand All @@ -105,11 +106,16 @@ class AutomationConditionSensorDefinition(SensorDefinition, IHasInternalInit):
use_user_code_server (bool): (Beta) If set to True, this sensor will be evaluated in the user
code server, rather than the AssetDaemon. This enables evaluating custom AutomationCondition
subclasses, and ensures that the condition definitions will remain in sync with your user code
version, eliminating version skew. Note: currently a maximum of 500 assets or checks may be
targeted at a time by a sensor that has this value set.
version, eliminating version skew. Note: by default, a maximum of 500 assets or checks may
be targeted at a time by a sensor that has this value set. This limit can be adjusted via
the `max_entities` parameter.
default_condition (Optional[AutomationCondition]): (Beta) If provided, this condition will
be used for any selected assets or asset checks which do not have an automation condition defined.
Requires `use_user_code_server` to be set to `True`.
max_entities (Optional[int]): (Beta) If provided, overrides the maximum number of assets or
checks that may be targeted by this sensor (500 by default). Evaluating large numbers of
entities in a single sensor evaluation can be resource-intensive, so this limit should be
raised with care. Requires `use_user_code_server` to be set to `True`.

Examples:
.. code-block:: python
Expand Down Expand Up @@ -159,10 +165,23 @@ def __init__(
emit_backfills: bool = True,
use_user_code_server: bool = False,
default_condition: AutomationCondition | None = None,
max_entities: int | None = None,
):
self._use_user_code_server = use_user_code_server
check.bool_param(emit_backfills, "allow_backfills")

self._max_entities = check.opt_int_param(max_entities, "max_entities")
check.param_invariant(
self._max_entities is None or self._max_entities > 0,
"max_entities",
"`max_entities` must be a positive integer.",
)
check.param_invariant(
not (self._max_entities and not self._use_user_code_server),
"max_entities",
"Setting `max_entities` for a non-user-code AutomationConditionSensorDefinition is not supported.",
)

self._default_condition = check.opt_inst_param(
default_condition, "default_condition", AutomationCondition
)
Expand Down Expand Up @@ -211,6 +230,10 @@ def emit_backfills(self) -> bool:
def default_condition(self) -> AutomationCondition | None:
return self._default_condition

@property
def max_entities(self) -> int:
    """Return the entity limit that applies to this sensor.

    An explicitly configured ``max_entities`` value is returned as-is; when
    none was provided, the module-level ``MAX_ENTITIES`` constant is used.
    """
    if self._max_entities is None:
        return MAX_ENTITIES
    return self._max_entities

@property
def sensor_type(self) -> SensorType:
    """Return the sensor type implied by the ``use_user_code_server`` setting.

    Sensors configured with ``use_user_code_server`` report
    ``SensorType.AUTOMATION``; all others report
    ``SensorType.AUTO_MATERIALIZE``.
    """
    if self._use_user_code_server:
        return SensorType.AUTOMATION
    return SensorType.AUTO_MATERIALIZE
Expand All @@ -229,6 +252,7 @@ def dagster_internal_init( # type: ignore
emit_backfills: bool,
use_user_code_server: bool,
default_condition: AutomationCondition | None,
max_entities: int | None,
) -> "AutomationConditionSensorDefinition":
Comment on lines 254 to 256

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Remove the `= None` default to satisfy the `IHasInternalInit` contract. The only caller (`with_attributes`) already passes this kwarg explicitly, so no callers break.

Suggested change
default_condition: AutomationCondition | None,
max_entities: int | None = None,
) -> "AutomationConditionSensorDefinition":
default_condition: AutomationCondition | None,
max_entities: int | None,
) -> "AutomationConditionSensorDefinition":

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

return AutomationConditionSensorDefinition(
name=name,
Expand All @@ -242,6 +266,7 @@ def dagster_internal_init( # type: ignore
emit_backfills=emit_backfills,
use_user_code_server=use_user_code_server,
default_condition=default_condition,
max_entities=max_entities,
)

def with_attributes(
Expand All @@ -266,4 +291,5 @@ def with_attributes(
emit_backfills=self._emit_backfills,
use_user_code_server=self._use_user_code_server,
default_condition=self._default_condition,
max_entities=self._max_entities,
)
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,73 @@ def test_limits() -> None:
repository_def=defs.get_repository_def(),
),
)


def test_max_entities_param_validation() -> None:
    """Check constructor validation of ``max_entities`` and the value the property reports."""
    # an override is rejected unless use_user_code_server is enabled
    with pytest.raises(ParameterCheckError, match="non-user-code"):
        dg.AutomationConditionSensorDefinition("foo", target="*", max_entities=100)

    # zero is not an acceptable limit
    with pytest.raises(ParameterCheckError, match="positive"):
        dg.AutomationConditionSensorDefinition(
            "foo", target="*", use_user_code_server=True, max_entities=0
        )

    # without an override, the property reports the standard limit
    default_sensor = dg.AutomationConditionSensorDefinition(
        "foo", target="*", use_user_code_server=True
    )
    assert default_sensor.max_entities == 500

    # an explicit override is reported unchanged ...
    custom_sensor = dg.AutomationConditionSensorDefinition(
        "foo", target="*", use_user_code_server=True, max_entities=1000
    )
    assert custom_sensor.max_entities == 1000

    # ... and is carried over to copies made by with_attributes
    copied_sensor = custom_sensor.with_attributes(metadata={})
    assert copied_sensor.max_entities == 1000


def test_max_entities_overrides_limit() -> None:
    """Check that the limit enforced during evaluation follows ``max_entities``."""
    defs = dg.Definitions(
        assets=build_assets(
            "test",
            layer_configs=[AssetLayerConfig(600)],
            automation_condition=AutomationCondition.eager(),
        )
    )

    def _evaluate_once(sensor_def) -> None:
        # Each evaluation gets its own ephemeral instance, so no state is
        # shared between the scenarios below.
        sensor_context = dg.build_sensor_context(
            instance=DagsterInstance.ephemeral(),
            repository_def=defs.get_repository_def(),
        )
        sensor_def(sensor_context)

    # with no override the default limit of 500 applies, so 600 targets is too many
    default_sensor = dg.AutomationConditionSensorDefinition(
        "foo", target="*", use_user_code_server=True
    )
    with pytest.raises(dg.DagsterInvalidInvocationError, match="more than the limit of 500"):
        _evaluate_once(default_sensor)

    # a limit above the number of targets lets the evaluation run
    raised_sensor = dg.AutomationConditionSensorDefinition(
        "foo", target="*", use_user_code_server=True, max_entities=1000
    )
    _evaluate_once(raised_sensor)

    # a limit below the default is enforced as well
    lowered_sensor = dg.AutomationConditionSensorDefinition(
        "foo", target="*", use_user_code_server=True, max_entities=100
    )
    with pytest.raises(dg.DagsterInvalidInvocationError, match="more than the limit of 100"):
        _evaluate_once(lowered_sensor)