diff --git a/python_modules/dagster-webserver/dagster_webserver/external_assets.py b/python_modules/dagster-webserver/dagster_webserver/external_assets.py index 14bfc7daea095..d5ac29e1496d1 100644 --- a/python_modules/dagster-webserver/dagster_webserver/external_assets.py +++ b/python_modules/dagster-webserver/dagster_webserver/external_assets.py @@ -90,6 +90,32 @@ async def handle_report_asset_materialization_request( return unauthorized tags = context.get_reporting_user_tags() + + user_tags = None + if ReportAssetMatParam.tags in json_body: + user_tags = json_body[ReportAssetMatParam.tags] + elif ReportAssetMatParam.tags in request.query_params: + try: + user_tags = json.loads(request.query_params[ReportAssetMatParam.tags]) + except Exception as exc: + return JSONResponse( + { + "error": f"Error parsing tags json: {exc}", + }, + status_code=400, + ) + if user_tags is not None: + if not isinstance(user_tags, dict): + return JSONResponse( + { + "error": "Expected tags to be a json object.", + }, + status_code=400, + ) + # merged before the dedicated data_version param so that the explicit + # param takes precedence over a conflicting tag + tags.update(user_tags) + data_version = _value_from_body_or_params(ReportAssetMatParam.data_version, request, json_body) if data_version is not None: tags[DATA_VERSION_TAG] = data_version @@ -297,6 +323,32 @@ async def handle_report_asset_observation_request( description = _value_from_body_or_params(ReportAssetObsParam.description, request, json_body) tags = context.get_reporting_user_tags() + + user_tags = None + if ReportAssetObsParam.tags in json_body: + user_tags = json_body[ReportAssetObsParam.tags] + elif ReportAssetObsParam.tags in request.query_params: + try: + user_tags = json.loads(request.query_params[ReportAssetObsParam.tags]) + except Exception as exc: + return JSONResponse( + { + "error": f"Error parsing tags json: {exc}", + }, + status_code=400, + ) + if user_tags is not None: + if not isinstance(user_tags, dict): + return JSONResponse( + { + "error": "Expected tags to be a json object.", + }, + status_code=400, + ) + # merged before the dedicated data_version param so that the explicit + # param takes precedence over a conflicting tag + tags.update(user_tags) + data_version = _value_from_body_or_params(ReportAssetObsParam.data_version, request, json_body) if data_version is not None: tags[DATA_VERSION_TAG] = data_version @@ -313,7 +365,7 @@ async def handle_report_asset_observation_request( except Exception as exc: return JSONResponse( { - "error": f"Error constructing AssetMaterialization: {exc}", + "error": f"Error constructing AssetObservation: {exc}", }, status_code=400, ) @@ -334,6 +386,7 @@ class ReportAssetMatParam: metadata = "metadata" description = "description" partition = "partition" + tags = "tags" class ReportAssetCheckEvalParam: @@ -359,3 +412,4 @@ class ReportAssetObsParam: metadata = "metadata" description = "description" partition = "partition" + tags = "tags" diff --git a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py index 24ce9c4c0b21c..ebe23b504fa16 100644 --- a/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py +++ b/python_modules/dagster-webserver/dagster_webserver_tests/webserver/test_asset_events.py @@ -130,6 +130,70 @@ def test_report_asset_materialization_endpoint(instance: DagsterInstance, test_c in response.json()["error"] ) + # user-provided tags (json body) — e.g. data-version provenance tags set by + # external writers reporting materializations over REST + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + json={ + "data_version": "v2", + "tags": { + "dagster/input_data_version/upstream/key": "12345", + "my_tag": "my_value", + }, + }, + ) + assert response.status_code == 200, response.json() + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt and evt.asset_materialization + tags = evt.asset_materialization.tags + assert tags + assert tags["dagster/input_data_version/upstream/key"] == "12345" + assert tags["my_tag"] == "my_value" + assert tags[DATA_VERSION_TAG] == "v2" + + # tags via query params (json encoded) + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + params={"tags": json.dumps({"my_tag": "param_value"})}, + ) + assert response.status_code == 200, response.json() + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt and evt.asset_materialization + tags = evt.asset_materialization.tags + assert tags + assert tags["my_tag"] == "param_value" + + # the dedicated data_version param takes precedence over a conflicting tag + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + json={ + "data_version": "param_wins", + "tags": {DATA_VERSION_TAG: "tag_loses"}, + }, + ) + assert response.status_code == 200, response.json() + evt = instance.get_latest_materialization_event(AssetKey(my_asset_key)) + assert evt and evt.asset_materialization + tags = evt.asset_materialization.tags + assert tags + assert tags[DATA_VERSION_TAG] == "param_wins" + + # bad tags: query param not json encoded + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + params={"tags": "not json {"}, + ) + assert response.status_code == 400 + assert "Error parsing tags json" in response.json()["error"] + + # bad tags: not an object + response = test_client.post( + f"/report_asset_materialization/{my_asset_key}", + json={"tags": "im_just_a_string"}, + ) + assert response.status_code == 400 + assert "Expected tags to be a json object" in response.json()["error"] + def test_report_asset_materialization_apis_consistent( instance: DagsterInstance, test_client: TestClient @@ -141,6 +205,7 @@ def test_report_asset_materialization_apis_consistent( "data_version": "so_new", "partition": "2023-09-23", "description": "boo", + "tags": {"dagster/input_data_version/up/stream": "42", "my_tag": "my_value"}, } # sample has entry for all supported params (banking on usage of enum) @@ -171,6 +236,12 @@ def test_report_asset_materialization_apis_consistent( assert mat.partition == v elif k == "description": assert mat.description == v + elif k == "tags": + assert isinstance(v, dict) + tags = mat.tags + assert tags + for tag_key, tag_value in v.items(): + assert tags[tag_key] == tag_value else: assert False, ( "need to add validation that sample payload content was written successfully" @@ -181,7 +252,7 @@ def test_report_asset_materialization_apis_consistent( skip_set = {"self"} params = [p for p in sig.parameters if p not in skip_set] - KNOWN_DIFF = {"partition", "description"} + KNOWN_DIFF = {"partition", "description", "tags"} assert set(sample_payload.keys()).difference(set(params)) == KNOWN_DIFF @@ -322,6 +393,45 @@ def test_report_asset_obs_endpoint(instance: DagsterInstance, test_client: TestC obs = _assert_stored_obs(instance, my_asset_key) assert obs.data_version == "fresh" + # user-provided tags (json body); dedicated data_version param wins over a conflicting tag + response = test_client.post( + f"/report_asset_observation/{my_asset_key}", + json={ + "data_version": "param_wins", + "tags": { + "dagster/input_data_version/upstream/key": "12345", + "my_tag": "my_value", + DATA_VERSION_TAG: "tag_loses", + }, + }, + ) + assert response.status_code == 200, response.json() + obs = _assert_stored_obs(instance, my_asset_key) + tags = obs.tags + assert tags + assert tags["dagster/input_data_version/upstream/key"] == "12345" + assert tags["my_tag"] == "my_value" + assert tags[DATA_VERSION_TAG] == "param_wins" + + # tags via query params (json encoded) + response = test_client.post( + f"/report_asset_observation/{my_asset_key}", + params={"tags": json.dumps({"my_tag": "param_value"})}, + ) + assert response.status_code == 200, response.json() + obs = _assert_stored_obs(instance, my_asset_key) + tags = obs.tags + assert tags + assert tags["my_tag"] == "param_value" + + # bad tags: not an object + response = test_client.post( + f"/report_asset_observation/{my_asset_key}", + json={"tags": "im_just_a_string"}, + ) + assert response.status_code == 400 + assert "Expected tags to be a json object" in response.json()["error"] + def test_report_asset_observation_apis_consistent( instance: DagsterInstance, test_client: TestClient @@ -332,6 +442,7 @@ def test_report_asset_observation_apis_consistent( "data_version": "so_new", "partition": "2023-09-23", "description": "boo", + "tags": {"dagster/input_data_version/up/stream": "42", "my_tag": "my_value"}, } # sample has entry for all supported params (banking on usage of enum) @@ -359,6 +470,12 @@ def test_report_asset_observation_apis_consistent( assert obs.partition == v elif k == "description": assert obs.description == v + elif k == "tags": + assert isinstance(v, dict) + tags = obs.tags + assert tags + for tag_key, tag_value in v.items(): + assert tags[tag_key] == tag_value else: assert False, ( "need to add validation that sample payload content was written successfully"