Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,32 @@ async def handle_report_asset_materialization_request(
return unauthorized

tags = context.get_reporting_user_tags()

user_tags = None
if ReportAssetMatParam.tags in json_body:
user_tags = json_body[ReportAssetMatParam.tags]
elif ReportAssetMatParam.tags in request.query_params:
try:
user_tags = json.loads(request.query_params[ReportAssetMatParam.tags])
except Exception as exc:
return JSONResponse(
{
"error": f"Error parsing tags json: {exc}",
},
status_code=400,
)
if user_tags is not None:
if not isinstance(user_tags, dict):
return JSONResponse(
{
"error": "Expected tags to be a json object.",
},
status_code=400,
)
# merged before the dedicated data_version param so that the explicit
# param takes precedence over a conflicting tag
tags.update(user_tags)

data_version = _value_from_body_or_params(ReportAssetMatParam.data_version, request, json_body)
if data_version is not None:
tags[DATA_VERSION_TAG] = data_version
Expand Down Expand Up @@ -297,6 +323,32 @@ async def handle_report_asset_observation_request(
description = _value_from_body_or_params(ReportAssetObsParam.description, request, json_body)

tags = context.get_reporting_user_tags()

user_tags = None
if ReportAssetObsParam.tags in json_body:
user_tags = json_body[ReportAssetObsParam.tags]
elif ReportAssetObsParam.tags in request.query_params:
try:
user_tags = json.loads(request.query_params[ReportAssetObsParam.tags])
except Exception as exc:
return JSONResponse(
{
"error": f"Error parsing tags json: {exc}",
},
status_code=400,
)
if user_tags is not None:
if not isinstance(user_tags, dict):
return JSONResponse(
{
"error": "Expected tags to be a json object.",
},
status_code=400,
)
# merged before the dedicated data_version param so that the explicit
# param takes precedence over a conflicting tag
tags.update(user_tags)

data_version = _value_from_body_or_params(ReportAssetObsParam.data_version, request, json_body)
if data_version is not None:
tags[DATA_VERSION_TAG] = data_version
Expand All @@ -313,7 +365,7 @@ async def handle_report_asset_observation_request(
except Exception as exc:
return JSONResponse(
{
"error": f"Error constructing AssetMaterialization: {exc}",
"error": f"Error constructing AssetObservation: {exc}",
},
status_code=400,
)
Expand All @@ -334,6 +386,7 @@ class ReportAssetMatParam:
metadata = "metadata"
description = "description"
partition = "partition"
tags = "tags"


class ReportAssetCheckEvalParam:
Expand All @@ -359,3 +412,4 @@ class ReportAssetObsParam:
metadata = "metadata"
description = "description"
partition = "partition"
tags = "tags"
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,70 @@ def test_report_asset_materialization_endpoint(instance: DagsterInstance, test_c
in response.json()["error"]
)

# user-provided tags (json body) — e.g. data-version provenance tags set by
# external writers reporting materializations over REST
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"data_version": "v2",
"tags": {
"dagster/input_data_version/upstream/key": "12345",
"my_tag": "my_value",
},
},
)
assert response.status_code == 200, response.json()
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt and evt.asset_materialization
tags = evt.asset_materialization.tags
assert tags
assert tags["dagster/input_data_version/upstream/key"] == "12345"
assert tags["my_tag"] == "my_value"
assert tags[DATA_VERSION_TAG] == "v2"

# tags via query params (json encoded)
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={"tags": json.dumps({"my_tag": "param_value"})},
)
assert response.status_code == 200, response.json()
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt and evt.asset_materialization
tags = evt.asset_materialization.tags
assert tags
assert tags["my_tag"] == "param_value"

# the dedicated data_version param takes precedence over a conflicting tag
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={
"data_version": "param_wins",
"tags": {DATA_VERSION_TAG: "tag_loses"},
},
)
assert response.status_code == 200, response.json()
evt = instance.get_latest_materialization_event(AssetKey(my_asset_key))
assert evt and evt.asset_materialization
tags = evt.asset_materialization.tags
assert tags
assert tags[DATA_VERSION_TAG] == "param_wins"

# bad tags: query param not json encoded
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
params={"tags": "not json {"},
)
assert response.status_code == 400
assert "Error parsing tags json" in response.json()["error"]

