Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion python_modules/libraries/dagster-dbt/dagster_dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,33 @@ def __getattr__(self, item):

child_map = manifest_json["child_map"]

graph = graph_selector.Graph(DiGraph(incoming_graph_data=child_map))
digraph = DiGraph(incoming_graph_data=child_map)

# dbt-fusion manifests (manifest schema v2+) omit nodes that have neither
# parents nor children from `child_map`, whereas dbt-core keys `child_map`
# by every node. As a result a node with no `source()`/`ref()` calls (and
# nothing referencing it) never lands in the selection graph, so
# `NodeSelector` silently drops it from the asset graph with no error.
# Add any selectable unique_ids that are missing from the graph so isolated
# nodes stay selectable. This is a no-op for well-formed dbt-core manifests.
# See https://github.com/dagster-io/dagster/issues/33801.
selectable_unique_ids = {
unique_id
for collection in (
"nodes",
"sources",
"exposures",
"metrics",
"semantic_models",
"saved_queries",
"unit_tests",
"functions",
)
for unique_id in manifest_json.get(collection, {})
}
digraph.add_nodes_from(selectable_unique_ids - set(digraph.nodes))

graph = graph_selector.Graph(digraph)

# create a parsed selection from the select string
_set_flag_attrs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,128 @@ def selected_dbt_assets(): ...
selector="fake_selector_does_not_exist",
)
def selected_dbt_assets(): ...


def _model_node(unique_id: str, name: str) -> dict[str, Any]:
return {
"unique_id": unique_id,
"resource_type": "model",
"name": name,
"package_name": "test",
"fqn": ["test", name],
"path": f"{name}.sql",
"original_file_path": f"models/{name}.sql",
"tags": [],
"config": {"enabled": True, "tags": [], "materialized": "table"},
"depends_on": {"nodes": [], "macros": []},
}


def _source_node(unique_id: str, source_name: str, name: str) -> dict[str, Any]:
return {
"unique_id": unique_id,
"resource_type": "source",
"source_name": source_name,
"name": name,
"package_name": "test",
"fqn": ["test", source_name, name],
"path": "sources.yml",
"original_file_path": "models/sources.yml",
"tags": [],
"config": {"enabled": True, "tags": []},
}


@pytest.mark.parametrize(
    "select, expected_unique_ids",
    [
        pytest.param("isolated", {"model.test.isolated"}, id="isolated-model-alone"),
        pytest.param(
            "fqn:*",
            {
                "model.test.parent",
                "model.test.child",
                "model.test.isolated",
            },
            id="broader-selector",
        ),
    ],
)
def test_select_unique_ids_includes_isolated_fusion_models(
    select: str, expected_unique_ids: set[str]
) -> None:
    """Isolated nodes missing from ``child_map`` must still be selectable.

    A dbt model with no ``source()``/``ref()`` calls (and nothing referencing it)
    is omitted from ``child_map`` by dbt-fusion manifests, unlike dbt-core which
    keys ``child_map`` by every node. Selection must still surface such isolated
    nodes rather than silently dropping them.

    Regression test for https://github.com/dagster-io/dagster/issues/33801.
    """
    from dagster_dbt.utils import _select_unique_ids_from_manifest

    manifest_json: dict[str, Any] = {
        "nodes": {
            "model.test.parent": _model_node("model.test.parent", "parent"),
            "model.test.child": _model_node("model.test.child", "child"),
            "model.test.isolated": _model_node("model.test.isolated", "isolated"),
        },
        "sources": {},
        "metrics": {},
        "exposures": {},
        # dbt-fusion-style child_map: the isolated model appears neither as a key
        # nor as a value, because it has no parents and no children.
        "child_map": {
            "model.test.parent": ["model.test.child"],
            "model.test.child": [],
        },
        "parent_map": {
            "model.test.parent": [],
            "model.test.child": ["model.test.parent"],
        },
    }

    selected = _select_unique_ids_from_manifest(
        select=select, exclude="", selector="", manifest_json=manifest_json
    )

    assert selected == expected_unique_ids


def test_select_unique_ids_includes_isolated_fusion_models_with_ref_source_graph() -> None:
    """Isolated models stay selectable alongside a source-connected model.

    The manifest mixes one model that depends on a source with one isolated
    model that is absent from ``child_map``. Selecting ``fqn:*`` is expected to
    return both models; the source itself is not part of the expected set.
    """
    from dagster_dbt.utils import _select_unique_ids_from_manifest

    manifest_json: dict[str, Any] = {
        "nodes": {
            "model.test.uses_source": {
                **_model_node("model.test.uses_source", "uses_source"),
                # Override the helper's empty default so this model has a parent.
                "depends_on": {"nodes": ["source.test.raw.customers"], "macros": []},
            },
            "model.test.isolated": _model_node("model.test.isolated", "isolated"),
        },
        "sources": {
            "source.test.raw.customers": _source_node(
                "source.test.raw.customers", "raw", "customers"
            ),
        },
        "metrics": {},
        "exposures": {},
        # Mixed dbt-fusion-style graph: one model is connected to a source, while the
        # isolated model is absent from child_map because it has no parents or children.
        "child_map": {
            "source.test.raw.customers": ["model.test.uses_source"],
            "model.test.uses_source": [],
        },
        "parent_map": {
            "source.test.raw.customers": [],
            "model.test.uses_source": ["source.test.raw.customers"],
        },
    }

    selected = _select_unique_ids_from_manifest(
        select="fqn:*", exclude="", selector="", manifest_json=manifest_json
    )

    assert selected == {
        "model.test.uses_source",
        "model.test.isolated",
    }