Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
AutomationConditionEvaluation,
get_expanded_label,
)
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.scheduler.instigation import AutoMaterializeAssetEvaluationRecord

from dagster_graphql.implementation.events import iterate_metadata_entries
Expand Down Expand Up @@ -49,10 +50,10 @@ class Meta:

def __init__(self, evaluation: AutomationConditionEvaluation):
self._evaluation = evaluation
if evaluation.true_subset.size > 0:
if _get_historical_subset_size(evaluation.true_subset) > 0:
status = AssetConditionEvaluationStatus.TRUE
elif isinstance(evaluation.candidate_subset, SerializableEntitySubset) and (
evaluation.candidate_subset.size > 0
_get_historical_subset_size(evaluation.candidate_subset) > 0
):
status = AssetConditionEvaluationStatus.FALSE
else:
Expand Down Expand Up @@ -110,11 +111,11 @@ def __init__(self, evaluation: AutomationConditionEvaluation):
)

def resolve_numTrue(self, graphene_info: ResolveInfo) -> int | None:
return self._evaluation.true_subset.size
return _get_historical_subset_size(self._evaluation.true_subset)

def resolve_numCandidates(self, graphene_info: ResolveInfo) -> int | None:
return (
self._evaluation.candidate_subset.size
_get_historical_subset_size(self._evaluation.candidate_subset)
if isinstance(self._evaluation.candidate_subset, SerializableEntitySubset)
else None
)
Expand Down Expand Up @@ -143,12 +144,11 @@ def __init__(self, evaluation: AutomationConditionEvaluation, partition_key: str
# where a sub-condition is not partitioned. In these cases, we can treat the expression
# as SKIPPED
status = AssetConditionEvaluationStatus.SKIPPED
elif partition_key in evaluation.true_subset.subset_value:
elif _historical_subset_contains_partition(evaluation.true_subset, partition_key):
status = AssetConditionEvaluationStatus.TRUE
elif (
not isinstance(evaluation.candidate_subset, SerializableEntitySubset)
or partition_key in evaluation.candidate_subset.subset_value
):
elif not isinstance(
evaluation.candidate_subset, SerializableEntitySubset
) or _historical_subset_contains_partition(evaluation.candidate_subset, partition_key):
status = AssetConditionEvaluationStatus.FALSE
else:
status = AssetConditionEvaluationStatus.SKIPPED
Expand All @@ -171,7 +171,7 @@ def resolve_metadataEntries(
(
subset.metadata
for subset in self._evaluation.subsets_with_metadata
if self._partition_key in subset.subset.subset_value
if _historical_subset_contains_partition(subset.subset, self._partition_key)
),
{},
)
Expand Down Expand Up @@ -281,11 +281,11 @@ def __init__(self, evaluation: AutomationConditionEvaluation):
)

def resolve_numTrue(self, graphene_info: ResolveInfo) -> int | None:
return self._evaluation.true_subset.size
return _get_historical_subset_size(self._evaluation.true_subset)

def resolve_numCandidates(self, graphene_info: ResolveInfo) -> int | None:
return (
self._evaluation.candidate_subset.size
_get_historical_subset_size(self._evaluation.candidate_subset)
if isinstance(self._evaluation.candidate_subset, SerializableEntitySubset)
else None
)
Expand Down Expand Up @@ -362,7 +362,7 @@ def __init__(
)

def resolve_numRequested(self, graphene_info: ResolveInfo) -> int | None:
return self._root_evaluation.true_subset.size
return _get_historical_subset_size(self._root_evaluation.true_subset)