# bad tags: not an object
response = test_client.post(
f"/report_asset_materialization/{my_asset_key}",
json={"tags": "im_just_a_string"},
)
assert response.status_code == 400
assert "Expected tags to be a json object" in response.json()["error"]


def test_report_asset_materialization_apis_consistent(
instance: DagsterInstance, test_client: TestClient
Expand All @@ -141,6 +205,7 @@ def test_report_asset_materialization_apis_consistent(
"data_version": "so_new",
"partition": "2023-09-23",
"description": "boo",
"tags": {"dagster/input_data_version/up/stream": "42", "my_tag": "my_value"},
}

# sample has entry for all supported params (banking on usage of enum)
Expand Down Expand Up @@ -171,6 +236,12 @@ def test_report_asset_materialization_apis_consistent(
assert mat.partition == v
elif k == "description":
assert mat.description == v
elif k == "tags":
assert isinstance(v, dict)
tags = mat.tags
assert tags
for tag_key, tag_value in v.items():
assert tags[tag_key] == tag_value
else:
assert False, (
"need to add validation that sample payload content was written successfully"
Expand All @@ -181,7 +252,7 @@ def test_report_asset_materialization_apis_consistent(
skip_set = {"self"}
params = [p for p in sig.parameters if p not in skip_set]

KNOWN_DIFF = {"partition", "description"}
KNOWN_DIFF = {"partition", "description", "tags"}

assert set(sample_payload.keys()).difference(set(params)) == KNOWN_DIFF

Expand Down Expand Up @@ -322,6 +393,45 @@ def test_report_asset_obs_endpoint(instance: DagsterInstance, test_client: TestC
obs = _assert_stored_obs(instance, my_asset_key)
assert obs.data_version == "fresh"

# user-provided tags (json body); dedicated data_version param wins over a conflicting tag
response = test_client.post(
f"/report_asset_observation/{my_asset_key}",
json={
"data_version": "param_wins",
"tags": {
"dagster/input_data_version/upstream/key": "12345",
"my_tag": "my_value",
DATA_VERSION_TAG: "tag_loses",
},
},
)
assert response.status_code == 200, response.json()
obs = _assert_stored_obs(instance, my_asset_key)
tags = obs.tags
assert tags
assert tags["dagster/input_data_version/upstream/key"] == "12345"
assert tags["my_tag"] == "my_value"
assert tags[DATA_VERSION_TAG] == "param_wins"

# tags via query params (json encoded)
response = test_client.post(
f"/report_asset_observation/{my_asset_key}",
params={"tags": json.dumps({"my_tag": "param_value"})},
)
assert response.status_code == 200, response.json()
obs = _assert_stored_obs(instance, my_asset_key)
tags = obs.tags
assert tags
assert tags["my_tag"] == "param_value"

# bad tags: not an object
response = test_client.post(
f"/report_asset_observation/{my_asset_key}",
json={"tags": "im_just_a_string"},
)
assert response.status_code == 400
assert "Expected tags to be a json object" in response.json()["error"]
Comment on lines 393 to +433

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing query-param tags coverage for observation

The materialization tests exercise both the JSON-body path and the JSON-encoded query-param path for `tags`, but the observation tests only cover the JSON-body path. The handler code at lines 330–339 of `external_assets.py` does include the `elif ReportAssetObsParam.tags in request.query_params` branch for observations, so a quick `params={"tags": json.dumps(...)}` case, analogous to the materialization test, would confirm that path is wired correctly.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!



def test_report_asset_observation_apis_consistent(
instance: DagsterInstance, test_client: TestClient
Expand All @@ -332,6 +442,7 @@ def test_report_asset_observation_apis_consistent(
"data_version": "so_new",
"partition": "2023-09-23",
"description": "boo",
"tags": {"dagster/input_data_version/up/stream": "42", "my_tag": "my_value"},
}

# sample has entry for all supported params (banking on usage of enum)
Expand Down Expand Up @@ -359,6 +470,12 @@ def test_report_asset_observation_apis_consistent(
assert obs.partition == v
elif k == "description":
assert obs.description == v
elif k == "tags":
assert isinstance(v, dict)
tags = obs.tags
assert tags
for tag_key, tag_value in v.items():
assert tags[tag_key] == tag_value
else:
assert False, (
"need to add validation that sample payload content was written successfully"
Expand Down