/build/source/nativelink-util/src/telemetry.rs
Line  | Count  | Source  | 
1  |  | // Copyright 2025 The NativeLink Authors. All rights reserved.  | 
2  |  | //  | 
3  |  | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");  | 
4  |  | // you may not use this file except in compliance with the License.  | 
5  |  | // You may obtain a copy of the License at  | 
6  |  | //  | 
7  |  | //    See LICENSE file for details  | 
8  |  | //  | 
9  |  | // Unless required by applicable law or agreed to in writing, software  | 
10  |  | // distributed under the License is distributed on an "AS IS" BASIS,  | 
11  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
12  |  | // See the License for the specific language governing permissions and  | 
13  |  | // limitations under the License.  | 
14  |  |  | 
15  |  | use core::default::Default;  | 
16  |  | use std::env;  | 
17  |  | use std::sync::OnceLock;  | 
18  |  |  | 
19  |  | use base64::Engine;  | 
20  |  | use base64::prelude::BASE64_STANDARD_NO_PAD;  | 
21  |  | use hyper::http::Response;  | 
22  |  | use nativelink_error::{Code, ResultExt, make_err}; | 
23  |  | use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;  | 
24  |  | use opentelemetry::propagation::TextMapCompositePropagator;  | 
25  |  | use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider}; | 
26  |  | use opentelemetry::{KeyValue, global}; | 
27  |  | use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;  | 
28  |  | use opentelemetry_http::HeaderExtractor;  | 
29  |  | use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig}; | 
30  |  | use opentelemetry_sdk::Resource;  | 
31  |  | use opentelemetry_sdk::logs::SdkLoggerProvider;  | 
32  |  | use opentelemetry_sdk::metrics::SdkMeterProvider;  | 
33  |  | use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator}; | 
34  |  | use opentelemetry_sdk::trace::SdkTracerProvider;  | 
35  |  | use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;  | 
36  |  | use prost::Message;  | 
37  |  | use tracing::debug;  | 
38  |  | use tracing::metadata::LevelFilter;  | 
39  |  | use tracing_opentelemetry::{MetricsLayer, layer}; | 
40  |  | use tracing_subscriber::filter::Directive;  | 
41  |  | use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;  | 
42  |  | use tracing_subscriber::util::SubscriberInitExt;  | 
43  |  | use tracing_subscriber::{EnvFilter, Layer, Registry, fmt, registry}; | 
44  |  | use uuid::Uuid;  | 
45  |  |  | 
46  |  | /// The OTLP "service.name" field for all nativelink services.  | 
47  |  | const NATIVELINK_SERVICE_NAME: &str = "nativelink";  | 
48  |  |  | 
49  |  | // An `EnvFilter` to filter out non-nativelink information.  | 
50  |  | //  | 
51  |  | // See: https://github.com/open-telemetry/opentelemetry-rust/issues/2877  | 
52  |  | //  | 
53  |  | // Note that `EnvFilter` doesn't implement `clone`, so create a new one for  | 
54  |  | // each telemetry kind.  | 
55  | 0  | fn otlp_filter() -> EnvFilter { | 
56  | 0  |     fn expect_parse(directive: &str) -> Directive { | 
57  | 0  |         directive  | 
58  | 0  |             .parse()  | 
59  | 0  |             .unwrap_or_else(|_| panic!("Static directive '{directive}' failed to parse")) | 
60  | 0  |     }  | 
61  |  |  | 
62  | 0  |     EnvFilter::builder()  | 
63  | 0  |         .with_default_directive(LevelFilter::INFO.into())  | 
64  | 0  |         .from_env_lossy()  | 
65  | 0  |         .add_directive(expect_parse("hyper=off")) | 
66  | 0  |         .add_directive(expect_parse("tonic=off")) | 
67  | 0  |         .add_directive(expect_parse("h2=off")) | 
68  | 0  |         .add_directive(expect_parse("reqwest=off")) | 
69  | 0  |         .add_directive(expect_parse("tower=off")) | 
70  | 0  | }  | 
71  |  |  | 
72  |  | // Create a tracing layer intended for stdout printing.  | 
73  |  | //  | 
74  |  | // The output of this layer is configurable via the `NL_LOG` environment  | 
75  |  | // variable.  | 
76  | 0  | fn tracing_stdout_layer() -> impl Layer<Registry> { | 
77  | 0  |     let nl_log_fmt = env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string()); | 
78  |  |  | 
79  | 0  |     let stdout_filter = otlp_filter();  | 
80  |  |  | 
81  | 0  |     match nl_log_fmt.as_str() { | 
82  | 0  |         "compact" => fmt::layer()  | 
83  | 0  |             .compact()  | 
84  | 0  |             .with_timer(fmt::time::time())  | 
85  | 0  |             .with_filter(stdout_filter)  | 
86  | 0  |             .boxed(),  | 
87  | 0  |         "json" => fmt::layer()  | 
88  | 0  |             .json()  | 
89  | 0  |             .with_timer(fmt::time::time())  | 
90  | 0  |             .with_filter(stdout_filter)  | 
91  | 0  |             .boxed(),  | 
92  | 0  |         _ => fmt::layer()  | 
93  | 0  |             .pretty()  | 
94  | 0  |             .with_timer(fmt::time::time())  | 
95  | 0  |             .with_filter(stdout_filter)  | 
96  | 0  |             .boxed(),  | 
97  |  |     }  | 
98  | 0  | }  | 
99  |  |  | 
100  |  | /// Initialize tracing with OpenTelemetry support.  | 
101  |  | ///  | 
102  |  | /// # Errors  | 
103  |  | ///  | 
104  |  | /// Returns `Err` if logging was already initialized or if the exporters can't  | 
105  |  | /// be initialized.  | 
106  | 0  | pub fn init_tracing() -> Result<(), nativelink_error::Error> { | 
107  |  |     static INITIALIZED: OnceLock<()> = OnceLock::new();  | 
108  |  |  | 
109  | 0  |     if INITIALIZED.get().is_some() {  Branch (109:8): [True: 0, False: 0]
   Branch (109:8): [Folded - Ignored]
  | 
110  | 0  |         return Err(make_err!(Code::Internal, "Logging already initialized"));  | 
111  | 0  |     }  | 
112  |  |  | 
113  |  |     // We currently use a UUIDv4 for "service.instance.id" as per:  | 
114  |  |     // https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/  | 
115  |  |     // This might change as we get a better understanding of its usecases in the  | 
116  |  |     // context of broader observability infrastructure.  | 
117  | 0  |     let resource = Resource::builder()  | 
118  | 0  |         .with_service_name(NATIVELINK_SERVICE_NAME)  | 
119  | 0  |         .with_attribute(KeyValue::new(  | 
120  |  |             "service.instance.id",  | 
121  | 0  |             Uuid::new_v4().to_string(),  | 
122  |  |         ))  | 
123  | 0  |         .build();  | 
124  |  |  | 
125  | 0  |     let propagator = TextMapCompositePropagator::new(vec![  | 
126  | 0  |         Box::new(BaggagePropagator::new()),  | 
127  | 0  |         Box::new(TraceContextPropagator::new()),  | 
128  |  |     ]);  | 
129  | 0  |     global::set_text_map_propagator(propagator);  | 
130  |  |  | 
131  |  |     // Logs  | 
132  | 0  |     let otlp_log_layer = OpenTelemetryTracingBridge::new(  | 
133  | 0  |         &SdkLoggerProvider::builder()  | 
134  | 0  |             .with_resource(resource.clone())  | 
135  | 0  |             .with_batch_exporter(  | 
136  | 0  |                 LogExporter::builder()  | 
137  | 0  |                     .with_tonic()  | 
138  | 0  |                     .with_protocol(Protocol::Grpc)  | 
139  | 0  |                     .build()  | 
140  | 0  |                     .map_err(|e| make_err!(Code::Internal, "{e}")) | 
141  | 0  |                     .err_tip(|| "While creating OpenTelemetry OTLP Log exporter")?,  | 
142  |  |             )  | 
143  | 0  |             .build(),  | 
144  |  |     )  | 
145  | 0  |     .with_filter(otlp_filter());  | 
146  |  |  | 
147  |  |     // Traces  | 
148  | 0  |     let otlp_trace_layer = layer()  | 
149  | 0  |         .with_tracer(  | 
150  | 0  |             SdkTracerProvider::builder()  | 
151  | 0  |                 .with_resource(resource.clone())  | 
152  | 0  |                 .with_batch_exporter(  | 
153  | 0  |                     SpanExporter::builder()  | 
154  | 0  |                         .with_tonic()  | 
155  | 0  |                         .with_protocol(Protocol::Grpc)  | 
156  | 0  |                         .build()  | 
157  | 0  |                         .map_err(|e| make_err!(Code::Internal, "{e}")) | 
158  | 0  |                         .err_tip(|| "While creating OpenTelemetry OTLP Span exporter")?,  | 
159  |  |                 )  | 
160  | 0  |                 .build()  | 
161  | 0  |                 .tracer(NATIVELINK_SERVICE_NAME),  | 
162  |  |         )  | 
163  | 0  |         .with_filter(otlp_filter());  | 
164  |  |  | 
165  |  |     // Metrics  | 
166  | 0  |     let meter_provider = SdkMeterProvider::builder()  | 
167  | 0  |         .with_resource(resource)  | 
168  | 0  |         .with_periodic_exporter(  | 
169  | 0  |             MetricExporter::builder()  | 
170  | 0  |                 .with_tonic()  | 
171  | 0  |                 .with_protocol(Protocol::Grpc)  | 
172  | 0  |                 .build()  | 
173  | 0  |                 .map_err(|e| make_err!(Code::Internal, "{e}")) | 
174  | 0  |                 .err_tip(|| "While creating OpenTelemetry OTLP Metric exporter")?,  | 
175  |  |         )  | 
176  | 0  |         .build();  | 
177  |  |  | 
178  | 0  |     global::set_meter_provider(meter_provider.clone());  | 
179  |  |  | 
180  | 0  |     let otlp_metrics_layer = MetricsLayer::new(meter_provider).with_filter(otlp_filter());  | 
181  |  |  | 
182  | 0  |     registry()  | 
183  | 0  |         .with(tracing_stdout_layer())  | 
184  | 0  |         .with(otlp_log_layer)  | 
185  | 0  |         .with(otlp_trace_layer)  | 
186  | 0  |         .with(otlp_metrics_layer)  | 
187  | 0  |         .init();  | 
188  |  |  | 
189  | 0  |     INITIALIZED.set(()).unwrap_or(());  | 
190  |  |  | 
191  | 0  |     Ok(())  | 
192  | 0  | }  | 
193  |  |  | 
/// Custom baggage key under which `OtlpMiddleware` attaches the decoded Bazel
/// `RequestMetadata` (rendered with its `Debug` formatting) to the request
/// context.
const BAZEL_METADATA_KEY: &str = "bazel.metadata";

