Coverage Report

Created: 2026-06-04 10:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/telemetry.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::default::Default;
16
use std::collections::HashMap;
17
use std::env;
18
use std::sync::{Arc, OnceLock};
19
20
use base64::Engine;
21
use base64::prelude::BASE64_STANDARD_NO_PAD;
22
use hyper::http::Response;
23
use nativelink_error::{Code, ResultExt, make_err};
24
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
25
use opentelemetry::propagation::TextMapCompositePropagator;
26
use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
27
use opentelemetry::{KeyValue, global};
28
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
29
use opentelemetry_http::HeaderExtractor;
30
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
31
use opentelemetry_sdk::Resource;
32
use opentelemetry_sdk::logs::SdkLoggerProvider;
33
use opentelemetry_sdk::metrics::SdkMeterProvider;
34
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
35
use opentelemetry_sdk::trace::SdkTracerProvider;
36
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
37
use prost::Message;
38
use tracing::debug;
39
use tracing::metadata::LevelFilter;
40
use tracing_opentelemetry::{MetricsLayer, layer};
41
use tracing_subscriber::filter::Directive;
42
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
43
use tracing_subscriber::util::SubscriberInitExt;
44
use tracing_subscriber::{EnvFilter, Layer, Registry, fmt, registry};
45
use uuid::Uuid;
46
47
/// Name reported as the OTLP `service.name` resource attribute by every
/// NativeLink service; also used as the name of the OTLP tracer.
const NATIVELINK_SERVICE_NAME: &str = "nativelink";
49
50
// An `EnvFilter` to filter out non-nativelink information.
51
//
52
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/2877
53
//
54
// Note that `EnvFilter` doesn't implement `clone`, so create a new one for
55
// each telemetry kind.
56
0
fn otlp_filter() -> EnvFilter {
57
0
    fn expect_parse(directive: &str) -> Directive {
58
0
        directive
59
0
            .parse()
60
0
            .unwrap_or_else(|_| panic!("Static directive '{directive}' failed to parse"))
61
0
    }
62
63
0
    EnvFilter::builder()
64
0
        .with_default_directive(LevelFilter::INFO.into())
65
0
        .from_env_lossy()
66
0
        .add_directive(expect_parse("hyper=off"))
67
0
        .add_directive(expect_parse("tonic=off"))
68
0
        .add_directive(expect_parse("h2=off"))
69
0
        .add_directive(expect_parse("reqwest=off"))
70
0
        .add_directive(expect_parse("tower=off"))
71
0
}
72
73
// Create a tracing layer intended for stdout printing.
74
//
75
// The output of this layer is configurable via the `NL_LOG` environment
76
// variable.
77
0
fn tracing_stdout_layer() -> impl Layer<Registry> {
78
0
    let nl_log_fmt = env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string());
79
80
0
    let stdout_filter = otlp_filter();
81
82
0
    match nl_log_fmt.as_str() {
83
0
        "compact" => fmt::layer()
84
0
            .compact()
85
0
            .with_timer(fmt::time::time())
86
0
            .with_filter(stdout_filter)
87
0
            .boxed(),
88
0
        "json" => fmt::layer()
89
0
            .json()
90
0
            .with_timer(fmt::time::time())
91
0
            .with_filter(stdout_filter)
92
0
            .boxed(),
93
0
        _ => fmt::layer()
94
0
            .pretty()
95
0
            .with_timer(fmt::time::time())
96
0
            .with_filter(stdout_filter)
97
0
            .boxed(),
98
    }
99
0
}
100
101
/// Initialize tracing with OpenTelemetry support.
102
///
103
/// # Errors
104
///
105
/// Returns `Err` if logging was already initialized or if the exporters can't
106
/// be initialized.
107
0
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
108
    static INITIALIZED: OnceLock<()> = OnceLock::new();
109
110
0
    if INITIALIZED.get().is_some() {
111
0
        return Err(make_err!(Code::Internal, "Logging already initialized"));
112
0
    }
113
114
    // We currently use a UUIDv4 for "service.instance.id" as per:
115
    // https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
116
    // This might change as we get a better understanding of its usecases in the
117
    // context of broader observability infrastructure.
118
0
    let resource = Resource::builder()
119
0
        .with_service_name(NATIVELINK_SERVICE_NAME)
120
0
        .with_attribute(KeyValue::new(
121
            "service.instance.id",
122
0
            Uuid::new_v4().to_string(),
123
        ))
124
0
        .build();
125
126
0
    let propagator = TextMapCompositePropagator::new(vec![
127
0
        Box::new(BaggagePropagator::new()),
128
0
        Box::new(TraceContextPropagator::new()),
129
    ]);