class GrapheneAssetConditionEvaluationRecords(graphene.ObjectType):
Expand All @@ -388,3 +388,19 @@ def _flatten_evaluation(
import itertools

return list(itertools.chain([e], *(_flatten_evaluation(ce) for ce in e.child_evaluations)))


def _get_historical_subset_size(subset: SerializableEntitySubset) -> int:
    """Return the size of a stored evaluation subset, falling back to 0.

    Args:
        subset: The subset recorded on a historical evaluation.

    Returns:
        ``subset.size``, or ``0`` when reading the size raises
        ``DagsterInvalidInvocationError``.
    """
    try:
        size = subset.size
    except DagsterInvalidInvocationError:
        # Best-effort for historical records: report an empty subset rather
        # than letting the error propagate out of the calling resolver.
        size = 0
    return size
Comment on lines +393 to +397

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Narrow exception scope may miss the loading-context error

KeyRangesPartitionsSubset.__len__ and __contains__ are both decorated with @require_full_partition_loading_context, which calls check.invariant(...) and raises DagsterInvariantViolationError — not DagsterInvalidInvocationError — when no partition loading context is active. If a GraphQL code path ever reaches these helpers without a context (e.g. in a test that bypasses the normal request setup), the DagsterInvariantViolationError would propagate unhandled and re-expose the same 500 crash. In normal production paths the context is always set, so this is low risk today, but worth documenting or catching the broader DagsterError base class if hardening is desired.



def _historical_subset_contains_partition(
    subset: SerializableEntitySubset, partition_key: str
) -> bool:
    """Report whether a stored evaluation subset contains ``partition_key``.

    Args:
        subset: The subset recorded on a historical evaluation.
        partition_key: The partition key to look up.

    Returns:
        The result of ``subset.is_partitioned and partition_key in
        subset.subset_value``, or ``False`` when evaluating that expression
        raises ``DagsterInvalidInvocationError``.
    """
    try:
        # The `is_partitioned` guard short-circuits, so membership is only
        # tested against `subset_value` for partitioned subsets.
        is_member = subset.is_partitioned and partition_key in subset.subset_value
    except DagsterInvalidInvocationError:
        # Treat a subset that can no longer be queried as not containing the key.
        is_member = False
    return is_member
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
HistoricalAllPartitionsSubsetSentinel,
)
from dagster._core.definitions.partitions.definition import (
DynamicPartitionsDefinition,
PartitionsDefinition,
StaticPartitionsDefinition,
)
from dagster._core.definitions.partitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.partitions.snap import PartitionsSnap
from dagster._core.definitions.partitions.subset.key_ranges import KeyRangesPartitionsSubset
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.definitions.sensor_definition import SensorType
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DagsterInstance
from dagster._core.remote_origin import RemoteInstigatorOrigin
from dagster._core.scheduler.instigation import (
Expand Down Expand Up @@ -527,6 +532,34 @@ def _get_condition_evaluation(
subsets_with_metadata=[],
)

def _get_key_range_condition_evaluation(
    self,
    asset_key: AssetKey,
    description: str,
    partition_key_ranges: Sequence[PartitionKeyRange],
) -> AutomationConditionEvaluation:
    """Build a leaf evaluation whose true subset is stored as key ranges.

    Args:
        asset_key: Asset the evaluation's true subset is keyed on.
        description: Description placed on the condition snapshot.
        partition_key_ranges: Key ranges that make up the true subset.

    Returns:
        An ``AutomationConditionEvaluation`` with no children and no subset
        metadata, whose ``true_subset`` wraps a ``KeyRangesPartitionsSubset``
        for the dynamic partitions definition named ``"dynamic"`` and whose
        ``candidate_subset`` is a ``HistoricalAllPartitionsSubsetSentinel``.
    """
    key_ranges_subset = KeyRangesPartitionsSubset(
        partitions_snap=PartitionsSnap.from_def(DynamicPartitionsDefinition(name="dynamic")),
        key_ranges=partition_key_ranges,
    )
    true_subset = SerializableEntitySubset(key=asset_key, value=key_ranges_subset)

    # A random unique_id is generated for each snapshot built by this helper.
    node_snapshot = AutomationConditionNodeSnapshot(
        class_name="...",
        description=description,
        unique_id=str(random.randint(0, 100000000)),
    )

    return AutomationConditionEvaluation(
        condition_snapshot=node_snapshot,
        true_subset=true_subset,
        candidate_subset=HistoricalAllPartitionsSubsetSentinel(),
        start_timestamp=123,
        end_timestamp=456,
        child_evaluations=[],
        subsets_with_metadata=[],
    )

def test_get_evaluations_with_partitions(self, graphql_context: WorkspaceRequestContext):
asset_key = AssetKey("upstream_static_partitioned_asset")
partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"])
Expand Down Expand Up @@ -822,6 +855,86 @@ def _get_node(id):
"d",
}

