Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import operator
from collections.abc import Callable, Sequence
from dataclasses import dataclass, replace
from typing import Any, Generic, Optional, TypeAlias
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeAlias

if TYPE_CHECKING:
from dagster._core.asset_graph_view.entity_subset import AssetSubset

from dagster_shared.serdes.serdes import DataclassSerializer, whitelist_for_serdes
from typing_extensions import Self

import dagster._check as check
from dagster._core.definitions.asset_key import T_EntityKey
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partitions.context import partition_loading_context
from dagster._core.definitions.partitions.definition import PartitionsDefinition
from dagster._core.definitions.partitions.snap.snap import PartitionsSnap
Expand All @@ -19,6 +21,7 @@
TimeWindowPartitionsSubset,
)
from dagster._core.definitions.partitions.subset.key_ranges import KeyRangesPartitionsSubset
from dagster._core.errors import DagsterInvalidInvocationError

EntitySubsetValue: TypeAlias = bool | PartitionsSubset

Expand All @@ -43,6 +46,7 @@ def before_pack(self, value: "SerializableEntitySubset") -> "SerializableEntityS
storage_field_names={"key": "asset_key"},
old_storage_names={"AssetSubset"},
)

@dataclass(frozen=True)
class SerializableEntitySubset(Generic[T_EntityKey]):
"""Represents a serializable subset of a given EntityKey."""
Expand Down Expand Up @@ -123,18 +127,29 @@ def subset_value(self) -> PartitionsSubset:

@property
def size(self) -> int:

if not self.is_partitioned:
return int(self.bool_value)
else:

try:
return len(self.subset_value)

except DagsterInvalidInvocationError:
# If a dynamic partition key was deleted out from under this historical
# subset evaluation, we gracefully fall back to a size of 0.
return 0

@property
def is_empty(self) -> bool:
if self.is_partitioned:
return self.subset_value.is_empty
else:

if not self.is_partitioned:
return not self.bool_value

try:
return self.subset_value.is_empty
except DagsterInvalidInvocationError:
# If partition keys no longer exist, treat the historical subset as empty
return True

def is_compatible_with_partitions_def(
self, partitions_def: PartitionsDefinition | None
) -> bool:
Expand Down Expand Up @@ -188,11 +203,16 @@ def compute_union(self, other: Self) -> Self:
def compute_intersection(self, other: Self) -> Self:
return self._oper(other, operator.and_)

def __contains__(self, item: AssetKeyPartitionKey) -> bool:
def __contains__(self, item: "AssetSubset") -> bool:
if not self.is_partitioned:
return item.asset_key == self.key and item.partition_key is None and self.bool_value
else:
return item.asset_key == self.key and item.partition_key in self.subset_value

return self.bool_value and item.bool_value

try:
return item.partition_key in self.subset_value
except DagsterInvalidInvocationError:
# If a dynamic partition key was deleted out from under this historical
# subset evaluation, gracefully treat it as not contained.
return False
Comment on lines +206 to +215

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 AttributeError on non-partitioned __contains__ with existing callers

The non-partitioned branch now evaluates item.bool_value, but the known caller in legacy_context.py (line 369: asset_partition in parent_subset.convert_to_serializable_subset()) passes an AssetKeyPartitionKey, which is a NamedTuple with only asset_key and partition_key fields — no bool_value. Any non-partitioned asset going through will_update_asset_partition will raise AttributeError instead of the previous graceful logic. The old implementation checked item.partition_key is None and self.bool_value, which correctly handled AssetKeyPartitionKey without accessing a non-existent attribute.


def __repr__(self) -> str:
return f"{self.__class__.__name__}<{self.key}>({self.value})"
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from unittest import mock

import dagster as dg
import pytest
from dagster._check import CheckError
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.partitions.subset import DefaultPartitionsSubset
from dagster._core.errors import DagsterInvalidInvocationError


def test_union():
Expand Down Expand Up @@ -354,3 +357,34 @@ def test_is_compatible_with_partitions_def_default_subset_time_window():
assert (
out_of_range_subset.is_compatible_with_partitions_def(time_window_partitions_def) is False
)


def test_deleted_partition_graceful_fallback(monkeypatch: pytest.MonkeyPatch) -> None:
# Creates a valid, empty subset using the framework helper
partitions_def = dg.StaticPartitionsDefinition([])
broken_subset = partitions_def.empty_subset()

# Safely mock the methods on the class level using the monkeypatch fixture
# automatically tears down and restores original functionality after the test ends
monkeypatch.setattr(
DefaultPartitionsSubset,
"__len__",
lambda self: (_ for _ in ()).throw(DagsterInvalidInvocationError("Nonexistent partition keys")),
)
monkeypatch.setattr(
DefaultPartitionsSubset,
"is_empty",
property(lambda self: (_ for _ in ()).throw(DagsterInvalidInvocationError("Nonexistent partition keys"))),
)

# Instantiate wrapper class
entity_subset = SerializableEntitySubset(key=dg.AssetKey("test_asset"), value=broken_subset)

# Assert that size and is_empty gracefully handle the error fallback
assert entity_subset.size == 0
assert entity_subset.is_empty is True

# Test that membership checking (__contains__) also handles the fallback safely
mock_item = mock.MagicMock()
mock_item.partition_key = "deleted_key"
assert (mock_item in entity_subset) is False