From cf87c27ea944fc268861734a620ad286f95f9120 Mon Sep 17 00:00:00 2001
From: iman
Date: Sat, 6 Jun 2026 01:10:30 +0500
Subject: [PATCH] Fix DagsterInvalidInvocationError when reading historical entity subsets with deleted partitions

---
Review notes (this section is ignored when the patch is applied):
- size, is_empty and __contains__ fall back to 0 / True / False when the
  underlying partitions subset raises DagsterInvalidInvocationError.
- __contains__ keeps its AssetKeyPartitionKey parameter and still compares
  item.asset_key against self.key, so items belonging to another asset are
  never reported as contained.
- The test patches DefaultPartitionsSubset.__contains__ as well, so the
  membership fallback is actually exercised.

 .../serializable_entity_subset.py             | 25 ++++++++++++++++++++-----
 .../test_serializable_entity_subset.py        | 36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/serializable_entity_subset.py b/python_modules/dagster/dagster/_core/asset_graph_view/serializable_entity_subset.py
index 0457220a08b8a..9ef59404e1c92 100644
--- a/python_modules/dagster/dagster/_core/asset_graph_view/serializable_entity_subset.py
+++ b/python_modules/dagster/dagster/_core/asset_graph_view/serializable_entity_subset.py
@@ -19,6 +19,7 @@
     TimeWindowPartitionsSubset,
 )
 from dagster._core.definitions.partitions.subset.key_ranges import KeyRangesPartitionsSubset
+from dagster._core.errors import DagsterInvalidInvocationError
 
 EntitySubsetValue: TypeAlias = bool | PartitionsSubset
 
@@ -125,16 +126,25 @@ def subset_value(self) -> PartitionsSubset:
     def size(self) -> int:
         if not self.is_partitioned:
             return int(self.bool_value)
-        else:
+
+        try:
             return len(self.subset_value)
+        except DagsterInvalidInvocationError:
+            # If a dynamic partition key was deleted out from under this historical
+            # subset evaluation, we gracefully fall back to a size of 0.
+            return 0
 
     @property
     def is_empty(self) -> bool:
-        if self.is_partitioned:
-            return self.subset_value.is_empty
-        else:
+        if not self.is_partitioned:
             return not self.bool_value
 
+        try:
+            return self.subset_value.is_empty
+        except DagsterInvalidInvocationError:
+            # If partition keys no longer exist, treat the historical subset as empty
+            return True
+
     def is_compatible_with_partitions_def(
         self, partitions_def: PartitionsDefinition | None
     ) -> bool:
@@ -191,8 +201,13 @@ def compute_intersection(self, other: Self) -> Self:
     def __contains__(self, item: AssetKeyPartitionKey) -> bool:
         if not self.is_partitioned:
             return item.asset_key == self.key and item.partition_key is None and self.bool_value
-        else:
+
+        try:
             return item.asset_key == self.key and item.partition_key in self.subset_value
+        except DagsterInvalidInvocationError:
+            # If a dynamic partition key was deleted out from under this historical
+            # subset evaluation, gracefully treat it as not contained.
+            return False
 
     def __repr__(self) -> str:
         return f"{self.__class__.__name__}<{self.key}>({self.value})"
diff --git a/python_modules/dagster/dagster_tests/core_tests/asset_graph_view_tests/test_serializable_entity_subset.py b/python_modules/dagster/dagster_tests/core_tests/asset_graph_view_tests/test_serializable_entity_subset.py
index 084af34d76cd9..16d8aa9481654 100644
--- a/python_modules/dagster/dagster_tests/core_tests/asset_graph_view_tests/test_serializable_entity_subset.py
+++ b/python_modules/dagster/dagster_tests/core_tests/asset_graph_view_tests/test_serializable_entity_subset.py
@@ -2,7 +2,9 @@
 import pytest
 from dagster._check import CheckError
 from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
+from dagster._core.definitions.events import AssetKeyPartitionKey
 from dagster._core.definitions.partitions.subset import DefaultPartitionsSubset
+from dagster._core.errors import DagsterInvalidInvocationError
 
 
 def test_union():
@@ -354,3 +356,37 @@ def test_is_compatible_with_partitions_def_default_subset_time_window():
     assert (
         out_of_range_subset.is_compatible_with_partitions_def(time_window_partitions_def) is False
     )
+
+
+def _raise_missing_keys(*args, **kwargs):
+    # Stand-in for partition subset lookups whose keys were deleted.
+    raise DagsterInvalidInvocationError("Nonexistent partition keys")
+
+
+def test_deleted_partition_graceful_fallback(monkeypatch: pytest.MonkeyPatch) -> None:
+    # Create a valid, empty subset using the framework helper.
+    partitions_def = dg.StaticPartitionsDefinition([])
+    broken_subset = partitions_def.empty_subset()
+
+    # Patch at the class level; the monkeypatch fixture restores the original
+    # attributes when the test ends.
+    monkeypatch.setattr(DefaultPartitionsSubset, "__len__", _raise_missing_keys)
+    monkeypatch.setattr(DefaultPartitionsSubset, "__contains__", _raise_missing_keys)
+    monkeypatch.setattr(DefaultPartitionsSubset, "is_empty", property(_raise_missing_keys))
+
+    asset_key = dg.AssetKey("test_asset")
+    entity_subset = SerializableEntitySubset(key=asset_key, value=broken_subset)
+
+    # size and is_empty fall back gracefully instead of raising.
+    assert entity_subset.size == 0
+    assert entity_subset.is_empty is True
+
+    # Membership checks fall back to "not contained" when the lookup raises.
+    deleted_item = AssetKeyPartitionKey(asset_key=asset_key, partition_key="deleted_key")
+    assert deleted_item not in entity_subset
+
+    # An item for a different asset is never contained, with or without the fallback.
+    other_item = AssetKeyPartitionKey(
+        asset_key=dg.AssetKey("other_asset"), partition_key="deleted_key"
+    )
+    assert other_item not in entity_subset