def test_get_evaluations_tolerates_deleted_historical_partition_size(
    self, graphql_context: WorkspaceRequestContext
):
    """Evaluation records still resolve when sizing a stored subset raises.

    ``KeyRangesPartitionsSubset.__len__`` is patched to raise
    ``DagsterInvalidInvocationError``; the query must return without errors
    and report the affected counts as 0.
    """
    asset_key = AssetKey("dynamic_partitioned_asset")
    instance = graphql_context.instance
    instance.add_dynamic_partitions("dynamic", ["a"])
    stale_evaluation = self._get_key_range_condition_evaluation(
        asset_key,
        "stale historical subset",
        [PartitionKeyRange("a", "a")],
    )

    # Persist the evaluation so the GraphQL query below can read it back.
    storage = check.not_none(instance.schedule_storage)
    storage.add_auto_materialize_asset_evaluations(
        evaluation_id=201,
        asset_evaluations=[
            AutomationConditionEvaluationWithRunIds(
                evaluation=stale_evaluation, run_ids=frozenset()
            )
        ],
    )

    query_variables = {
        "assetKey": {"path": ["dynamic_partitioned_asset"]},
        "limit": 10,
        "cursor": None,
    }
    # Make every size computation on a key-range subset fail for the query.
    failing_len = patch.object(
        KeyRangesPartitionsSubset,
        "__len__",
        side_effect=DagsterInvalidInvocationError("Partition key was deleted."),
    )
    with failing_len:
        results = execute_dagster_graphql(
            graphql_context,
            QUERY,
            variables=query_variables,
        )

    assert not results.errors
    records = results.data["assetConditionEvaluationRecordsOrError"]["records"]
    assert len(records) == 1
    only_record = records[0]
    assert only_record["numRequested"] == 0
    assert only_record["evaluationNodes"][0]["numTrue"] == 0

def test_get_partition_evaluation_tolerates_deleted_historical_partition_membership(
    self, graphql_context: WorkspaceRequestContext
):
    """Per-partition evaluation still resolves when a membership check raises.

    ``KeyRangesPartitionsSubset.__contains__`` is patched to raise
    ``DagsterInvalidInvocationError``; the query must return without errors.
    """
    asset_key = AssetKey("dynamic_partitioned_asset")
    instance = graphql_context.instance
    instance.add_dynamic_partitions("dynamic", ["a"])
    stale_evaluation = self._get_key_range_condition_evaluation(
        asset_key,
        "stale historical subset",
        [PartitionKeyRange("a", "a")],
    )

    # Persist the evaluation so the GraphQL query below can read it back.
    storage = check.not_none(instance.schedule_storage)
    storage.add_auto_materialize_asset_evaluations(
        evaluation_id=202,
        asset_evaluations=[
            AutomationConditionEvaluationWithRunIds(
                evaluation=stale_evaluation, run_ids=frozenset()
            )
        ],
    )

    query_variables = {
        "assetKey": {"path": ["dynamic_partitioned_asset"]},
        "partition": "a",
        "evaluationId": 202,
    }
    # Make every membership test on a key-range subset fail for the query.
    failing_contains = patch.object(
        KeyRangesPartitionsSubset,
        "__contains__",
        side_effect=DagsterInvalidInvocationError("Partition key was deleted."),
    )
    with failing_contains:
        results = execute_dagster_graphql(
            graphql_context,
            LEGACY_QUERY_FOR_SPECIFIC_PARTITION,
            variables=query_variables,
        )

    assert not results.errors
    partition_evaluation = results.data["assetConditionEvaluationForPartition"]
    # FALSE (not SKIPPED) is intentional: the failed membership check means the
    # partition is not counted as part of true_subset, and the evaluation built
    # by _get_key_range_condition_evaluation uses a
    # HistoricalAllPartitionsSubsetSentinel as its candidate_subset, so the
    # node is expected to resolve to FALSE.
    assert partition_evaluation["evaluationNodes"][0]["status"] == "FALSE"
Comment on lines +934 to +936

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 "FALSE" status assertion may surprise future readers

When __contains__ is patched to throw for true_subset, the code falls through to check candidate_subset. Because candidate_subset is HistoricalAllPartitionsSubsetSentinel() — which is not a SerializableEntitySubset — the not isinstance(...) branch fires and yields FALSE. This is correct given the current logic, but it means a deleted partition shows up as FALSE (evaluated-and-rejected) rather than SKIPPED (not evaluated). A brief comment in the test explaining why FALSE (not SKIPPED) is expected would help future maintainers understand the assertion isn't a mistake.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


def test_since_metadata_field(self, graphql_context: WorkspaceRequestContext):
"""Test that the sinceMetadata field is correctly populated for SinceCondition evaluations."""
asset_key = AssetKey("test_asset")
Expand Down