diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py index 6ec8bbf2b9cba..3bf886e13c7c5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_condition_evaluations.py @@ -11,6 +11,7 @@ AutomationConditionEvaluation, get_expanded_label, ) +from dagster._core.errors import DagsterInvalidInvocationError from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord from dagster_graphql.implementation.events import iterate_metadata_entries @@ -49,10 +50,10 @@ class Meta: def __init__(self, evaluation: AutomationConditionEvaluation): self._evaluation = evaluation - if evaluation.true_subset.size > 0: + if _get_historical_subset_size(evaluation.true_subset) > 0: status = AssetConditionEvaluationStatus.TRUE elif isinstance(evaluation.candidate_subset, SerializableEntitySubset) and ( - evaluation.candidate_subset.size > 0 + _get_historical_subset_size(evaluation.candidate_subset) > 0 ): status = AssetConditionEvaluationStatus.FALSE else: @@ -110,11 +111,11 @@ def __init__(self, evaluation: AutomationConditionEvaluation): ) def resolve_numTrue(self, graphene_info: ResolveInfo) -> int | None: - return self._evaluation.true_subset.size + return _get_historical_subset_size(self._evaluation.true_subset) def resolve_numCandidates(self, graphene_info: ResolveInfo) -> int | None: return ( - self._evaluation.candidate_subset.size + _get_historical_subset_size(self._evaluation.candidate_subset) if isinstance(self._evaluation.candidate_subset, SerializableEntitySubset) else None ) @@ -143,12 +144,11 @@ def __init__(self, evaluation: AutomationConditionEvaluation, partition_key: str # where a sub-condition is not partitioned. In these cases, we can treat the expression # as SKIPPED status = AssetConditionEvaluationStatus.SKIPPED - elif partition_key in evaluation.true_subset.subset_value: + elif _historical_subset_contains_partition(evaluation.true_subset, partition_key): status = AssetConditionEvaluationStatus.TRUE - elif ( - not isinstance(evaluation.candidate_subset, SerializableEntitySubset) - or partition_key in evaluation.candidate_subset.subset_value - ): + elif not isinstance( + evaluation.candidate_subset, SerializableEntitySubset + ) or _historical_subset_contains_partition(evaluation.candidate_subset, partition_key): status = AssetConditionEvaluationStatus.FALSE else: status = AssetConditionEvaluationStatus.SKIPPED @@ -171,7 +171,7 @@ def resolve_metadataEntries( ( subset.metadata for subset in self._evaluation.subsets_with_metadata - if self._partition_key in subset.subset.subset_value + if _historical_subset_contains_partition(subset.subset, self._partition_key) ), {}, ) @@ -281,11 +281,11 @@ def __init__(self, evaluation: AutomationConditionEvaluation): ) def resolve_numTrue(self, graphene_info: ResolveInfo) -> int | None: - return self._evaluation.true_subset.size + return _get_historical_subset_size(self._evaluation.true_subset) def resolve_numCandidates(self, graphene_info: ResolveInfo) -> int | None: return ( - self._evaluation.candidate_subset.size + _get_historical_subset_size(self._evaluation.candidate_subset) if isinstance(self._evaluation.candidate_subset, SerializableEntitySubset) else None ) @@ -362,7 +362,7 @@ def __init__( ) def resolve_numRequested(self, graphene_info: ResolveInfo) -> int | None: - return self._root_evaluation.true_subset.size + return _get_historical_subset_size(self._root_evaluation.true_subset) class GrapheneAssetConditionEvaluationRecords(graphene.ObjectType): @@ -388,3 +388,19 @@ def _flatten_evaluation( import itertools return list(itertools.chain([e], *(_flatten_evaluation(ce) for ce in e.child_evaluations))) + + +def _get_historical_subset_size(subset: SerializableEntitySubset) -> int: + try: + return subset.size + except DagsterInvalidInvocationError: + return 0 + + +def _historical_subset_contains_partition( + subset: SerializableEntitySubset, partition_key: str +) -> bool: + try: + return subset.is_partitioned and partition_key in subset.subset_value + except DagsterInvalidInvocationError: + return False diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py index 524f683f00519..25635d1197324 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py @@ -17,11 +17,16 @@ HistoricalAllPartitionsSubsetSentinel, ) from dagster._core.definitions.partitions.definition import ( + DynamicPartitionsDefinition, PartitionsDefinition, StaticPartitionsDefinition, ) +from dagster._core.definitions.partitions.partition_key_range import PartitionKeyRange +from dagster._core.definitions.partitions.snap import PartitionsSnap +from dagster._core.definitions.partitions.subset.key_ranges import KeyRangesPartitionsSubset from dagster._core.definitions.run_request import InstigatorType from dagster._core.definitions.sensor_definition import SensorType +from dagster._core.errors import DagsterInvalidInvocationError from dagster._core.instance import DagsterInstance from dagster._core.remote_origin import RemoteInstigatorOrigin from dagster._core.scheduler.instigation import ( @@ -527,6 +532,34 @@ def _get_condition_evaluation( subsets_with_metadata=[], ) + def _get_key_range_condition_evaluation( + self, + asset_key: AssetKey, + description: str, + partition_key_ranges: Sequence[PartitionKeyRange], + ) -> AutomationConditionEvaluation: + partitions_snap = PartitionsSnap.from_def(DynamicPartitionsDefinition(name="dynamic")) + subset = SerializableEntitySubset( + key=asset_key, + value=KeyRangesPartitionsSubset( + partitions_snap=partitions_snap, + key_ranges=partition_key_ranges, + ), + ) + return AutomationConditionEvaluation( + condition_snapshot=AutomationConditionNodeSnapshot( + class_name="...", + description=description, + unique_id=str(random.randint(0, 100000000)), + ), + true_subset=subset, + candidate_subset=HistoricalAllPartitionsSubsetSentinel(), + start_timestamp=123, + end_timestamp=456, + child_evaluations=[], + subsets_with_metadata=[], + ) + def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequestContext): asset_key = AssetKey("upstream_static_partitioned_asset") partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"]) @@ -822,6 +855,86 @@ def _get_node(id): "d", } + def test_get_evaluations_tolerates_deleted_historical_partition_size( + self, graphql_context: WorkspaceRequestContext + ): + asset_key = AssetKey("dynamic_partitioned_asset") + graphql_context.instance.add_dynamic_partitions("dynamic", ["a"]) + evaluation = self._get_key_range_condition_evaluation( + asset_key, + "stale historical subset", + [PartitionKeyRange("a", "a")], + ) + + check.not_none( + graphql_context.instance.schedule_storage + ).add_auto_materialize_asset_evaluations( + evaluation_id=201, + asset_evaluations=[ + AutomationConditionEvaluationWithRunIds(evaluation=evaluation, run_ids=frozenset()) + ], + ) + + with patch.object( + KeyRangesPartitionsSubset, + "__len__", + side_effect=DagsterInvalidInvocationError("Partition key was deleted."), + ): + results = execute_dagster_graphql( + graphql_context, + QUERY, + variables={ + "assetKey": {"path": ["dynamic_partitioned_asset"]}, + "limit": 10, + "cursor": None, + }, + ) + + assert not results.errors + records = results.data["assetConditionEvaluationRecordsOrError"]["records"] + assert len(records) == 1 + assert records[0]["numRequested"] == 0 + assert records[0]["evaluationNodes"][0]["numTrue"] == 0 + + def test_get_partition_evaluation_tolerates_deleted_historical_partition_membership( + self, graphql_context: WorkspaceRequestContext + ): + asset_key = AssetKey("dynamic_partitioned_asset") + graphql_context.instance.add_dynamic_partitions("dynamic", ["a"]) + evaluation = self._get_key_range_condition_evaluation( + asset_key, + "stale historical subset", + [PartitionKeyRange("a", "a")], + ) + + check.not_none( + graphql_context.instance.schedule_storage + ).add_auto_materialize_asset_evaluations( + evaluation_id=202, + asset_evaluations=[ + AutomationConditionEvaluationWithRunIds(evaluation=evaluation, run_ids=frozenset()) + ], + ) + + with patch.object( + KeyRangesPartitionsSubset, + "__contains__", + side_effect=DagsterInvalidInvocationError("Partition key was deleted."), + ): + results = execute_dagster_graphql( + graphql_context, + LEGACY_QUERY_FOR_SPECIFIC_PARTITION, + variables={ + "assetKey": {"path": ["dynamic_partitioned_asset"]}, + "partition": "a", + "evaluationId": 202, + }, + ) + + assert not results.errors + evaluation = results.data["assetConditionEvaluationForPartition"] + assert evaluation["evaluationNodes"][0]["status"] == "FALSE" + def test_since_metadata_field(self, graphql_context: WorkspaceRequestContext): """Test that the sinceMetadata field is correctly populated for SinceCondition evaluations.""" asset_key = AssetKey("test_asset")