diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 33299fb944d..2f033394a53 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -191,8 +191,7 @@ public void setupOpenLineage(DDTraceId traceId) { openLineageSparkConf.set( "spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT); openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip"); - openLineageSparkConf.set( - "spark.openlineage.run.tags", + String runTags = "_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false;_dd.ol_service:" @@ -200,7 +199,17 @@ public void setupOpenLineage(DDTraceId traceId) { + ";_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization() + ";_dd.ol_app_id:" - + appId); + + appId; + // _dd.ol_env carries the run environment so the lineage-processor can use it + // as the Spark application's UGP namespace, letting the OpenLineage-created + // node and the tracer-only node (djm-span-processor) resolve to the same + // entity_id. Omitted when env is unset so the consumer falls back to the + // OpenLineage namespace. + String olEnv = Config.get().getEnv(); + if (!olEnv.isEmpty()) { + runTags += ";_dd.ol_env:" + olEnv; + } + openLineageSparkConf.set("spark.openlineage.run.tags", runTags); setupOpenLineageCircuitBreaker(); return; } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy index a5ebb71915d..fa4ef5f699a 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy @@ -642,6 +642,22 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification { .contains("_dd.ol_service:databricks.job-cluster.some-run-name") } + def "test setupOpenLineage sets ol_env from dd.env"() { + setup: + injectSysConfig("dd.env", "my-env") + def listener = getTestDatadogSparkListener() + listener.openLineageSparkListener = Mock(SparkListenerInterface) + listener.openLineageSparkConf = new SparkConf() + listener.setupOpenLineage(Mock(DDTraceId)) + + expect: + assert listener + .openLineageSparkConf + .get("spark.openlineage.run.tags") + .split(";") + .contains("_dd.ol_env:my-env") + } + def "test setupOpenLineage fills ProcessTags"() { setup: def listener = getTestDatadogSparkListener()