/build/source/nativelink-util/src/telemetry.rs
Line | Count | Source |
1 | | // Copyright 2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::default::Default; |
16 | | use std::env; |
17 | | use std::sync::OnceLock; |
18 | | |
19 | | use base64::Engine; |
20 | | use base64::prelude::BASE64_STANDARD_NO_PAD; |
21 | | use hyper::http::Response; |
22 | | use nativelink_error::{Code, ResultExt, make_err}; |
23 | | use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata; |
24 | | use opentelemetry::propagation::TextMapCompositePropagator; |
25 | | use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider}; |
26 | | use opentelemetry::{KeyValue, global}; |
27 | | use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; |
28 | | use opentelemetry_http::HeaderExtractor; |
29 | | use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig}; |
30 | | use opentelemetry_sdk::Resource; |
31 | | use opentelemetry_sdk::logs::SdkLoggerProvider; |
32 | | use opentelemetry_sdk::metrics::SdkMeterProvider; |
33 | | use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator}; |
34 | | use opentelemetry_sdk::trace::SdkTracerProvider; |
35 | | use opentelemetry_semantic_conventions::attribute::ENDUSER_ID; |
36 | | use prost::Message; |
37 | | use tracing::debug; |
38 | | use tracing::metadata::LevelFilter; |
39 | | use tracing_opentelemetry::{MetricsLayer, layer}; |
40 | | use tracing_subscriber::filter::Directive; |
41 | | use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; |
42 | | use tracing_subscriber::util::SubscriberInitExt; |
43 | | use tracing_subscriber::{EnvFilter, Layer, Registry, fmt, registry}; |
44 | | use uuid::Uuid; |
45 | | |
46 | | /// The OTLP "service.name" field for all nativelink services. |
47 | | const NATIVELINK_SERVICE_NAME: &str = "nativelink"; |
48 | | |
49 | | // An `EnvFilter` to filter out non-nativelink information, namely events from the `hyper`, `tonic`, `h2`, `reqwest` and `tower` crates. |
50 | | // |
51 | | // See: https://github.com/open-telemetry/opentelemetry-rust/issues/2877 |
52 | | // |
53 | | // Note that `EnvFilter` doesn't implement `Clone`, so create a new one for |
54 | | // each telemetry kind. |
55 | 42 | fn otlp_filter() -> EnvFilter { |
56 | 210 | fn expect_parse(directive: &str) -> Directive { |
57 | 210 | directive |
58 | 210 | .parse() |
59 | 210 | .unwrap_or_else(|_| panic!("Static directive '{directive}' failed to parse"0 )) |
60 | 210 | } |
61 | | |
62 | 42 | EnvFilter::builder() |
63 | 42 | .with_default_directive(LevelFilter::INFO.into()) |
64 | 42 | .from_env_lossy() |
65 | 42 | .add_directive(expect_parse("hyper=off")) |
66 | 42 | .add_directive(expect_parse("tonic=off")) |
67 | 42 | .add_directive(expect_parse("h2=off")) |
68 | 42 | .add_directive(expect_parse("reqwest=off")) |
69 | 42 | .add_directive(expect_parse("tower=off")) |
70 | 42 | } |
71 | | |
72 | | // Create a tracing layer intended for stdout printing. |
73 | | // |
74 | | // The output format of this layer is selected via the `NL_LOG` environment |
75 | | // variable: `compact`, `json`, or `pretty` (the default and the fallback for any other value). |
76 | 42 | fn tracing_stdout_layer() -> impl Layer<Registry> { |
77 | 42 | let nl_log_fmt = env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string()); |
78 | | |
79 | 42 | let stdout_filter = otlp_filter(); |
80 | | |
81 | 42 | match nl_log_fmt.as_str() { |
82 | 42 | "compact" => fmt::layer0 () |
83 | 0 | .compact() |
84 | 0 | .with_timer(fmt::time::time()) |
85 | 0 | .with_filter(stdout_filter) |
86 | 0 | .boxed(), |
87 | 42 | "json" => fmt::layer0 () |
88 | 0 | .json() |
89 | 0 | .with_timer(fmt::time::time()) |
90 | 0 | .with_filter(stdout_filter) |
91 | 0 | .boxed(), |
92 | 42 | _ => fmt::layer() |
93 | 42 | .pretty() |
94 | 42 | .with_timer(fmt::time::time()) |
95 | 42 | .with_filter(stdout_filter) |
96 | 42 | .boxed(), |
97 | | } |
98 | 42 | } |
99 | | |
100 | | /// Initialize a minimal tracing configuration for tests. |
101 | | /// |
102 | | /// The OTLP logic in the main tracing loop causes issues with the tokio runtime |
103 | | /// in tests, so we use a more naive logger implementation here. This function |
104 | | /// is idempotent and can be called multiple times safely. |
105 | 343 | pub fn init_tracing_for_tests() { |
106 | | static INITIALIZED: OnceLock<()> = OnceLock::new(); |
107 | 343 | INITIALIZED.get_or_init(|| {42 |
108 | 42 | registry().with(tracing_stdout_layer()).init(); |
109 | 42 | }); |
110 | 343 | } |
111 | | |
112 | | /// Initialize tracing with OpenTelemetry support. |
113 | | /// |
114 | | /// # Errors |
115 | | /// |
116 | | /// Returns `Err` if logging was already initialized or if the exporters can't |
117 | | /// be initialized. |
118 | 0 | pub fn init_tracing() -> Result<(), nativelink_error::Error> { |
119 | | static INITIALIZED: OnceLock<()> = OnceLock::new(); |
120 | | |
121 | 0 | if INITIALIZED.get().is_some() { Branch (121:8): [True: 0, False: 0]
Branch (121:8): [Folded - Ignored]
|
122 | 0 | return Err(make_err!(Code::Internal, "Logging already initialized")); |
123 | 0 | } |
124 | | |
125 | | // We currently use a UUIDv4 for "service.instance.id" as per: |
126 | | // https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/ |
127 | | // This might change as we get a better understanding of its use cases in the |
128 | | // context of broader observability infrastructure. |
129 | 0 | let resource = Resource::builder() |
130 | 0 | .with_service_name(NATIVELINK_SERVICE_NAME) |
131 | 0 | .with_attribute(KeyValue::new( |
132 | | "service.instance.id", |
133 | 0 | Uuid::new_v4().to_string(), |
134 | | )) |
135 | 0 | .build(); |
136 | | |
137 | 0 | let propagator = TextMapCompositePropagator::new(vec![ |
138 | 0 | Box::new(BaggagePropagator::new()), |
139 | 0 | Box::new(TraceContextPropagator::new()), |
140 | | ]); |
141 | 0 | global::set_text_map_propagator(propagator); |
142 | | |
143 | | // Logs |
144 | 0 | let otlp_log_layer = OpenTelemetryTracingBridge::new( |
145 | 0 | &SdkLoggerProvider::builder() |
146 | 0 | .with_resource(resource.clone()) |
147 | 0 | .with_batch_exporter( |
148 | 0 | LogExporter::builder() |
149 | 0 | .with_tonic() |
150 | 0 | .with_protocol(Protocol::Grpc) |
151 | 0 | .build() |
152 | 0 | .map_err(|e| make_err!(Code::Internal, "{e}")) |
153 | 0 | .err_tip(|| "While creating OpenTelemetry OTLP Log exporter")?, |
154 | | ) |
155 | 0 | .build(), |
156 | | ) |
157 | 0 | .with_filter(otlp_filter()); |
158 | | |
159 | | // Traces |
160 | 0 | let otlp_trace_layer = layer() |
161 | 0 | .with_tracer( |
162 | 0 | SdkTracerProvider::builder() |
163 | 0 | .with_resource(resource.clone()) |
164 | 0 | .with_batch_exporter( |
165 | 0 | SpanExporter::builder() |
166 | 0 | .with_tonic() |
167 | 0 | .with_protocol(Protocol::Grpc) |
168 | 0 | .build() |
169 | 0 | .map_err(|e| make_err!(Code::Internal, "{e}")) |
170 | 0 | .err_tip(|| "While creating OpenTelemetry OTLP Span exporter")?, |
171 | | ) |
172 | 0 | .build() |
173 | 0 | .tracer(NATIVELINK_SERVICE_NAME), |
174 | | ) |
175 | 0 | .with_filter(otlp_filter()); |
176 | | |
177 | | // Metrics |
178 | 0 | let meter_provider = SdkMeterProvider::builder() |
179 | 0 | .with_resource(resource) |
180 | 0 | .with_periodic_exporter( |
181 | 0 | MetricExporter::builder() |
182 | 0 | .with_tonic() |
183 | 0 | .with_protocol(Protocol::Grpc) |
184 | 0 | .build() |
185 | 0 | .map_err(|e| make_err!(Code::Internal, "{e}")) |
186 | 0 | .err_tip(|| "While creating OpenTelemetry OTLP Metric exporter")?, |
187 | | ) |
188 | 0 | .build(); |
189 | | |
190 | 0 | global::set_meter_provider(meter_provider.clone()); |
191 | | |
192 | 0 | let otlp_metrics_layer = MetricsLayer::new(meter_provider).with_filter(otlp_filter()); |
193 | | |
194 | 0 | registry() |
195 | 0 | .with(tracing_stdout_layer()) |
196 | 0 | .with(otlp_log_layer) |
197 | 0 | .with(otlp_trace_layer) |
198 | 0 | .with(otlp_metrics_layer) |
199 | 0 | .init(); |
200 | | |
201 | 0 | INITIALIZED.set(()).unwrap_or(()); |
202 | | |
203 | 0 | Ok(()) |
204 | 0 | } |
205 | | |
206 | | /// Custom metadata key field for Bazel metadata. |
207 | | const BAZEL_METADATA_KEY: &str = "bazel.metadata"; |
208 | | |
209 | | /// This is the header that bazel sends when using the `--remote_header` flag. |
210 | | /// TODO(aaronmondal): There are various other headers that bazel supports. |
211 | | /// Optimize their usage. |
212 | | const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requestmetadata-bin"; |
213 | | |
214 | | use opentelemetry::baggage::BaggageExt; |
215 | | use opentelemetry::context::FutureExt; |
216 | | |
217 | | #[derive(Debug, Clone)] |
218 | | pub struct OtlpMiddleware<S> { |
219 | | inner: S, |
220 | | identity_required: bool, |
221 | | } |
222 | | |
223 | | impl<S> OtlpMiddleware<S> { |
224 | 0 | const fn new(inner: S, identity_required: bool) -> Self { |
225 | 0 | Self { |
226 | 0 | inner, |
227 | 0 | identity_required, |
228 | 0 | } |
229 | 0 | } |
230 | | } |
231 | | |
232 | | impl<S, ReqBody, ResBody> tower::Service<hyper::http::Request<ReqBody>> for OtlpMiddleware<S> |
233 | | where |
234 | | S: tower::Service<hyper::http::Request<ReqBody>, Response = Response<ResBody>> |
235 | | + Clone |
236 | | + Send |
237 | | + 'static, |
238 | | S::Future: Send + 'static, |
239 | | ReqBody: core::fmt::Debug + Send + 'static, |
240 | | ResBody: From<String> + Send + 'static + Default, |
241 | | { |
242 | | type Response = S::Response; |
243 | | type Error = S::Error; |
244 | | type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>; |
245 | | |
246 | 0 | fn poll_ready( |
247 | 0 | &mut self, |
248 | 0 | cx: &mut core::task::Context<'_>, |
249 | 0 | ) -> core::task::Poll<Result<(), Self::Error>> { |
250 | 0 | self.inner.poll_ready(cx) |
251 | 0 | } |
252 | | |
253 | 0 | fn call(&mut self, req: hyper::http::Request<ReqBody>) -> Self::Future { |
254 | | // We must take the current `inner` and not the clone. |
255 | | // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services |
256 | 0 | let clone = self.inner.clone(); |
257 | 0 | let mut inner = core::mem::replace(&mut self.inner, clone); |
258 | | |
259 | 0 | let parent_cx = global::get_text_map_propagator(|propagator| { |
260 | 0 | propagator.extract(&HeaderExtractor(req.headers())) |
261 | 0 | }); |
262 | | |
263 | 0 | let identity = parent_cx |
264 | 0 | .baggage() |
265 | 0 | .get(ENDUSER_ID) |
266 | 0 | .map(|value| value.as_str().to_string()) |
267 | 0 | .unwrap_or_default(); |
268 | | |
269 | 0 | if identity.is_empty() && self.identity_required { Branch (269:12): [Folded - Ignored]
Branch (269:35): [Folded - Ignored]
Branch (269:12): [Folded - Ignored]
Branch (269:35): [Folded - Ignored]
|
270 | 0 | return Box::pin(async move { |
271 | 0 | Ok(tonic::Status::failed_precondition( |
272 | 0 | r" |
273 | 0 |
|
274 | 0 | NativeLink instance configured to require this OpenTelemetry Baggage header: |
275 | 0 |
|
276 | 0 | `Baggage: enduser.id=YOUR_IDENTITY` |
277 | 0 |
|
278 | 0 | ", |
279 | 0 | ) |
280 | 0 | .into_http()) |
281 | 0 | }); |
282 | 0 | } |
283 | | |
284 | 0 | debug!("Baggage enduser.id: {identity}"); |
285 | | |
286 | 0 | let tracer = global::tracer("origin_middleware"); |
287 | 0 | let span = tracer |
288 | 0 | .span_builder("origin_request") |
289 | 0 | .with_kind(opentelemetry::trace::SpanKind::Server) |
290 | 0 | .start_with_context(&tracer, &parent_cx); |
291 | | |
292 | 0 | let mut cx = parent_cx.with_span(span); |
293 | | |
294 | 0 | if let Some(bazel_header) = req.headers().get(BAZEL_REQUESTMETADATA_HEADER) { Branch (294:16): [Folded - Ignored]
Branch (294:16): [Folded - Ignored]
|
295 | 0 | if let Ok(decoded) = BASE64_STANDARD_NO_PAD.decode(bazel_header.as_bytes()) { Branch (295:20): [Folded - Ignored]
Branch (295:20): [Folded - Ignored]
|
296 | 0 | if let Ok(metadata) = RequestMetadata::decode(decoded.as_slice()) { Branch (296:24): [Folded - Ignored]
Branch (296:24): [Folded - Ignored]
|
297 | 0 | let metadata_str = format!("{metadata:?}"); |
298 | 0 | debug!("Baggage Bazel request metadata: {metadata_str}"); |
299 | 0 | cx = cx.with_baggage(vec![ |
300 | 0 | KeyValue::new(BAZEL_METADATA_KEY, metadata_str), |
301 | 0 | KeyValue::new(ENDUSER_ID, identity), |
302 | | ]); |
303 | 0 | } |
304 | 0 | } |
305 | 0 | } |
306 | | |
307 | 0 | Box::pin(async move { inner.call(req).with_context(cx).await }) |
308 | 0 | } |
309 | | } |
310 | | |
311 | | #[derive(Debug, Clone, Copy)] |
312 | | pub struct OtlpLayer { |
313 | | identity_required: bool, |
314 | | } |
315 | | |
316 | | impl OtlpLayer { |
317 | 0 | pub const fn new(identity_required: bool) -> Self { |
318 | 0 | Self { identity_required } |
319 | 0 | } |
320 | | } |
321 | | |
322 | | impl<S> tower::Layer<S> for OtlpLayer { |
323 | | type Service = OtlpMiddleware<S>; |
324 | | |
325 | 0 | fn layer(&self, service: S) -> Self::Service { |
326 | 0 | OtlpMiddleware::new(service, self.identity_required) |
327 | 0 | } |
328 | | } |