Coverage Report

Created: 2026-04-14 11:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/telemetry.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::default::Default;
16
use std::env;
17
use std::sync::OnceLock;
18
19
use base64::Engine;
20
use base64::prelude::BASE64_STANDARD_NO_PAD;
21
use hyper::http::Response;
22
use nativelink_error::{Code, ResultExt, make_err};
23
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
24
use opentelemetry::propagation::TextMapCompositePropagator;
25
use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
26
use opentelemetry::{KeyValue, global};
27
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
28
use opentelemetry_http::HeaderExtractor;
29
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
30
use opentelemetry_sdk::Resource;
31
use opentelemetry_sdk::logs::SdkLoggerProvider;
32
use opentelemetry_sdk::metrics::SdkMeterProvider;
33
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
34
use opentelemetry_sdk::trace::SdkTracerProvider;
35
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
36
use prost::Message;
37
use tracing::debug;
38
use tracing::metadata::LevelFilter;
39
use tracing_opentelemetry::{MetricsLayer, layer};
40
use tracing_subscriber::filter::Directive;
41
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
42
use tracing_subscriber::util::SubscriberInitExt;
43
use tracing_subscriber::{EnvFilter, Layer, Registry, fmt, registry};
44
use uuid::Uuid;
45
46
/// The OTLP "service.name" field for all nativelink services.
47
const NATIVELINK_SERVICE_NAME: &str = "nativelink";
48
49
// An `EnvFilter` to filter out non-nativelink information.
50
//
51
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/2877
52
//
53
// Note that `EnvFilter` doesn't implement `clone`, so create a new one for
54
// each telemetry kind.
55
0
fn otlp_filter() -> EnvFilter {
56
0
    fn expect_parse(directive: &str) -> Directive {
57
0
        directive
58
0
            .parse()
59
0
            .unwrap_or_else(|_| panic!("Static directive '{directive}' failed to parse"))
60
0
    }
61
62
0
    EnvFilter::builder()
63
0
        .with_default_directive(LevelFilter::INFO.into())
64
0
        .from_env_lossy()
65
0
        .add_directive(expect_parse("hyper=off"))
66
0
        .add_directive(expect_parse("tonic=off"))
67
0
        .add_directive(expect_parse("h2=off"))
68
0
        .add_directive(expect_parse("reqwest=off"))
69
0
        .add_directive(expect_parse("tower=off"))
70
0
}
71
72
// Create a tracing layer intended for stdout printing.
73
//
74
// The output of this layer is configurable via the `NL_LOG` environment
75
// variable.
76
0
fn tracing_stdout_layer() -> impl Layer<Registry> {
77
0
    let nl_log_fmt = env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string());
78
79
0
    let stdout_filter = otlp_filter();
80
81
0
    match nl_log_fmt.as_str() {
82
0
        "compact" => fmt::layer()
83
0
            .compact()
84
0
            .with_timer(fmt::time::time())
85
0
            .with_filter(stdout_filter)
86
0
            .boxed(),
87
0
        "json" => fmt::layer()
88
0
            .json()
89
0
            .with_timer(fmt::time::time())
90
0
            .with_filter(stdout_filter)
91
0
            .boxed(),
92
0
        _ => fmt::layer()
93
0
            .pretty()
94
0
            .with_timer(fmt::time::time())
95
0
            .with_filter(stdout_filter)
96
0
            .boxed(),
97
    }
98
0
}
99
100
/// Initialize tracing with OpenTelemetry support.
101
///
102
/// # Errors
103
///
104
/// Returns `Err` if logging was already initialized or if the exporters can't
105
/// be initialized.
106
0
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
107
    static INITIALIZED: OnceLock<()> = OnceLock::new();
108
109
0
    if INITIALIZED.get().is_some() {
110
0
        return Err(make_err!(Code::Internal, "Logging already initialized"));
111
0
    }
112
113
    // We currently use a UUIDv4 for "service.instance.id" as per:
114
    // https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
115
    // This might change as we get a better understanding of its usecases in the
116
    // context of broader observability infrastructure.
117
0
    let resource = Resource::builder()
