From 7012fe79ec4eb5912afffec31aae6ef0560fc87d Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Wed, 3 Jun 2026 13:36:59 -0400 Subject: [PATCH 1/4] fix(tracing): honor OTEL resource env config --- src/common/tracing/src/config.rs | 125 +++++++++++++++++++++++++++++++ src/common/tracing/src/lib.rs | 11 +-- 2 files changed, 127 insertions(+), 9 deletions(-) diff --git a/src/common/tracing/src/config.rs b/src/common/tracing/src/config.rs index e0b568a7ee3..964bb5bb6cb 100644 --- a/src/common/tracing/src/config.rs +++ b/src/common/tracing/src/config.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use opentelemetry::{Key, KeyValue}; // These match the OTEL_EXPORTER_OTLP_* environment variables from: // https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#endpoint-configuration use opentelemetry_otlp::{ @@ -7,6 +8,7 @@ use opentelemetry_otlp::{ OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_PROTOCOL, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, Protocol, }; +use opentelemetry_sdk::{Resource, resource::EnvResourceDetector}; /// Environment variable for the general OTLP endpoint. pub const ENV_OLD_OTLP_ENDPOINT: &str = "DAFT_DEV_OTEL_EXPORTER_OTLP_ENDPOINT"; @@ -107,4 +109,127 @@ impl Config { .map(Duration::from_millis) .unwrap_or(Duration::from_millis(500)) } + + pub fn resource(&self) -> Resource { + let env_resource = Resource::builder_empty() + .with_detector(Box::new(EnvResourceDetector::new())) + .build(); + let service_name = std::env::var("OTEL_SERVICE_NAME") + .ok() + .filter(|name| !name.is_empty()) + .or_else(|| { + env_resource + .get(&Key::new("service.name")) + .map(|value| value.to_string()) + }) + .unwrap_or_else(|| "daft".to_string()); + + Resource::builder_empty() + .with_attributes( + env_resource + .iter() + .map(|(key, value)| KeyValue::new(key.clone(), value.clone())), + ) + .with_service_name(service_name) + .build() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use opentelemetry::{Key, Value}; + + use super::*; + + static ENV_LOCK: Mutex<()> = Mutex::new(()); + + fn with_env_vars(vars: &[(&str, Option<&str>)], f: impl FnOnce() -> R) -> R { + let _guard = ENV_LOCK.lock().unwrap(); + let previous_values: Vec<_> = vars + .iter() + .map(|(name, _)| (*name, std::env::var(name).ok())) + .collect(); + + for (name, value) in vars { + match value { + Some(value) => unsafe { std::env::set_var(name, value) }, + None => unsafe { std::env::remove_var(name) }, + } + } + + let result = f(); + + for (name, value) in previous_values { + match value { + Some(value) => unsafe { std::env::set_var(name, value) }, + None => unsafe { std::env::remove_var(name) }, + } + } + + result + } + + #[test] + fn resource_defaults_to_daft_service_name() { + with_env_vars( + &[ + ("OTEL_SERVICE_NAME", None), + ("OTEL_RESOURCE_ATTRIBUTES", None), + ], + || { + let resource = Config::from_env().resource(); + assert_eq!( + resource.get(&Key::new("service.name")), + Some(Value::from("daft")) + ); + }, + ); + } + + #[test] + fn resource_uses_otel_service_name_env() { + with_env_vars( + &[ + ("OTEL_SERVICE_NAME", Some("daft-ray-worker")), + ("OTEL_RESOURCE_ATTRIBUTES", None), + ], + || { + let resource = Config::from_env().resource(); + assert_eq!( + resource.get(&Key::new("service.name")), + Some(Value::from("daft-ray-worker")) + ); + }, + ); + } + + #[test] + fn resource_includes_otel_resource_attributes() { + with_env_vars( + &[ + ("OTEL_SERVICE_NAME", None), + ( + "OTEL_RESOURCE_ATTRIBUTES", + Some("deployment.environment=staging,service.version=0.7.14"), + ), + ], + || { + let resource = Config::from_env().resource(); + assert_eq!( + resource.get(&Key::new("deployment.environment")), + Some(Value::from("staging")) + ); + assert_eq!( + resource.get(&Key::new("service.version")), + Some(Value::from("0.7.14")) + ); + assert_eq!( + resource.get(&Key::new("service.name")), + Some(Value::from("daft")) + ); + }, + ); + } } diff --git a/src/common/tracing/src/lib.rs b/src/common/tracing/src/lib.rs index a6117aadcce..adeee503ca0 100644 --- a/src/common/tracing/src/lib.rs +++ b/src/common/tracing/src/lib.rs @@ -1,9 +1,6 @@ mod config; -use std::{ - sync::{LazyLock, Mutex}, - time::Duration, -}; +use std::sync::{LazyLock, Mutex}; use common_runtime::get_io_runtime; pub use config::Config; @@ -53,7 +50,7 @@ impl TraceFormat { pub fn init_tracing() { let config = Config::from_env(); - let resource = Resource::builder().with_service_name("daft").build(); + let resource = config.resource(); let tracer_provider = if config.enabled() { let runtime = get_io_runtime(true); runtime.block_on_current_thread(async { @@ -158,7 +155,6 @@ fn init_otlp_logger_provider(config: &Config, otlp_endpoint: &str, resource: Res let log_exporter = opentelemetry_otlp::LogExporter::builder() .with_tonic() .with_endpoint(otlp_endpoint) - .with_timeout(Duration::from_secs(10)) .build() .expect("Failed to build OTLP logger exporter."); @@ -188,14 +184,12 @@ fn init_otlp_metrics_provider(config: &Config, endpoint: &str, resource: Resourc opentelemetry_otlp::MetricExporter::builder() .with_tonic() .with_endpoint(endpoint) - .with_timeout(Duration::from_secs(10)) .build() } Protocol::HttpBinary => { opentelemetry_otlp::MetricExporter::builder() .with_http() .with_endpoint(endpoint) - .with_timeout(Duration::from_secs(10)) .with_protocol(config.otlp_protocol) .build() } @@ -242,7 +236,6 @@ fn init_otlp_tracer_provider( let exporter = opentelemetry_otlp::SpanExporter::builder() .with_tonic() .with_endpoint(otlp_endpoint) - .with_timeout(Duration::from_secs(10)) .build() .expect("Failed to build OTLP span exporter for tracing"); From c452c4e24e8f3f9c2db4cf15cd53ae7fd9019e38 Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Tue, 16 Jun 2026 01:53:42 -0400 Subject: [PATCH 2/4] fix(tracing): preserve default resource detectors --- src/common/tracing/src/config.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/common/tracing/src/config.rs b/src/common/tracing/src/config.rs index 964bb5bb6cb..ac163c30995 100644 --- a/src/common/tracing/src/config.rs +++ b/src/common/tracing/src/config.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use opentelemetry::{Key, KeyValue}; +use opentelemetry::Key; // These match the OTEL_EXPORTER_OTLP_* environment variables from: // https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#endpoint-configuration use opentelemetry_otlp::{ @@ -124,14 +124,7 @@ impl Config { }) .unwrap_or_else(|| "daft".to_string()); - Resource::builder_empty() - .with_attributes( - env_resource - .iter() - .map(|(key, value)| KeyValue::new(key.clone(), value.clone())), - ) - .with_service_name(service_name) - .build() + Resource::builder().with_service_name(service_name).build() } } From 0b4c16ba379303ccf9142791abd6f146d6d134c5 Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Fri, 19 Jun 2026 16:24:38 -0400 Subject: [PATCH 3/4] fix(tracing): preserve default resource detectors --- src/common/tracing/src/config.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/common/tracing/src/config.rs b/src/common/tracing/src/config.rs index ac163c30995..63e9f0518a7 100644 --- a/src/common/tracing/src/config.rs +++ b/src/common/tracing/src/config.rs @@ -8,7 +8,7 @@ use opentelemetry_otlp::{ OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_PROTOCOL, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, Protocol, }; -use opentelemetry_sdk::{Resource, resource::EnvResourceDetector}; +use opentelemetry_sdk::Resource; /// Environment variable for the general OTLP endpoint. pub const ENV_OLD_OTLP_ENDPOINT: &str = "DAFT_DEV_OTEL_EXPORTER_OTLP_ENDPOINT"; @@ -111,16 +111,15 @@ impl Config { } pub fn resource(&self) -> Resource { - let env_resource = Resource::builder_empty() - .with_detector(Box::new(EnvResourceDetector::new())) - .build(); + let detected_resource = Resource::builder().build(); let service_name = std::env::var("OTEL_SERVICE_NAME") .ok() .filter(|name| !name.is_empty()) .or_else(|| { - env_resource + detected_resource .get(&Key::new("service.name")) .map(|value| value.to_string()) + .filter(|name| name != "unknown_service") }) .unwrap_or_else(|| "daft".to_string()); @@ -214,6 +213,10 @@ mod tests { resource.get(&Key::new("deployment.environment")), Some(Value::from("staging")) ); + assert_eq!( + resource.get(&Key::new("telemetry.sdk.language")), + Some(Value::from("rust")) + ); assert_eq!( resource.get(&Key::new("service.version")), Some(Value::from("0.7.14")) From 9cf86e0f3a254c6b9bacf75ca4a86a464c2bf76c Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Mon, 22 Jun 2026 18:29:29 -0400 Subject: [PATCH 4/4] fix(tracing): document service name fallback --- src/common/tracing/src/config.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/common/tracing/src/config.rs b/src/common/tracing/src/config.rs index 63e9f0518a7..3056b78fbc4 100644 --- a/src/common/tracing/src/config.rs +++ b/src/common/tracing/src/config.rs @@ -112,15 +112,11 @@ impl Config { pub fn resource(&self) -> Resource { let detected_resource = Resource::builder().build(); - let service_name = std::env::var("OTEL_SERVICE_NAME") - .ok() - .filter(|name| !name.is_empty()) - .or_else(|| { - detected_resource - .get(&Key::new("service.name")) - .map(|value| value.to_string()) - .filter(|name| name != "unknown_service") - }) + let service_name = detected_resource + .get(&Key::new("service.name")) + .map(|value| value.to_string()) + // OpenTelemetry uses this placeholder when no service name is configured. + .filter(|name| name != "unknown_service") .unwrap_or_else(|| "daft".to_string()); Resource::builder().with_service_name(service_name).build()