130
0
    global::set_text_map_propagator(propagator);
131
132
    // Logs
133
0
    let otlp_log_layer = OpenTelemetryTracingBridge::new(
134
0
        &SdkLoggerProvider::builder()
135
0
            .with_resource(resource.clone())
136
0
            .with_batch_exporter(
137
0
                LogExporter::builder()
138
0
                    .with_tonic()
139
0
                    .with_protocol(Protocol::Grpc)
140
0
                    .build()
141
0
                    .map_err(|e| make_err!(Code::Internal, "{e}"))
142
0
                    .err_tip(|| "While creating OpenTelemetry OTLP Log exporter")?,
143
            )
144
0
            .build(),
145
    )
146
0
    .with_filter(otlp_filter());
147
148
    // Traces
149
0
    let otlp_trace_layer = layer()
150
0
        .with_tracer(
151
0
            SdkTracerProvider::builder()
152
0
                .with_resource(resource.clone())
153
0
                .with_batch_exporter(
154
0
                    SpanExporter::builder()
155
0
                        .with_tonic()
156
0
                        .with_protocol(Protocol::Grpc)
157
0
                        .build()
158
0
                        .map_err(|e| make_err!(Code::Internal, "{e}"))
159
0
                        .err_tip(|| "While creating OpenTelemetry OTLP Span exporter")?,
160
                )
161
0
                .build()
162
0
                .tracer(NATIVELINK_SERVICE_NAME),
163
        )
164
0
        .with_filter(otlp_filter());
165
166
    // Metrics
167
0
    let meter_provider = SdkMeterProvider::builder()
168
0
        .with_resource(resource)
169
0
        .with_periodic_exporter(
170
0
            MetricExporter::builder()
171
0
                .with_tonic()
172
0
                .with_protocol(Protocol::Grpc)
173
0
                .build()
174
0
                .map_err(|e| make_err!(Code::Internal, "{e}"))
175
0
                .err_tip(|| "While creating OpenTelemetry OTLP Metric exporter")?,
176
        )
177
0
        .build();
178
179
0
    global::set_meter_provider(meter_provider.clone());
180
181
0
    let otlp_metrics_layer = MetricsLayer::new(meter_provider).with_filter(otlp_filter());
182
183
0
    registry()
184
0
        .with(tracing_stdout_layer())
185
0
        .with(otlp_log_layer)
186
0
        .with(otlp_trace_layer)
187
0
        .with(otlp_metrics_layer)
188
0
        .init();
189
190
0
    INITIALIZED.set(()).unwrap_or(());
191
192
0
    Ok(())
193
0
}
194
195
/// Baggage key under which the decoded Bazel `RequestMetadata` (its `Debug`
/// rendering) is attached to the request's OpenTelemetry context.
const BAZEL_METADATA_KEY: &str = "bazel.metadata";

/// gRPC metadata key carrying a Remote Execution API `RequestMetadata`.
///
/// Per the REAPI, clients (Bazel among them) attach the base64-encoded binary
/// `RequestMetadata` proto under this name; it is the standard REAPI header,
/// not one produced specifically by Bazel's `--remote_header` flag.
/// TODO(palfrey): There are various other headers that bazel supports.
///                    Optimize their usage.
const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requestmetadata-bin";
202
203
use opentelemetry::baggage::BaggageExt;
204
use opentelemetry::context::FutureExt;
205
206
/// The inbound request's headers whose values are valid visible ASCII,
/// keyed by lower-cased header name.
///
/// Stored in the request's OpenTelemetry context so that upstream calls made
/// while serving the request can forward them (e.g. JWT auth tokens). The map
/// sits behind an `Arc`, so cloning this handle does not copy the headers.
#[derive(Debug, Default, Clone)]
pub struct ClientHeaders(pub Arc<HashMap<String, String>>);
210
211
/// `tower` service wrapper that attaches OpenTelemetry context (propagated
/// parent, server span, baggage, client headers) to each request before
/// delegating to `inner`. Normally constructed through `OtlpLayer`.
#[derive(Debug, Clone)]
pub struct OtlpMiddleware<S> {
    /// The wrapped service that actually handles the request.
    inner: S,
    /// When set, requests without an `enduser.id` baggage entry are rejected.
    identity_required: bool,
}

impl<S> OtlpMiddleware<S> {
    /// Wraps `inner`, recording whether an end-user identity is mandatory.
    const fn new(inner: S, identity_required: bool) -> Self {
        Self {
            identity_required,
            inner,
        }
    }
}
225
226
impl<S, ReqBody, ResBody> tower::Service<hyper::http::Request<ReqBody>> for OtlpMiddleware<S>
227
where
228
    S: tower::Service<hyper::http::Request<ReqBody>, Response = Response<ResBody>>
