diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 8060c99841..9bb0a4315d 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -146,7 +146,12 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor { } } - fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {} + fn on_end( + &self, + _span: &opentelemetry_sdk::trace::SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { + } } fn init_tracer() -> SdkTracerProvider { diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index e5e4ca3295..3a6a48eca2 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -5,8 +5,12 @@ - Added `Resource::get_ref(&self, key: &Key) -> Option<&Value>` to allow retrieving a reference to a resource value without cloning. - **Breaking** Removed the following public hidden methods from the `SdkTracer` [#3227][3227]: - `id_generator`, `should_sample` +- **Breaking** **Performance**: Changed `SpanProcessor::on_end()` signature to accept `&SpanData` and `&InstrumentationScope` by reference ([#3267][3267]). + This eliminates unnecessary `SpanData` clones when multiple `SpanProcessor`s are configured. Performance improvement scales with the number of processors. + Processors that need to store the data asynchronously should clone it explicitly. [3227]: https://github.com/open-telemetry/opentelemetry-rust/pull/3227 +[3267]: https://github.com/open-telemetry/opentelemetry-rust/pull/3267 ## 0.31.0 diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index 6e90b9a62b..78666c7d14 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -62,8 +62,11 @@ fn criterion_benchmark(c: &mut Criterion) { let span_processor = shared_span_processor.clone(); let spans = get_span_data(); handles.push(tokio::spawn(async move { - for span in spans { - span_processor.on_end(span); + for span in &spans { + span_processor.on_end( + span, + &opentelemetry::InstrumentationScope::default(), + ); tokio::task::yield_now().await; } })); diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 585f65f27d..1a838093f8 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -137,7 +137,11 @@ mod tests { } } - fn on_end(&self, _span: SpanData) { + fn on_end( + &self, + _span: &SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { // TODO: Accessing Context::current() will panic today and hence commented out. // See https://github.com/open-telemetry/opentelemetry-rust/issues/2871 // let _c = Context::current(); diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 2b05f89aea..d2bbd55be6 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -526,7 +526,11 @@ mod tests { .fetch_add(1, Ordering::SeqCst); } - fn on_end(&self, _span: SpanData) { + fn on_end( + &self, + _span: &SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { // ignore } @@ -789,7 +793,11 @@ mod tests { // No operation needed for this processor } - fn on_end(&self, _span: SpanData) { + fn on_end( + &self, + _span: &SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { // No operation needed for this processor } diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 4753650d53..2fe37adb64 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -219,24 +219,13 @@ impl Span { data.end_time = opentelemetry::time::now(); } - match provider.span_processors() { - [] => {} - [processor] => { - processor.on_end(build_export_data( - data, - self.span_context.clone(), - &self.tracer, - )); - } - processors => { - for processor in processors { - processor.on_end(build_export_data( - data.clone(), - self.span_context.clone(), - &self.tracer, - )); - } - } + // Build export data once to avoid cloning for multiple processors + let export_data = build_export_data(data, self.span_context.clone(), &self.tracer); + + let instrumentation_scope = self.tracer.instrumentation_scope(); + + for processor in provider.span_processors() { + processor.on_end(&export_data, instrumentation_scope); } } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 0ebe80f885..8a385d8d8a 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -82,8 +82,14 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. - /// TODO - This method should take reference to `SpanData` - fn on_end(&self, span: SpanData); + /// + /// # Parameters + /// - `span`: A reference to `SpanData` representing the ended span. + /// - `instrumentation_scope`: The instrumentation scope associated with the span. + /// + /// If the processor needs to handle the export asynchronously, it should clone + /// the data to ensure it can be safely processed without lifetime issues. + fn on_end(&self, span: &SpanData, instrumentation_scope: &opentelemetry::InstrumentationScope); /// Force the spans lying in the cache to be exported. fn force_flush(&self) -> OTelSdkResult; /// Shuts down the processor. Called when SDK is shut down. This is an @@ -133,16 +139,22 @@ impl SpanProcessor for SimpleSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { + fn on_end( + &self, + span: &SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { if !span.span_context.is_sampled() { return; } + let span_data = span.clone(); + let result = self .exporter .lock() .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into())) - .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span]))); + .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span_data]))); if let Err(err) = result { // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug` @@ -526,8 +538,13 @@ impl SpanProcessor for BatchSpanProcessor { } /// Handles span end. - fn on_end(&self, span: SpanData) { - let result = self.span_sender.try_send(span); + fn on_end( + &self, + span: &SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { + let span_data = span.clone(); + let result = self.span_sender.try_send(span_data); // match for result and handle each separately match result { @@ -956,7 +973,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + processor.on_end(&span_data, &opentelemetry::InstrumentationScope::default()); assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data); let _result = processor.shutdown(); } @@ -980,7 +997,7 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - processor.on_end(unsampled); + processor.on_end(&unsampled, &opentelemetry::InstrumentationScope::default()); assert!(exporter.get_finished_spans().unwrap().is_empty()); } @@ -989,7 +1006,7 @@ mod tests { let exporter = InMemorySpanExporterBuilder::new().build(); let processor = SimpleSpanProcessor::new(exporter.clone()); let span_data = new_test_export_span_data(); - processor.on_end(span_data.clone()); + processor.on_end(&span_data, &opentelemetry::InstrumentationScope::default()); assert!(!exporter.get_finished_spans().unwrap().is_empty()); let _result = processor.shutdown(); // Assume shutdown is called by ensuring spans are empty in the exporter @@ -1192,7 +1209,7 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config); let test_span = create_test_span("test_span"); - processor.on_end(test_span.clone()); + processor.on_end(&test_span, &opentelemetry::InstrumentationScope::default()); // Wait for flush interval to ensure the span is processed std::thread::sleep(Duration::from_secs(6)); @@ -1215,7 +1232,7 @@ mod tests { // Create a test span and send it to the processor let test_span = create_test_span("force_flush_span"); - processor.on_end(test_span.clone()); + processor.on_end(&test_span, &opentelemetry::InstrumentationScope::default()); // Call force_flush to immediately export the spans let flush_result = processor.force_flush(); @@ -1241,12 +1258,15 @@ mod tests { let record = create_test_span("test_span"); - processor.on_end(record); + processor.on_end(&record, &opentelemetry::InstrumentationScope::default()); processor.force_flush().unwrap(); processor.shutdown().unwrap(); // todo: expect to see errors here. How should we assert this? - processor.on_end(create_test_span("after_shutdown_span")); + processor.on_end( + &create_test_span("after_shutdown_span"), + &opentelemetry::InstrumentationScope::default(), + ); assert_eq!(1, exporter.get_finished_spans().unwrap().len()); assert!(exporter.is_shutdown_called()); @@ -1268,9 +1288,9 @@ mod tests { let span2 = create_test_span("span2"); let span3 = create_test_span("span3"); // This span should be dropped - processor.on_end(span1.clone()); - processor.on_end(span2.clone()); - processor.on_end(span3.clone()); // This span exceeds the queue size + processor.on_end(&span1, &opentelemetry::InstrumentationScope::default()); + processor.on_end(&span2, &opentelemetry::InstrumentationScope::default()); + processor.on_end(&span3, &opentelemetry::InstrumentationScope::default()); // This span exceeds the queue size // Wait for the scheduled delay to expire std::thread::sleep(Duration::from_secs(6)); @@ -1314,7 +1334,7 @@ mod tests { KeyValue::new("key1", "value1"), KeyValue::new("key2", "value2"), ]; - processor.on_end(span_data.clone()); + processor.on_end(&span_data, &opentelemetry::InstrumentationScope::default()); // Force flush to export the span let _ = processor.force_flush(); @@ -1345,7 +1365,7 @@ mod tests { // Create a span and send it to the processor let test_span = create_test_span("resource_test"); - processor.on_end(test_span.clone()); + processor.on_end(&test_span, &opentelemetry::InstrumentationScope::default()); // Force flush to ensure the span is exported let _ = processor.force_flush(); @@ -1380,7 +1400,7 @@ mod tests { for _ in 0..4 { let span = new_test_export_span_data(); - processor.on_end(span); + processor.on_end(&span, &opentelemetry::InstrumentationScope::default()); } processor.force_flush().unwrap(); @@ -1403,7 +1423,7 @@ mod tests { for _ in 0..4 { let span = new_test_export_span_data(); - processor.on_end(span); + processor.on_end(&span, &opentelemetry::InstrumentationScope::default()); } processor.force_flush().unwrap(); @@ -1430,7 +1450,7 @@ mod tests { let processor_clone = Arc::clone(&processor); let handle = tokio::spawn(async move { let span = new_test_export_span_data(); - processor_clone.on_end(span); + processor_clone.on_end(&span, &opentelemetry::InstrumentationScope::default()); }); handles.push(handle); } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index b294f74043..98ac62a46d 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -105,12 +105,20 @@ impl SpanProcessor for BatchSpanProcessor { // Ignored } - fn on_end(&self, span: SpanData) { + fn on_end( + &self, + span: &SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { if !span.span_context.is_sampled() { return; } - let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); + // Clone the span data for async processing + let span_data = span.clone(); + let result = self + .message_sender + .try_send(BatchMessage::ExportSpan(span_data)); // If the queue is full, and we can't buffer a span if result.is_err() { @@ -577,7 +585,10 @@ mod tests { } }); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); let flush_res = processor.force_flush(); assert!(flush_res.is_ok()); let _shutdown_result = processor.shutdown(); @@ -604,7 +615,10 @@ mod tests { }; let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread); tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); let flush_res = processor.force_flush(); if time_out { assert!(flush_res.is_err()); @@ -653,9 +667,18 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); // Finish three spans in rapid succession. - processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); // Wait until everything has been exported. processor.force_flush().expect("force flush failed"); @@ -690,9 +713,18 @@ mod tests { let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); // Finish several spans quickly. - processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); - processor.on_end(new_test_export_span_data()); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); + processor.on_end( + &new_test_export_span_data(), + &opentelemetry::InstrumentationScope::default(), + ); processor.force_flush().expect("force flush failed"); processor.shutdown().expect("shutdown failed"); diff --git a/stress/src/traces.rs b/stress/src/traces.rs index e0f15099e5..7426c73bda 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -37,7 +37,11 @@ impl SpanProcessor for NoOpSpanProcessor { // No-op } - fn on_end(&self, _span: SpanData) { + fn on_end( + &self, + _span: &SpanData, + _instrumentation_scope: &opentelemetry::InstrumentationScope, + ) { // No-op }