diff --git a/src/common/tracing/src/config.rs b/src/common/tracing/src/config.rs
index e0b568a7ee3..3056b78fbc4 100644
--- a/src/common/tracing/src/config.rs
+++ b/src/common/tracing/src/config.rs
@@ -1,5 +1,6 @@
 use std::time::Duration;
 
+use opentelemetry::Key;
 // These match the OTEL_EXPORTER_OTLP_* environment variables from:
 // https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#endpoint-configuration
 use opentelemetry_otlp::{
@@ -7,6 +8,7 @@ use opentelemetry_otlp::{
     OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_PROTOCOL,
     OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, Protocol,
 };
+use opentelemetry_sdk::Resource;
 
 /// Environment variable for the general OTLP endpoint.
 pub const ENV_OLD_OTLP_ENDPOINT: &str = "DAFT_DEV_OTEL_EXPORTER_OTLP_ENDPOINT";
@@ -107,4 +109,143 @@ impl Config {
             .map(Duration::from_millis)
             .unwrap_or(Duration::from_millis(500))
     }
+
+    /// Builds the OpenTelemetry [`Resource`] attached to exported telemetry.
+    ///
+    /// Attributes come from the SDK's default resource builder; the tests below show it
+    /// honouring `OTEL_SERVICE_NAME` and `OTEL_RESOURCE_ATTRIBUTES`. When no service name
+    /// is detected (or only the SDK placeholder is), `service.name` falls back to `"daft"`.
+    pub fn resource(&self) -> Resource {
+        let detected_resource = Resource::builder().build();
+        let service_name = detected_resource
+            .get(&Key::new("service.name"))
+            .map(|value| value.to_string())
+            // OpenTelemetry uses this placeholder when no service name is configured.
+            .filter(|name| name != "unknown_service")
+            .unwrap_or_else(|| "daft".to_string());
+
+        Resource::builder().with_service_name(service_name).build()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        ffi::OsString,
+        sync::{Mutex, PoisonError},
+    };
+
+    use opentelemetry::{Key, Value};
+
+    use super::*;
+
+    /// Serializes tests that mutate process-wide environment variables.
+    static ENV_LOCK: Mutex<()> = Mutex::new(());
+
+    /// Puts the captured environment variables back when dropped, so a panicking test
+    /// body cannot leak its overrides into later tests.
+    struct EnvRestore<'a>(Vec<(&'a str, Option<OsString>)>);
+
+    impl Drop for EnvRestore<'_> {
+        fn drop(&mut self) {
+            for (name, value) in self.0.drain(..) {
+                match value {
+                    // SAFETY: only constructed inside `with_env_vars`, which still holds
+                    // `ENV_LOCK` here, so callers of the helper cannot race each other.
+                    Some(value) => unsafe { std::env::set_var(name, value) },
+                    None => unsafe { std::env::remove_var(name) },
+                }
+            }
+        }
+    }
+
+    /// Runs `f` with each variable in `vars` set (`Some`) or removed (`None`), then
+    /// restores the previous values, even if `f` panics.
+    fn with_env_vars<R>(vars: &[(&str, Option<&str>)], f: impl FnOnce() -> R) -> R {
+        // A failed assertion in another test poisons the lock. The `()` payload has no
+        // state to corrupt, so recover the guard instead of cascading the failure.
+        let _guard = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
+        // Declared after `_guard` so it drops first, while the lock is still held.
+        let _restore = EnvRestore(
+            vars.iter()
+                .map(|(name, _)| (*name, std::env::var_os(name)))
+                .collect(),
+        );
+
+        for (name, value) in vars {
+            match value {
+                // SAFETY: `ENV_LOCK` is held, so callers of the helper cannot race each other.
+                Some(value) => unsafe { std::env::set_var(name, value) },
+                None => unsafe { std::env::remove_var(name) },
+            }
+        }
+
+        f()
+    }
+
+    #[test]
+    fn resource_defaults_to_daft_service_name() {
+        with_env_vars(
+            &[
+                ("OTEL_SERVICE_NAME", None),
+                ("OTEL_RESOURCE_ATTRIBUTES", None),
+            ],
+            || {
+                let resource = Config::from_env().resource();
+                assert_eq!(
+                    resource.get(&Key::new("service.name")),
+                    Some(Value::from("daft"))
+                );
+            },
+        );
+    }
+
+    #[test]
+    fn resource_uses_otel_service_name_env() {
+        with_env_vars(
+            &[
+                ("OTEL_SERVICE_NAME", Some("daft-ray-worker")),
+                ("OTEL_RESOURCE_ATTRIBUTES", None),
+            ],
+            || {
+                let resource = Config::from_env().resource();
+                assert_eq!(
+                    resource.get(&Key::new("service.name")),
+                    Some(Value::from("daft-ray-worker"))
+                );
+            },
+        );
+    }
+
+    #[test]
+    fn resource_includes_otel_resource_attributes() {
+        with_env_vars(
+            &[
+                ("OTEL_SERVICE_NAME", None),
+                (
+                    "OTEL_RESOURCE_ATTRIBUTES",
+                    Some("deployment.environment=staging,service.version=0.7.14"),
+                ),
+            ],
+            || {
+                let resource = Config::from_env().resource();
+                assert_eq!(
+                    resource.get(&Key::new("deployment.environment")),
+                    Some(Value::from("staging"))
+                );
+                assert_eq!(
+                    resource.get(&Key::new("telemetry.sdk.language")),
+                    Some(Value::from("rust"))
+                );
+                assert_eq!(
+                    resource.get(&Key::new("service.version")),
+                    Some(Value::from("0.7.14"))
+                );
+                assert_eq!(
+                    resource.get(&Key::new("service.name")),
+                    Some(Value::from("daft"))
+                );
+            },
+        );
+    }
 }
