def _get_dbt_target_path() -> Path:
    """Return the dbt target directory as a ``Path``.

    The location is looked up under the ``DBT_TARGET_PATH`` name via
    ``_get_dbt_env_var``; ``"target"`` is passed as the fallback value for
    when nothing is configured in the environment.
    """
    configured_target = _get_dbt_env_var("DBT_TARGET_PATH", "target")
    return Path(configured_target)
a/python_modules/libraries/dagster-dbt/dagster_dbt/core/dbt_env.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/dbt_env.py new file mode 100644 index 0000000000000..1e9580c16d255 --- /dev/null +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/dbt_env.py @@ -0,0 +1,18 @@ +import os + + +def _get_dbt_env_var(key: str, default: str | None = None) -> str | None: + """Return a dbt global config environment variable with dbt's precedence. + + The helper accepts the legacy `DBT_*` environment variable name and checks the corresponding + `DBT_ENGINE_*` name first before falling back to `DBT_*`. This mirrors dbt's behavior for global + config environment variables while preserving the call shape of `os.getenv`. + + `DBT_CLOUD_*` variables are not global config variables, so they are read without applying + the engine prefix. + """ + if not key.startswith("DBT_") or key.startswith("DBT_CLOUD_"): + return os.getenv(key, default) + + engine_value = os.getenv(f"DBT_ENGINE_{key.removeprefix('DBT_')}") + return engine_value if engine_value is not None else os.getenv(key, default) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resource.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resource.py index f7ca297ccd5c8..0642204758f2f 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resource.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resource.py @@ -651,16 +651,7 @@ def my_dbt_op(dbt: DbtCliResource): if updated_params.dbt_project else self.project_dir ) - env = { - # Allow IO streaming when running in Windows. - # Also, allow it to be overriden by the current environment. - "PYTHONLEGACYWINDOWSSTDIO": "1", - # Pass the current environment variables to the dbt CLI invocation. - **os.environ.copy(), - # An environment variable to indicate that the dbt CLI is being invoked from Dagster. - "DAGSTER_DBT_CLI": "true", - # Run dbt with unbuffered output. 
- "PYTHONUNBUFFERED": "1", + dbt_global_config_env = { # Disable anonymous usage statistics for performance. "DBT_SEND_ANONYMOUS_USAGE_STATS": "false", # The DBT_LOG_FORMAT environment variable must be set to `json`. We use this @@ -674,20 +665,33 @@ def my_dbt_op(dbt: DbtCliResource): # The DBT_LOG_PATH environment variable is set to the same value as DBT_TARGET_PATH # so that logs for each dbt invocation has separate log files. "DBT_LOG_PATH": os.fspath(target_path), - # The DBT_PROFILES_DIR environment variable is set to the path containing the dbt - # profiles.yml file. - # See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory - # for more information. - **({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}), # The DBT_PROJECT_DIR environment variable is set to the path containing the dbt project # See https://docs.getdbt.com/reference/dbt_project.yml for more information. "DBT_PROJECT_DIR": str(project_dir), + **({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}), } - # set dbt indirect selection if needed to execute specific dbt tests due to asset check - # selection if indirect_selection: - env[DBT_INDIRECT_SELECTION_ENV] = indirect_selection + dbt_global_config_env[DBT_INDIRECT_SELECTION_ENV] = indirect_selection + + env = { + # Allow IO streaming when running in Windows. + # Also, allow it to be overriden by the current environment. + "PYTHONLEGACYWINDOWSSTDIO": "1", + # Pass the current environment variables to the dbt CLI invocation. + **os.environ.copy(), + # An environment variable to indicate that the dbt CLI is being invoked from Dagster. + "DAGSTER_DBT_CLI": "true", + # Run dbt with unbuffered output. + "PYTHONUNBUFFERED": "1", + # Set both DBT_ and DBT_ENGINE_ prefixed variables to account for changes in + # dbt since version 1.11. Otherwise a set DBT_ENGINE_ variable takes precedence. 
def test_dbt_engine_env_vars_override_conflicting_legacy_env_vars(
    monkeypatch: pytest.MonkeyPatch, mocker: MockerFixture
) -> None:
    """Check the env passed to the dbt invocation when `DBT_ENGINE_*` variables conflict.

    Values the resource sets itself must appear under both the `DBT_*` and the `DBT_ENGINE_*`
    name, while engine variables the resource does not set (`PROFILE`, `TARGET`) and
    `DBT_CLOUD_*` variables are passed through untouched.
    """
    monkeypatch.setenv("DBT_TARGET_PATH", "legacy-target")
    monkeypatch.setenv("DBT_ENGINE_TARGET_PATH", "engine-target")
    monkeypatch.setenv("DBT_ENGINE_PROJECT_DIR", "wrong-project")
    monkeypatch.setenv("DBT_ENGINE_PROFILES_DIR", "wrong-profiles")
    monkeypatch.setenv("DBT_ENGINE_PROFILE", "wrong-profile")
    monkeypatch.setenv("DBT_ENGINE_TARGET", "wrong-target")
    monkeypatch.setenv("DBT_CLOUD_ACCOUNT_ID", "123")
    monkeypatch.delenv("DBT_PROFILE", raising=False)
    monkeypatch.delenv("DBT_TARGET", raising=False)
    # Keep the "not in env" assertion below independent of the ambient environment.
    monkeypatch.delenv("DBT_ENGINE_CLOUD_ACCOUNT_ID", raising=False)

    mocker.patch("dagster_dbt.core.resource.check_output", return_value=b"2.0.0")
    run_mock = mocker.patch("dagster_dbt.core.resource.DbtCliInvocation.run")

    DbtCliResource(
        project_dir=os.fspath(test_jaffle_shop_path),
        profiles_dir=os.fspath(test_jaffle_shop_path),
        profile="jaffle_shop",
        target="dev",
    ).cli(["parse"])

    env = run_mock.call_args.kwargs["env"]
    # Compare path components instead of a "engine-target/" string prefix so the check does
    # not depend on the platform's path separator.
    target_path_parts = Path(env["DBT_TARGET_PATH"]).parts
    assert len(target_path_parts) > 1
    assert target_path_parts[0] == "engine-target"
    assert env["DBT_ENGINE_TARGET_PATH"] == env["DBT_TARGET_PATH"]
    assert env["DBT_PROJECT_DIR"] == os.fspath(test_jaffle_shop_path)
    assert env["DBT_ENGINE_PROJECT_DIR"] == env["DBT_PROJECT_DIR"]
    assert env["DBT_PROFILES_DIR"] == os.fspath(test_jaffle_shop_path)
    assert env["DBT_ENGINE_PROFILES_DIR"] == env["DBT_PROFILES_DIR"]
    assert "DBT_PROFILE" not in env
    assert env["DBT_ENGINE_PROFILE"] == "wrong-profile"
    assert "DBT_TARGET" not in env
    assert env["DBT_ENGINE_TARGET"] == "wrong-target"
    assert env["DBT_CLOUD_ACCOUNT_ID"] == "123"
    assert "DBT_ENGINE_CLOUD_ACCOUNT_ID" not in env
    assert run_mock.call_args.kwargs["args"] == [
        "dbt",
        "parse",
        "--profile",
        "jaffle_shop",
        "--target",
        "dev",
    ]


def test_dbt_engine_target_path_env_var_takes_precedence(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """`DBT_ENGINE_TARGET_PATH` wins over `DBT_TARGET_PATH` when both are set."""
    monkeypatch.setenv("DBT_TARGET_PATH", "legacy-target")
    monkeypatch.setenv("DBT_ENGINE_TARGET_PATH", "engine-target")

    assert _get_dbt_env_var("DBT_TARGET_PATH", "default-target") == "engine-target"
    assert _get_dbt_target_path() == Path("engine-target")


def test_dbt_engine_env_var_empty_string_takes_precedence(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An engine variable set to the empty string still overrides the legacy variable."""
    monkeypatch.setenv("DBT_TARGET_PATH", "legacy-target")
    monkeypatch.setenv("DBT_ENGINE_TARGET_PATH", "")

    assert _get_dbt_env_var("DBT_TARGET_PATH", "default-target") == ""


def test_dbt_env_var_uses_legacy_value_and_default(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without an engine variable, the legacy value is used, then the supplied default."""
    monkeypatch.delenv("DBT_TARGET_PATH", raising=False)
    monkeypatch.delenv("DBT_ENGINE_TARGET_PATH", raising=False)
    assert _get_dbt_env_var("DBT_TARGET_PATH", "default-target") == "default-target"

    monkeypatch.setenv("DBT_TARGET_PATH", "legacy-target")
    assert _get_dbt_env_var("DBT_TARGET_PATH", "default-target") == "legacy-target"


def test_dbt_env_var_does_not_transform_dbt_cloud_env_vars(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """`DBT_CLOUD_*` names are read directly, ignoring any `DBT_ENGINE_CLOUD_*` variable."""
    monkeypatch.setenv("DBT_CLOUD_ACCOUNT_ID", "123")
    monkeypatch.setenv("DBT_ENGINE_CLOUD_ACCOUNT_ID", "456")

    assert _get_dbt_env_var("DBT_CLOUD_ACCOUNT_ID") == "123"


def test_dbt_engine_indirect_selection_env_var_takes_precedence(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The engine-prefixed indirect selection variable overrides the legacy one."""
    monkeypatch.setenv("DBT_INDIRECT_SELECTION", "eager")
    monkeypatch.setenv("DBT_ENGINE_INDIRECT_SELECTION", "empty")

    assert (
        get_updated_cli_invocation_params_for_context(
            context=None,
            manifest={},
            dagster_dbt_translator=DagsterDbtTranslator(),
        ).indirect_selection
        == "empty"
    )