Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from dagster._record import ImportFrom, record
from dagster_shared.record import replace

from dagster_dbt.core.dbt_env import _get_dbt_env_var
from dagster_dbt.dbt_project import DbtProject
from dagster_dbt.metadata_set import DbtMetadataSet
from dagster_dbt.utils import ASSET_RESOURCE_TYPES, dagster_name_fn, select_unique_ids
Expand Down Expand Up @@ -524,7 +525,7 @@ def get_updated_cli_invocation_params_for_context(
assets_def = None

selection_args: list[str] = []
indirect_selection = os.getenv(DBT_INDIRECT_SELECTION_ENV, None)
indirect_selection = _get_dbt_env_var(DBT_INDIRECT_SELECTION_ENV, None)
dbt_project = None
used_generated_selector = False
if context and assets_def is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
DbtCoreCliEventMessage,
DbtFusionCliEventMessage,
)
from dagster_dbt.core.dbt_env import _get_dbt_env_var
from dagster_dbt.core.dbt_event_iterator import DbtDagsterEventType, DbtEventIterator
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator
from dagster_dbt.dbt_project import DbtProject
Expand All @@ -47,7 +48,7 @@


def _get_dbt_target_path() -> Path:
    """Return the base dbt target path configured through the environment.

    The lookup goes through `_get_dbt_env_var`, so `DBT_ENGINE_TARGET_PATH` is
    consulted before the legacy `DBT_TARGET_PATH`. When neither variable is set,
    the relative path `target` is returned.
    """
    # A direct `os.getenv("DBT_TARGET_PATH", ...)` here would ignore the engine-prefixed
    # variable, so the helper is the only lookup performed.
    return Path(_get_dbt_env_var("DBT_TARGET_PATH", "target"))


class RelationKey(NamedTuple):
Expand Down
18 changes: 18 additions & 0 deletions python_modules/libraries/dagster-dbt/dagster_dbt/core/dbt_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os


def _get_dbt_env_var(key: str, default: str | None = None) -> str | None:
"""Return a dbt global config environment variable with dbt's precedence.

The helper accepts the legacy `DBT_*` environment variable name and checks the corresponding
`DBT_ENGINE_*` name first before falling back to `DBT_*`. This mirrors dbt's behavior for global
config environment variables while preserving the call shape of `os.getenv`.

`DBT_CLOUD_*` variables are not global config variables, so they are read without applying
the engine prefix.
"""
if not key.startswith("DBT_") or key.startswith("DBT_CLOUD_"):
return os.getenv(key, default)