diff --git a/src/common/tracing/src/lib.rs b/src/common/tracing/src/lib.rs
index a6117aadcce..adeee503ca0 100644
--- a/src/common/tracing/src/lib.rs
+++ b/src/common/tracing/src/lib.rs
@@ -1,9 +1,6 @@
 mod config;
 
-use std::{
-    sync::{LazyLock, Mutex},
-    time::Duration,
-};
+use std::sync::{LazyLock, Mutex};
 
 use common_runtime::get_io_runtime;
 pub use config::Config;
@@ -53,7 +50,7 @@ impl TraceFormat {
 
 pub fn init_tracing() {
     let config = Config::from_env();
-    let resource = Resource::builder().with_service_name("daft").build();
+    let resource = config.resource();
     let tracer_provider = if config.enabled() {
         let runtime = get_io_runtime(true);
         runtime.block_on_current_thread(async {
@@ -158,7 +155,6 @@ fn init_otlp_logger_provider(config: &Config, otlp_endpoint: &str, resource: Res
     let log_exporter = opentelemetry_otlp::LogExporter::builder()
         .with_tonic()
         .with_endpoint(otlp_endpoint)
-        .with_timeout(Duration::from_secs(10))
         .build()
         .expect("Failed to build OTLP logger exporter.");
 
@@ -188,14 +184,12 @@ fn init_otlp_metrics_provider(config: &Config, endpoint: &str, resource: Resourc
             opentelemetry_otlp::MetricExporter::builder()
                 .with_tonic()
                 .with_endpoint(endpoint)
-                .with_timeout(Duration::from_secs(10))
                 .build()
         }
         Protocol::HttpBinary => {
             opentelemetry_otlp::MetricExporter::builder()
                 .with_http()
                 .with_endpoint(endpoint)
-                .with_timeout(Duration::from_secs(10))
                 .with_protocol(config.otlp_protocol)
                 .build()
         }
@@ -242,7 +236,6 @@ fn init_otlp_tracer_provider(
     let exporter = opentelemetry_otlp::SpanExporter::builder()
         .with_tonic()
         .with_endpoint(otlp_endpoint)
-        .with_timeout(Duration::from_secs(10))
         .build()
         .expect("Failed to build OTLP span exporter for tracing");
 