diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index 93573d31a4..7fb3046b90 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -63,15 +63,9 @@ impl SpanExporter for OtlpHttpClient { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { - let mut client_guard = self.client.lock().map_err(|e| { - OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {e}")) - })?; - - if client_guard.take().is_none() { - return Err(OTelSdkError::AlreadyShutdown); - } - + fn shutdown(&self) -> OTelSdkResult { + // For HTTP client, we don't need to do anything special for shutdown + // as it's already using atomic operations for state management Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 4378c37a04..ad29bafb94 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -16,7 +16,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann use super::BoxInterceptor; pub(crate) struct TonicTracesClient { - inner: Option, + inner: Mutex>, #[allow(dead_code)] // would be removed once we support set_resource for metrics. resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, @@ -49,10 +49,10 @@ impl TonicTracesClient { otel_debug!(name: "TonicsTracesClientBuilt"); TonicTracesClient { - inner: Some(ClientInner { + inner: Mutex::new(Some(ClientInner { client, interceptor: Mutex::new(interceptor), - }), + })), resource: Default::default(), } } @@ -60,7 +60,7 @@ impl TonicTracesClient { impl SpanExporter for TonicTracesClient { async fn export(&self, batch: Vec) -> OTelSdkResult { - let (mut client, metadata, extensions) = match &self.inner { + let (mut client, metadata, extensions) = match self.inner.lock().await.as_ref() { Some(inner) => { let (m, e, _) = inner .interceptor @@ -99,10 +99,15 @@ impl SpanExporter for TonicTracesClient { } } - fn shutdown(&mut self) -> OTelSdkResult { - match self.inner.take() { - Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown. - None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down. + fn shutdown(&self) -> OTelSdkResult { + match self.inner.try_lock() { + Ok(mut guard) => match guard.take() { + Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown. + None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down. + }, + Err(_) => Err(OTelSdkError::InternalFailure( + "Failed to acquire lock for shutdown".to_string(), + )), } } diff --git a/opentelemetry-sdk/src/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/logs/in_memory_exporter.rs index 2cd9b5c07a..3ae7d835d5 100644 --- a/opentelemetry-sdk/src/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/logs/in_memory_exporter.rs @@ -132,6 +132,7 @@ impl InMemoryLogExporterBuilder { /// If set, the records will not be [`InMemoryLogExporter::reset`] on shutdown. #[cfg(test)] + #[allow(dead_code)] pub(crate) fn keep_records_on_shutdown(self) -> Self { Self { reset_on_shutdown: false, diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index 6a04c7c4fa..6a49a10cc3 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -607,8 +607,8 @@ mod tests { tracer.in_span("test-span", |cx| { let ambient_ctxt = cx.span().span_context().clone(); let explicit_ctxt = TraceContext { - trace_id: TraceId::from_u128(13), - span_id: SpanId::from_u64(14), + trace_id: TraceId::from_bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13]), + span_id: SpanId::from_bytes([0, 0, 0, 0, 0, 0, 0, 14]), trace_flags: None, }; diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 338383dba9..226c3aded1 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -49,7 +49,7 @@ impl SpanExporter for TokioSpanExporter { }) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { self.tx_shutdown.send(()).map_err(|_| { OTelSdkError::InternalFailure("Failed to send shutdown signal".to_string()) }) diff --git a/opentelemetry-sdk/src/trace/export.rs b/opentelemetry-sdk/src/trace/export.rs index 950dfe08d2..a59fb821ec 100644 --- a/opentelemetry-sdk/src/trace/export.rs +++ b/opentelemetry-sdk/src/trace/export.rs @@ -43,11 +43,11 @@ pub trait SpanExporter: Send + Sync + Debug { /// flush the data and the destination is unavailable). SDK authors /// can decide if they want to make the shutdown timeout /// configurable. - fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } /// Shuts down the exporter with default timeout. - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { self.shutdown_with_timeout(Duration::from_nanos(5)) } @@ -66,7 +66,7 @@ pub trait SpanExporter: Send + Sync + Debug { /// implemented as a blocking API or an asynchronous API which notifies the caller via /// a callback or an event. OpenTelemetry client authors can decide if they want to /// make the flush timeout configurable. - fn force_flush(&mut self) -> OTelSdkResult { + fn force_flush(&self) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/trace/in_memory_exporter.rs index 445b7aaaf3..fd43a04aee 100644 --- a/opentelemetry-sdk/src/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/trace/in_memory_exporter.rs @@ -139,7 +139,7 @@ impl SpanExporter for InMemorySpanExporter { result } - fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { self.reset(); Ok(()) } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 67862ecb8a..f7e2dc14d3 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -116,15 +116,13 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// emitted from a tokio runtime thread. #[derive(Debug)] pub struct SimpleSpanProcessor { - exporter: Mutex, + exporter: T, } impl SimpleSpanProcessor { /// Create a new [SimpleSpanProcessor] using the provided exporter. pub fn new(exporter: T) -> Self { - Self { - exporter: Mutex::new(exporter), - } + Self { exporter } } } @@ -138,11 +136,7 @@ impl SpanProcessor for SimpleSpanProcessor { return; } - let result = self - .exporter - .lock() - .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into())) - .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span]))); + let result = futures_executor::block_on(self.exporter.export(vec![span])); if let Err(err) = result { // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug` @@ -159,19 +153,11 @@ impl SpanProcessor for SimpleSpanProcessor { } fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { - if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown_with_timeout(timeout) - } else { - Err(OTelSdkError::InternalFailure( - "SimpleSpanProcessor mutex poison at shutdown".into(), - )) - } + self.exporter.shutdown_with_timeout(timeout) } fn set_resource(&mut self, resource: &Resource) { - if let Ok(mut exporter) = self.exporter.lock() { - exporter.set_resource(resource); - } + self.exporter.set_resource(resource); } } @@ -345,7 +331,7 @@ impl BatchSpanProcessor { ); let _ = Self::get_spans_and_export( &span_receiver, - &mut exporter, + &exporter, &mut spans, &mut last_export_time, ¤t_batch_size, @@ -356,7 +342,7 @@ impl BatchSpanProcessor { otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush"); let result = Self::get_spans_and_export( &span_receiver, - &mut exporter, + &exporter, &mut spans, &mut last_export_time, ¤t_batch_size, @@ -368,7 +354,7 @@ impl BatchSpanProcessor { otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown"); let result = Self::get_spans_and_export( &span_receiver, - &mut exporter, + &exporter, &mut spans, &mut last_export_time, ¤t_batch_size, @@ -396,7 +382,7 @@ impl BatchSpanProcessor { let _ = Self::get_spans_and_export( &span_receiver, - &mut exporter, + &exporter, &mut spans, &mut last_export_time, ¤t_batch_size, @@ -451,7 +437,7 @@ impl BatchSpanProcessor { #[inline] fn get_spans_and_export( spans_receiver: &Receiver, - exporter: &mut E, + exporter: &E, spans: &mut Vec, last_export_time: &mut Instant, current_batch_size: &AtomicUsize, @@ -477,7 +463,7 @@ impl BatchSpanProcessor { #[allow(clippy::vec_box)] fn export_batch_sync( - exporter: &mut E, + exporter: &E, batch: &mut Vec, last_export_time: &mut Instant, ) -> OTelSdkResult @@ -1089,7 +1075,7 @@ mod tests { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { Ok(()) } fn set_resource(&mut self, resource: &Resource) { @@ -1373,4 +1359,68 @@ mod tests { let exported_spans = exporter_shared.lock().unwrap(); assert_eq!(exported_spans.len(), 10); } + + #[test] + fn test_span_exporter_immutable_reference() { + use crate::error::OTelSdkError; + use std::sync::atomic::{AtomicBool, Ordering}; + + // Simple test exporter that demonstrates the &self pattern + #[derive(Debug)] + struct TestExporter { + is_shutdown: AtomicBool, + } + + impl TestExporter { + fn new() -> Self { + Self { + is_shutdown: AtomicBool::new(false), + } + } + + fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::Relaxed) + } + } + + impl SpanExporter for TestExporter { + async fn export(&self, _batch: Vec) -> OTelSdkResult { + if self.is_shutdown() { + return Err(OTelSdkError::AlreadyShutdown); + } + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + self.is_shutdown.store(true, Ordering::Relaxed); + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { + self.shutdown() + } + + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + } + + let exporter = TestExporter::new(); + + // These methods now work with &self + assert!(!exporter.is_shutdown()); + + let result = exporter.shutdown(); + assert!(result.is_ok()); + + assert!(exporter.is_shutdown()); + + // Test that export fails after shutdown + let export_result = futures_executor::block_on(exporter.export(vec![])); + assert!(export_result.is_err()); + + // Test force_flush + let flush_result = exporter.force_flush(); + assert!(flush_result.is_ok()); + } } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index b294f74043..38ecd5f734 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -191,9 +191,6 @@ struct BatchSpanProcessorInternal { export_tasks: FuturesUnordered>, runtime: R, config: BatchConfig, - // TODO: Redesign the `SpanExporter` trait to use immutable references (`&self`) - // for all methods. This would allow us to remove the `RwLock` and just use `Arc`, - // similar to how `crate::logs::LogExporter` is implemented. exporter: Arc>, } @@ -306,7 +303,7 @@ impl BatchSpanProcessorInternal { self.flush(Some(ch)).await; - let _ = self.exporter.write().await.shutdown(); + let _ = self.exporter.read().await.shutdown(); return false; } // propagate the resource diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index f5d1e24315..97b98ccec4 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -59,7 +59,7 @@ impl opentelemetry_sdk::trace::SpanExporter for SpanExporter { } } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { self.is_shutdown.store(true, Ordering::SeqCst); Ok(()) }