118
0
        .with_service_name(NATIVELINK_SERVICE_NAME)
119
0
        .with_attribute(KeyValue::new(
120
            "service.instance.id",
121
0
            Uuid::new_v4().to_string(),
122
        ))
123
0
        .build();
124
125
0
    let propagator = TextMapCompositePropagator::new(vec![
126
0
        Box::new(BaggagePropagator::new()),
127
0
        Box::new(TraceContextPropagator::new()),
128
    ]);
129
0
    global::set_text_map_propagator(propagator);
130
131
    // Logs
132
0
    let otlp_log_layer = OpenTelemetryTracingBridge::new(
133
0
        &SdkLoggerProvider::builder()
134
0
            .with_resource(resource.clone())
135
0
            .with_batch_exporter(
136
0
                LogExporter::builder()
137
0
                    .with_tonic()
138
0
                    .with_protocol(Protocol::Grpc)
139
0
                    .build()
140
0
                    .map_err(|e| make_err!(Code::Internal, "{e}"))
141
0
                    .err_tip(|| "While creating OpenTelemetry OTLP Log exporter")?,
142
            )
143
0
            .build(),
144
    )
145
0
    .with_filter(otlp_filter());
146
147
    // Traces
148
0
    let otlp_trace_layer = layer()
149
0
        .with_tracer(
150
0
            SdkTracerProvider::builder()
151
0
                .with_resource(resource.clone())
152
0
                .with_batch_exporter(
153
0
                    SpanExporter::builder()
154
0
                        .with_tonic()
155
0
                        .with_protocol(Protocol::Grpc)
156
0
                        .build()
157
0
                        .map_err(|e| make_err!(Code::Internal, "{e}"))
158
0
                        .err_tip(|| "While creating OpenTelemetry OTLP Span exporter")?,
159
                )
160
0
                .build()
161
0
                .tracer(NATIVELINK_SERVICE_NAME),
162
        )
163
0
        .with_filter(otlp_filter());
164
165
    // Metrics
166
0
    let meter_provider = SdkMeterProvider::builder()
167
0
        .with_resource(resource)
168
0
        .with_periodic_exporter(
169
0
            MetricExporter::builder()
170
0
                .with_tonic()
171
0
                .with_protocol(Protocol::Grpc)
172
0
                .build()
173
0
                .map_err(|e| make_err!(Code::Internal, "{e}"))
174
0
                .err_tip(|| "While creating OpenTelemetry OTLP Metric exporter")?,
175
        )
176
0
        .build();
177
178
0
    global::set_meter_provider(meter_provider.clone());
179
180
0
    let otlp_metrics_layer = MetricsLayer::new(meter_provider).with_filter(otlp_filter());
181
182
0
    registry()
183
0
        .with(tracing_stdout_layer())
184
0
        .with(otlp_log_layer)
185
0
        .with(otlp_trace_layer)
186
0
        .with(otlp_metrics_layer)
187
0
        .init();
188
189
0
    INITIALIZED.set(()).unwrap_or(());
190
191
0
    Ok(())
192
0
}
193
194
/// Custom metadata key field for Bazel metadata.
195
const BAZEL_METADATA_KEY: &str = "bazel.metadata";
196
197
/// This is the header that bazel sends when using the `--remote_header` flag.
198
/// TODO(palfrey): There are various other headers that bazel supports.
199
///                    Optimize their usage.
200
const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requestmetadata-bin";
201
202
use opentelemetry::baggage::BaggageExt;
203
use opentelemetry::context::FutureExt;
204
205
/// Service wrapper that carries the wrapped service together with the
/// identity policy applied to incoming requests.
#[derive(Debug, Clone)]
pub struct OtlpMiddleware<S> {
    /// The wrapped service that requests are forwarded to.
    inner: S,
    /// Whether requests lacking an identity are rejected.
    identity_required: bool,
}