engine_value = os.getenv(f"DBT_ENGINE_{key.removeprefix('DBT_')}")
return engine_value if engine_value is not None else os.getenv(key, default)
40 changes: 22 additions & 18 deletions python_modules/libraries/dagster-dbt/dagster_dbt/core/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,16 +651,7 @@ def my_dbt_op(dbt: DbtCliResource):
if updated_params.dbt_project
else self.project_dir
)
env = {
# Allow IO streaming when running in Windows.
# Also, allow it to be overridden by the current environment.
"PYTHONLEGACYWINDOWSSTDIO": "1",
# Pass the current environment variables to the dbt CLI invocation.
**os.environ.copy(),
# An environment variable to indicate that the dbt CLI is being invoked from Dagster.
"DAGSTER_DBT_CLI": "true",
# Run dbt with unbuffered output.
"PYTHONUNBUFFERED": "1",
dbt_global_config_env = {
# Disable anonymous usage statistics for performance.
"DBT_SEND_ANONYMOUS_USAGE_STATS": "false",
# The DBT_LOG_FORMAT environment variable must be set to `json`. We use this
Expand All @@ -674,20 +665,33 @@ def my_dbt_op(dbt: DbtCliResource):
# The DBT_LOG_PATH environment variable is set to the same value as DBT_TARGET_PATH
# so that each dbt invocation has separate log files.
"DBT_LOG_PATH": os.fspath(target_path),
# The DBT_PROFILES_DIR environment variable is set to the path containing the dbt
# profiles.yml file.
# See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory
# for more information.
**({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}),
# The DBT_PROJECT_DIR environment variable is set to the path containing the dbt project
# See https://docs.getdbt.com/reference/dbt_project.yml for more information.
"DBT_PROJECT_DIR": str(project_dir),
**({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}),
}

# set dbt indirect selection if needed to execute specific dbt tests due to asset check
# selection
if indirect_selection:
env[DBT_INDIRECT_SELECTION_ENV] = indirect_selection
dbt_global_config_env[DBT_INDIRECT_SELECTION_ENV] = indirect_selection

env = {
# Allow IO streaming when running in Windows.
# Also, allow it to be overridden by the current environment.
"PYTHONLEGACYWINDOWSSTDIO": "1",
# Pass the current environment variables to the dbt CLI invocation.
**os.environ.copy(),
# An environment variable to indicate that the dbt CLI is being invoked from Dagster.
"DAGSTER_DBT_CLI": "true",
# Run dbt with unbuffered output.
"PYTHONUNBUFFERED": "1",
# Set both DBT_ and DBT_ENGINE_ prefixed variables to account for changes in
# dbt since version 1.11. Otherwise a set DBT_ENGINE_ variable takes precedence.
**dbt_global_config_env,
**{
f"DBT_ENGINE_{key.removeprefix('DBT_')}": value
for key, value in dbt_global_config_env.items()
},
}

# TODO: verify that args does not have any selection flags if the context and manifest
# are passed to this function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext
from dagster_dbt import dbt_assets
from dagster_dbt.asset_utils import build_dbt_asset_selection
from dagster_dbt.asset_utils import (
build_dbt_asset_selection,
get_updated_cli_invocation_params_for_context,
)
from dagster_dbt.compat import DBT_PYTHON_VERSION
from dagster_dbt.core.dbt_cli_invocation import PARTIAL_PARSE_FILE_NAME
from dagster_dbt.core.dbt_cli_invocation import PARTIAL_PARSE_FILE_NAME, _get_dbt_target_path
from dagster_dbt.core.dbt_env import _get_dbt_env_var
from dagster_dbt.core.resource import DbtCliResource
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, DagsterDbtTranslatorSettings
from dagster_dbt.dbt_project import DbtProject
Expand Down Expand Up @@ -334,6 +338,105 @@ def test_dbt_project_dir_conflicting_env_var(monkeypatch: pytest.MonkeyPatch) ->
)


def test_dbt_engine_env_vars_override_conflicting_legacy_env_vars(
    monkeypatch: pytest.MonkeyPatch, mocker: MockerFixture
) -> None:
    """Check the env passed to the dbt invocation when engine and legacy variables conflict."""
    preset_env = {
        "DBT_TARGET_PATH": "legacy-target",
        "DBT_ENGINE_TARGET_PATH": "engine-target",
        "DBT_ENGINE_PROJECT_DIR": "wrong-project",
        "DBT_ENGINE_PROFILES_DIR": "wrong-profiles",
        "DBT_ENGINE_PROFILE": "wrong-profile",
        "DBT_ENGINE_TARGET": "wrong-target",
        "DBT_CLOUD_ACCOUNT_ID": "123",
    }
    for name, value in preset_env.items():
        monkeypatch.setenv(name, value)
    for name in ("DBT_PROFILE", "DBT_TARGET"):
        monkeypatch.delenv(name, raising=False)

    # Stub out the dbt version probe and the actual invocation so only the
    # arguments handed to `DbtCliInvocation.run` are inspected.
    mocker.patch("dagster_dbt.core.resource.check_output", return_value=b"2.0.0")
    run_mock = mocker.patch("dagster_dbt.core.resource.DbtCliInvocation.run")

    jaffle_shop_dir = os.fspath(test_jaffle_shop_path)
    resource = DbtCliResource(
        project_dir=jaffle_shop_dir,
        profiles_dir=jaffle_shop_dir,
        profile="jaffle_shop",
        target="dev",
    )
    resource.cli(["parse"])

    run_kwargs = run_mock.call_args.kwargs
    env = run_kwargs["env"]

    # The target path is derived from the engine-prefixed value and mirrored to both names.
    assert env["DBT_TARGET_PATH"].startswith("engine-target/")
    assert env["DBT_ENGINE_TARGET_PATH"] == env["DBT_TARGET_PATH"]

    # Project and profiles directories come from the resource and are mirrored to both names.
    assert env["DBT_PROJECT_DIR"] == jaffle_shop_dir
    assert env["DBT_ENGINE_PROJECT_DIR"] == env["DBT_PROJECT_DIR"]
    assert env["DBT_PROFILES_DIR"] == jaffle_shop_dir
    assert env["DBT_ENGINE_PROFILES_DIR"] == env["DBT_PROFILES_DIR"]

    # Profile and target are passed as CLI flags, so their env variables are left as found.
    assert "DBT_PROFILE" not in env
    assert env["DBT_ENGINE_PROFILE"] == "wrong-profile"
    assert "DBT_TARGET" not in env
    assert env["DBT_ENGINE_TARGET"] == "wrong-target"

    # dbt Cloud variables pass through without an engine-prefixed twin.
    assert env["DBT_CLOUD_ACCOUNT_ID"] == "123"
    assert "DBT_ENGINE_CLOUD_ACCOUNT_ID" not in env

    expected_args = ["dbt", "parse", "--profile", "jaffle_shop", "--target", "dev"]
    assert run_kwargs["args"] == expected_args