229
        + Clone
230
        + Send
231
        + 'static,
232
    S::Future: Send + 'static,
233
    ReqBody: core::fmt::Debug + Send + 'static,
234
    ResBody: From<String> + Send + 'static + Default,
235
{
236
    type Response = S::Response;
237
    type Error = S::Error;
238
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
239
240
3
    fn poll_ready(
241
3
        &mut self,
242
3
        cx: &mut core::task::Context<'_>,
243
3
    ) -> core::task::Poll<Result<(), Self::Error>> {
244
3
        self.inner.poll_ready(cx)
245
3
    }
246
247
3
    fn call(&mut self, req: hyper::http::Request<ReqBody>) -> Self::Future {
248
        // We must take the current `inner` and not the clone.
249
        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
250
3
        let clone = self.inner.clone();
251
3
        let mut inner = core::mem::replace(&mut self.inner, clone);
252
253
        // Capture all ASCII-valued request headers before req is consumed, so
254
        // they can be forwarded to upstream services (e.g. JWT auth tokens).
255
3
        let client_headers = ClientHeaders(Arc::new(
256
3
            req.headers()
257
3
                .iter()
258
3
                .filter_map(|(name, value)| 
{2
259
2
                    value
260
2
                        .to_str()
261
2
                        .ok()
262
2
                        .map(|v| (name.as_str().to_lowercase(), v.to_string()))
263
2
                })
264
3
                .collect(),
265
        ));
266
267
3
        let parent_cx = global::get_text_map_propagator(|propagator| {
268
3
            propagator.extract(&HeaderExtractor(req.headers()))
269
3
        });
270
271
3
        let identity = parent_cx
272
3
            .baggage()
273
3
            .get(ENDUSER_ID)
274
3
            .map(|value| 
value.as_str()1
.
to_string1
())
275
3
            .unwrap_or_default();
276
277
3
        if identity.is_empty() {
278
2
            if self.identity_required {
279
0
                return Box::pin(async move {
280
0
                    Ok(tonic::Status::failed_precondition(
281
0
                        r"
282
0
283
0
NativeLink instance configured to require this OpenTelemetry Baggage header:
284
0
285
0
    `Baggage: enduser.id=YOUR_IDENTITY`
286
0
287
0
",
288
0
                    )
289
0
                    .into_http())
290
0
                });
291
2
            }
292
        } else {
293
1
            debug!("Baggage enduser.id: {identity}");
294
        }
295
296
3
        let tracer = global::tracer("origin_middleware");
297
3
        let span = tracer
298
3
            .span_builder("origin_request")
299
3
            .with_kind(opentelemetry::trace::SpanKind::Server)
300
3
            .start_with_context(&tracer, &parent_cx);
301
302
3
        let mut cx = parent_cx.with_span(span);
303
304
3
        if let Some(
bazel_header0
) = req.headers().get(BAZEL_REQUESTMETADATA_HEADER)
305
0
            && let Ok(decoded) = BASE64_STANDARD_NO_PAD.decode(bazel_header.as_bytes())
306
0
            && let Ok(metadata) = RequestMetadata::decode(decoded.as_slice())
307
        {
308
0
            let metadata_str = format!("{metadata:?}");
309
0
            debug!("Baggage Bazel request metadata: {metadata_str}");
310
0
            cx = cx.with_baggage(vec![
311
0
                KeyValue::new(BAZEL_METADATA_KEY, metadata_str),
312
0
                KeyValue::new(ENDUSER_ID, identity),
313
            ]);
314
3
        }
315
316
3
        let cx = cx.with_value(client_headers);
317
3
        Box::pin(async move { inner.call(req).with_context(cx).await })
318
3
    }
319
}
320
321
/// `tower` layer that wraps services in [`OtlpMiddleware`].
///
/// `identity_required` is handed to every middleware it builds; when set,
/// requests lacking a non-empty `enduser.id` baggage entry are rejected.
#[derive(Clone, Copy, Debug)]
pub struct OtlpLayer {
    identity_required: bool,
}

impl OtlpLayer {
    /// Creates the layer with the given identity requirement.
    pub const fn new(identity_required: bool) -> Self {
        Self { identity_required }
    }
}
331
332
impl<S> tower::Layer<S> for OtlpLayer {
333
    type Service = OtlpMiddleware<S>;
334
335
3
    fn layer(&self, service: S) -> Self::Service {
336
3
        OtlpMiddleware::new(service, self.identity_required)
337
3
    }
338
}