impl<S> OtlpMiddleware<S> {
    /// Wraps `inner`, recording whether an identity is mandatory.
    const fn new(inner: S, identity_required: bool) -> Self {
        OtlpMiddleware {
            identity_required,
            inner,
        }
    }
}
219
220
impl<S, ReqBody, ResBody> tower::Service<hyper::http::Request<ReqBody>> for OtlpMiddleware<S>
221
where
222
    S: tower::Service<hyper::http::Request<ReqBody>, Response = Response<ResBody>>
223
        + Clone
224
        + Send
225
        + 'static,
226
    S::Future: Send + 'static,
227
    ReqBody: core::fmt::Debug + Send + 'static,
228
    ResBody: From<String> + Send + 'static + Default,
229
{
230
    type Response = S::Response;
231
    type Error = S::Error;
232
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
233
234
2
    fn poll_ready(
235
2
        &mut self,
236
2
        cx: &mut core::task::Context<'_>,
237
2
    ) -> core::task::Poll<Result<(), Self::Error>> {
238
2
        self.inner.poll_ready(cx)
239
2
    }
240
241
2
    fn call(&mut self, req: hyper::http::Request<ReqBody>) -> Self::Future {
242
        // We must take the current `inner` and not the clone.
243
        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
244
2
        let clone = self.inner.clone();
245
2
        let mut inner = core::mem::replace(&mut self.inner, clone);
246
247
2
        let parent_cx = global::get_text_map_propagator(|propagator| {
248
2
            propagator.extract(&HeaderExtractor(req.headers()))
249
2
        });
250
251
2
        let identity = parent_cx
252
2
            .baggage()
253
2
            .get(ENDUSER_ID)
254
2
            .map(|value| 
value.as_str()1
.
to_string1
())
255
2
            .unwrap_or_default();
256
257
2
        if identity.is_empty() {
258
1
            if self.identity_required {
259
0
                return Box::pin(async move {
260
0
                    Ok(tonic::Status::failed_precondition(
261
0
                        r"
262
0
263
0
NativeLink instance configured to require this OpenTelemetry Baggage header:
264
0
265
0
    `Baggage: enduser.id=YOUR_IDENTITY`
266
0
267
0
",
268
0
                    )
269
0
                    .into_http())
270
0
                });
271
1
            }
272
        } else {
273
1
            debug!("Baggage enduser.id: {identity}");
274
        }
275
276
2
        let tracer = global::tracer("origin_middleware");
277
2
        let span = tracer
278
2
            .span_builder("origin_request")
279
2
            .with_kind(opentelemetry::trace::SpanKind::Server)
280
2
            .start_with_context(&tracer, &parent_cx);
281
282
2
        let mut cx = parent_cx.with_span(span);
283
284
2
        if let Some(
bazel_header0
) = req.headers().get(BAZEL_REQUESTMETADATA_HEADER)
285
0
            && let Ok(decoded) = BASE64_STANDARD_NO_PAD.decode(bazel_header.as_bytes())
286
0
            && let Ok(metadata) = RequestMetadata::decode(decoded.as_slice())
287
        {
288
0
            let metadata_str = format!("{metadata:?}");
289
0
            debug!("Baggage Bazel request metadata: {metadata_str}");
290
0
            cx = cx.with_baggage(vec![
291
0
                KeyValue::new(BAZEL_METADATA_KEY, metadata_str),
292
0
                KeyValue::new(ENDUSER_ID, identity),
293
            ]);
294
2
        }
295
296
2
        Box::pin(async move { inner.call(req).with_context(cx).await })
297
2
    }
298
}
299
300
/// Layer configuration holding the identity policy that is handed to each
/// middleware it creates.
#[derive(Debug, Clone, Copy)]
pub struct OtlpLayer {
    /// Whether wrapped services must reject requests without an identity.
    identity_required: bool,
}

impl OtlpLayer {
    /// Creates a layer with the given identity policy.
    pub const fn new(identity_required: bool) -> Self {
        OtlpLayer { identity_required }
    }
}
310
311
impl<S> tower::Layer<S> for OtlpLayer {
312
    type Service = OtlpMiddleware<S>;
313
314
2
    fn layer(&self, service: S) -> Self::Service {
315
2
        OtlpMiddleware::new(service, self.identity_required)
316
2
    }
317
}