def test_dbt_engine_target_path_env_var_takes_precedence(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With both variables set, the engine-prefixed target path is the one returned."""
    for name, value in (
        ("DBT_TARGET_PATH", "legacy-target"),
        ("DBT_ENGINE_TARGET_PATH", "engine-target"),
    ):
        monkeypatch.setenv(name, value)

    resolved = _get_dbt_env_var("DBT_TARGET_PATH", "default-target")
    assert resolved == "engine-target"

    target_path = _get_dbt_target_path()
    assert target_path == Path("engine-target")


def test_dbt_engine_env_var_empty_string_takes_precedence(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An engine-prefixed variable set to the empty string still wins over the legacy one."""
    for name, value in (
        ("DBT_TARGET_PATH", "legacy-target"),
        ("DBT_ENGINE_TARGET_PATH", ""),
    ):
        monkeypatch.setenv(name, value)

    resolved = _get_dbt_env_var("DBT_TARGET_PATH", "default-target")
    assert resolved == ""


def test_dbt_env_var_uses_legacy_value_and_default(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without an engine-prefixed variable, the legacy value or the default is returned."""
    for name in ("DBT_TARGET_PATH", "DBT_ENGINE_TARGET_PATH"):
        monkeypatch.delenv(name, raising=False)

    # Neither variable is set: the supplied default comes back.
    unset_result = _get_dbt_env_var("DBT_TARGET_PATH", "default-target")
    assert unset_result == "default-target"

    # Only the legacy variable is set: its value comes back.
    monkeypatch.setenv("DBT_TARGET_PATH", "legacy-target")
    legacy_result = _get_dbt_env_var("DBT_TARGET_PATH", "default-target")
    assert legacy_result == "legacy-target"


def test_dbt_env_var_does_not_transform_dbt_cloud_env_vars(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """`DBT_CLOUD_*` names are read as-is even when an engine-prefixed twin exists."""
    for name, value in (
        ("DBT_CLOUD_ACCOUNT_ID", "123"),
        ("DBT_ENGINE_CLOUD_ACCOUNT_ID", "456"),
    ):
        monkeypatch.setenv(name, value)

    account_id = _get_dbt_env_var("DBT_CLOUD_ACCOUNT_ID")
    assert account_id == "123"


def test_dbt_engine_indirect_selection_env_var_takes_precedence(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The engine-prefixed indirect selection value is the one reported in the CLI params."""
    for name, value in (
        ("DBT_INDIRECT_SELECTION", "eager"),
        ("DBT_ENGINE_INDIRECT_SELECTION", "empty"),
    ):
        monkeypatch.setenv(name, value)

    updated_params = get_updated_cli_invocation_params_for_context(
        context=None,
        manifest={},
        dagster_dbt_translator=DagsterDbtTranslator(),
    )

    assert updated_params.indirect_selection == "empty"


def test_dbt_partial_parse(dbt: DbtCliResource) -> None:
test_jaffle_shop_path.joinpath("target", PARTIAL_PARSE_FILE_NAME).unlink(missing_ok=True)

Expand Down