/// gRPC binary (`-bin`) metadata header that carries Bazel's `RequestMetadata`
/// protobuf, base64 encoded. `OtlpMiddleware` decodes it and attaches it as
/// baggage under `BAZEL_METADATA_KEY`.
///
/// NOTE(review): an earlier comment tied this header to Bazel's
/// `--remote_header` flag. The Remote Execution API names this header as the
/// standard carrier for `RequestMetadata` on any RPC. Confirm which applies.
///
/// TODO(palfrey): There are various other headers that bazel supports.
///                Optimize their usage.
const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requestmetadata-bin";
201  |  |  | 
202  |  | use opentelemetry::baggage::BaggageExt;  | 
203  |  | use opentelemetry::context::FutureExt;  | 
204  |  |  | 
/// `tower` middleware that wraps a service and runs each request inside an
/// OpenTelemetry context extracted from the request headers.
///
/// Instances are produced by `OtlpLayer`.
#[derive(Debug, Clone)]
pub struct OtlpMiddleware<S> {
    /// The wrapped service that ultimately handles each request.
    inner: S,
    /// When `true`, requests whose baggage lacks a non-empty `enduser.id`
    /// are rejected with `FAILED_PRECONDITION`.
    identity_required: bool,
}

impl<S> OtlpMiddleware<S> {
    /// Wraps `inner`, enforcing the `enduser.id` baggage entry when
    /// `identity_required` is set.
    const fn new(inner: S, identity_required: bool) -> Self {
        Self { inner, identity_required }
    }
}
219  |  |  | 
220  |  | impl<S, ReqBody, ResBody> tower::Service<hyper::http::Request<ReqBody>> for OtlpMiddleware<S>  | 
221  |  | where  | 
222  |  |     S: tower::Service<hyper::http::Request<ReqBody>, Response = Response<ResBody>>  | 
223  |  |         + Clone  | 
224  |  |         + Send  | 
225  |  |         + 'static,  | 
226  |  |     S::Future: Send + 'static,  | 
227  |  |     ReqBody: core::fmt::Debug + Send + 'static,  | 
228  |  |     ResBody: From<String> + Send + 'static + Default,  | 
229  |  | { | 
230  |  |     type Response = S::Response;  | 
231  |  |     type Error = S::Error;  | 
232  |  |     type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;  | 
233  |  |  | 
234  | 0  |     fn poll_ready(  | 
235  | 0  |         &mut self,  | 
236  | 0  |         cx: &mut core::task::Context<'_>,  | 
237  | 0  |     ) -> core::task::Poll<Result<(), Self::Error>> { | 
238  | 0  |         self.inner.poll_ready(cx)  | 
239  | 0  |     }  | 
240  |  |  | 
241  | 0  |     fn call(&mut self, req: hyper::http::Request<ReqBody>) -> Self::Future { | 
242  |  |         // We must take the current `inner` and not the clone.  | 
243  |  |         // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services  | 
244  | 0  |         let clone = self.inner.clone();  | 
245  | 0  |         let mut inner = core::mem::replace(&mut self.inner, clone);  | 
246  |  |  | 
247  | 0  |         let parent_cx = global::get_text_map_propagator(|propagator| { | 
248  | 0  |             propagator.extract(&HeaderExtractor(req.headers()))  | 
249  | 0  |         });  | 
250  |  |  | 
251  | 0  |         let identity = parent_cx  | 
252  | 0  |             .baggage()  | 
253  | 0  |             .get(ENDUSER_ID)  | 
254  | 0  |             .map(|value| value.as_str().to_string())  | 
255  | 0  |             .unwrap_or_default();  | 
256  |  |  | 
257  | 0  |         if identity.is_empty() && self.identity_required {  Branch (257:12): [Folded - Ignored]
  Branch (257:35): [Folded - Ignored]
   Branch (257:12): [Folded - Ignored]
  Branch (257:35): [Folded - Ignored]
  | 
258  | 0  |             return Box::pin(async move { | 
259  | 0  |                 Ok(tonic::Status::failed_precondition(  | 
260  | 0  |                     r"  | 
261  | 0  | 
  | 
262  | 0  | NativeLink instance configured to require this OpenTelemetry Baggage header:  | 
263  | 0  | 
  | 
264  | 0  |     `Baggage: enduser.id=YOUR_IDENTITY`  | 
265  | 0  | 
  | 
266  | 0  | ",  | 
267  | 0  |                 )  | 
268  | 0  |                 .into_http())  | 
269  | 0  |             });  | 
270  | 0  |         }  | 
271  |  |  | 
272  | 0  |         debug!("Baggage enduser.id: {identity}"); | 
273  |  |  | 
274  | 0  |         let tracer = global::tracer("origin_middleware"); | 
275  | 0  |         let span = tracer  | 
276  | 0  |             .span_builder("origin_request") | 
277  | 0  |             .with_kind(opentelemetry::trace::SpanKind::Server)  | 
278  | 0  |             .start_with_context(&tracer, &parent_cx);  | 
279  |  |  | 
280  | 0  |         let mut cx = parent_cx.with_span(span);  | 
281  |  |  | 
282  | 0  |         if let Some(bazel_header) = req.headers().get(BAZEL_REQUESTMETADATA_HEADER) {  Branch (282:16): [Folded - Ignored]
   Branch (282:16): [Folded - Ignored]
  | 
283  | 0  |             if let Ok(decoded) = BASE64_STANDARD_NO_PAD.decode(bazel_header.as_bytes()) {  Branch (283:20): [Folded - Ignored]
   Branch (283:20): [Folded - Ignored]
  | 
284  | 0  |                 if let Ok(metadata) = RequestMetadata::decode(decoded.as_slice()) {  Branch (284:24): [Folded - Ignored]
   Branch (284:24): [Folded - Ignored]
  | 
285  | 0  |                     let metadata_str = format!("{metadata:?}"); | 
286  | 0  |                     debug!("Baggage Bazel request metadata: {metadata_str}"); | 
287  | 0  |                     cx = cx.with_baggage(vec![  | 
288  | 0  |                         KeyValue::new(BAZEL_METADATA_KEY, metadata_str),  | 
289  | 0  |                         KeyValue::new(ENDUSER_ID, identity),  | 
290  |  |                     ]);  | 
291  | 0  |                 }  | 
292  | 0  |             }  | 
293  | 0  |         }  | 
294  |  |  | 
295  | 0  |         Box::pin(async move { inner.call(req).with_context(cx).await }) | 
296  | 0  |     }  | 
297  |  | }  | 
298  |  |  | 
/// `tower` layer that wraps services in `OtlpMiddleware`.
#[derive(Debug, Clone, Copy)]
pub struct OtlpLayer {
    /// Forwarded to every `OtlpMiddleware` this layer produces. When `true`,
    /// requests must carry a non-empty `enduser.id` baggage entry.
    identity_required: bool,
}

impl OtlpLayer {
    /// Creates a layer whose middlewares enforce the `enduser.id` baggage
    /// entry iff `identity_required` is `true`.
    pub const fn new(identity_required: bool) -> Self {
        Self { identity_required }
    }
}
309  |  |  | 
310  |  | impl<S> tower::Layer<S> for OtlpLayer { | 
311  |  |     type Service = OtlpMiddleware<S>;  | 
312  |  |  | 
313  | 0  |     fn layer(&self, service: S) -> Self::Service { | 
314  | 0  |         OtlpMiddleware::new(service, self.identity_required)  | 
315  | 0  |     }  | 
316  |  | }  |