Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/telemetry.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::default::Default;
16
use std::env;
17
use std::sync::OnceLock;
18
19
use base64::Engine;
20
use base64::prelude::BASE64_STANDARD_NO_PAD;
21
use hyper::http::Response;
22
use nativelink_error::{Code, ResultExt, make_err};
23
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
24
use opentelemetry::propagation::TextMapCompositePropagator;
25
use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
26
use opentelemetry::{KeyValue, global};
27
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
28
use opentelemetry_http::HeaderExtractor;
29
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
30
use opentelemetry_sdk::Resource;
31
use opentelemetry_sdk::logs::SdkLoggerProvider;
32
use opentelemetry_sdk::metrics::SdkMeterProvider;
33
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
34
use opentelemetry_sdk::trace::SdkTracerProvider;
35
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
36
use prost::Message;
37
use tracing::debug;
38
use tracing::metadata::LevelFilter;
39
use tracing_opentelemetry::{MetricsLayer, layer};
40
use tracing_subscriber::filter::Directive;
41
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
42
use tracing_subscriber::util::SubscriberInitExt;
43
use tracing_subscriber::{EnvFilter, Layer, Registry, fmt, registry};
44
use uuid::Uuid;
45
46
/// The OTLP "service.name" field for all nativelink services.
47
const NATIVELINK_SERVICE_NAME: &str = "nativelink";
48
49
// An `EnvFilter` to filter out non-nativelink information.
50
//
51
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/2877
52
//
53
// Note that `EnvFilter` doesn't implement `clone`, so create a new one for
54
// each telemetry kind.
55
0
fn otlp_filter() -> EnvFilter {
56
0
    fn expect_parse(directive: &str) -> Directive {
57
0
        directive
58
0
            .parse()
59
0
            .unwrap_or_else(|_| panic!("Static directive '{directive}' failed to parse"))
60
0
    }
61
62
0
    EnvFilter::builder()
63
0
        .with_default_directive(LevelFilter::INFO.into())
64
0
        .from_env_lossy()
65
0
        .add_directive(expect_parse("hyper=off"))
66
0
        .add_directive(expect_parse("tonic=off"))
67
0
        .add_directive(expect_parse("h2=off"))
68
0
        .add_directive(expect_parse("reqwest=off"))
69
0
        .add_directive(expect_parse("tower=off"))
70
0
        .add_directive(expect_parse("fred=off"))
71
0
}
72
73
// Create a tracing layer intended for stdout printing.
74
//
75
// The output of this layer is configurable via the `NL_LOG` environment
76
// variable.
77
0
fn tracing_stdout_layer() -> impl Layer<Registry> {
78
0
    let nl_log_fmt = env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string());
79
80
0
    let stdout_filter = otlp_filter();
81
82
0
    match nl_log_fmt.as_str() {
83
0
        "compact" => fmt::layer()
84
0
            .compact()
85
0
            .with_timer(fmt::time::time())
86
0
            .with_filter(stdout_filter)
87
0
            .boxed(),
88
0
        "json" => fmt::layer()
89
0
            .json()
90
0
            .with_timer(fmt::time::time())
91
0
            .with_filter(stdout_filter)
92
0
            .boxed(),
93
0
        _ => fmt::layer()
94
0
            .pretty()
95
0
            .with_timer(fmt::time::time())
96
0
            .with_filter(stdout_filter)
97
0
            .boxed(),
98
    }
99
0
}
100
101
/// Initialize tracing with OpenTelemetry support.
102
///
103
/// # Errors
104
///
105
/// Returns `Err` if logging was already initialized or if the exporters can't
106
/// be initialized.
107
0
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
108
    static INITIALIZED: OnceLock<()> = OnceLock::new();
109
110
0
    if INITIALIZED.get().is_some() {
  Branch (110:8): [True: 0, False: 0]
  Branch (110:8): [Folded - Ignored]
111
0
        return Err(make_err!(Code::Internal, "Logging already initialized"));
112
0
    }
113
114
    // We currently use a UUIDv4 for "service.instance.id" as per:
115
    // https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
116
    // This might change as we get a better understanding of its usecases in the
117
    // context of broader observability infrastructure.
118
0
    let resource = Resource::builder()
119
0
        .with_service_name(NATIVELINK_SERVICE_NAME)
120
0
        .with_attribute(KeyValue::new(
121
            "service.instance.id",
122
0
            Uuid::new_v4().to_string(),
123
        ))
124
0
        .build();
125
126
0
    let propagator = TextMapCompositePropagator::new(vec![
127
0
        Box::new(BaggagePropagator::new()),
128
0
        Box::new(TraceContextPropagator::new()),
129
    ]);
130
0
    global::set_text_map_propagator(propagator);
131
132
    // Logs
133
0
    let otlp_log_layer = OpenTelemetryTracingBridge::new(
134
0
        &SdkLoggerProvider::builder()
135
0
            .with_resource(resource.clone())
136
0
            .with_batch_exporter(
137
0
                LogExporter::builder()
138
0
                    .with_tonic()
139
0
                    .with_protocol(Protocol::Grpc)
140
0
                    .build()
141
0
                    .map_err(|e| make_err!(Code::Internal, "{e}"))
142
0
                    .err_tip(|| "While creating OpenTelemetry OTLP Log exporter")?,
143
            )
144
0
            .build(),
145
    )
146
0
    .with_filter(otlp_filter());
147
148
    // Traces
149
0
    let otlp_trace_layer = layer()
150
0
        .with_tracer(
151
0
            SdkTracerProvider::builder()
152
0
                .with_resource(resource.clone())
153
0
                .with_batch_exporter(
154
0
                    SpanExporter::builder()
155
0
                        .with_tonic()
156
0
                        .with_protocol(Protocol::Grpc)
157
0
                        .build()
158
0
                        .map_err(|e| make_err!(Code::Internal, "{e}"))
159
0
                        .err_tip(|| "While creating OpenTelemetry OTLP Span exporter")?,
160
                )
161
0
                .build()
162
0
                .tracer(NATIVELINK_SERVICE_NAME),
163
        )
164
0
        .with_filter(otlp_filter());
165
166
    // Metrics
167
0
    let meter_provider = SdkMeterProvider::builder()
168
0
        .with_resource(resource)
169
0
        .with_periodic_exporter(
170
0
            MetricExporter::builder()
171
0
                .with_tonic()
172
0
                .with_protocol(Protocol::Grpc)
173
0
                .build()
174
0
                .map_err(|e| make_err!(Code::Internal, "{e}"))
175
0
                .err_tip(|| "While creating OpenTelemetry OTLP Metric exporter")?,
176
        )
177
0
        .build();
178
179
0
    global::set_meter_provider(meter_provider.clone());
180
181
0
    let otlp_metrics_layer = MetricsLayer::new(meter_provider).with_filter(otlp_filter());
182
183
0
    registry()
184
0
        .with(tracing_stdout_layer())
185
0
        .with(otlp_log_layer)
186
0
        .with(otlp_trace_layer)
187
0
        .with(otlp_metrics_layer)
188
0
        .init();
189
190
0
    INITIALIZED.set(()).unwrap_or(());
191
192
0
    Ok(())
193
0
}
194
195
/// Custom metadata key field for Bazel metadata.
196
const BAZEL_METADATA_KEY: &str = "bazel.metadata";
197
198
/// This is the header that bazel sends when using the `--remote_header` flag.
199
/// TODO(palfrey): There are various other headers that bazel supports.
200
///                    Optimize their usage.
201
const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requestmetadata-bin";
202
203
use opentelemetry::baggage::BaggageExt;
204
use opentelemetry::context::FutureExt;
205
206
/// Tower middleware that wraps an inner service `S`.
///
/// Instances are created by [`OtlpLayer`].
#[derive(Debug, Clone)]
pub struct OtlpMiddleware<S> {
    /// The wrapped service that requests are forwarded to.
    inner: S,
    /// When `true`, requests without an `enduser.id` baggage entry are
    /// answered with an error instead of being forwarded.
    identity_required: bool,
}

impl<S> OtlpMiddleware<S> {
    /// Wraps `inner`, remembering whether an identity is mandatory.
    const fn new(inner: S, identity_required: bool) -> Self {
        Self {
            identity_required,
            inner,
        }
    }
}
220
221
impl<S, ReqBody, ResBody> tower::Service<hyper::http::Request<ReqBody>> for OtlpMiddleware<S>
222
where
223
    S: tower::Service<hyper::http::Request<ReqBody>, Response = Response<ResBody>>
224
        + Clone
225
        + Send
226
        + 'static,
227
    S::Future: Send + 'static,
228
    ReqBody: core::fmt::Debug + Send + 'static,
229
    ResBody: From<String> + Send + 'static + Default,
230
{
231
    type Response = S::Response;
232
    type Error = S::Error;
233
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
234
235
0
    fn poll_ready(
236
0
        &mut self,
237
0
        cx: &mut core::task::Context<'_>,
238
0
    ) -> core::task::Poll<Result<(), Self::Error>> {
239
0
        self.inner.poll_ready(cx)
240
0
    }
241
242
0
    fn call(&mut self, req: hyper::http::Request<ReqBody>) -> Self::Future {
243
        // We must take the current `inner` and not the clone.
244
        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
245
0
        let clone = self.inner.clone();
246
0
        let mut inner = core::mem::replace(&mut self.inner, clone);
247
248
0
        let parent_cx = global::get_text_map_propagator(|propagator| {
249
0
            propagator.extract(&HeaderExtractor(req.headers()))
250
0
        });
251
252
0
        let identity = parent_cx
253
0
            .baggage()
254
0
            .get(ENDUSER_ID)
255
0
            .map(|value| value.as_str().to_string())
256
0
            .unwrap_or_default();
257
258
0
        if identity.is_empty() && self.identity_required {
  Branch (258:12): [Folded - Ignored]
  Branch (258:35): [Folded - Ignored]
  Branch (258:12): [Folded - Ignored]
  Branch (258:35): [Folded - Ignored]
259
0
            return Box::pin(async move {
260
0
                Ok(tonic::Status::failed_precondition(
261
0
                    r"
262
0
263
0
NativeLink instance configured to require this OpenTelemetry Baggage header:
264
0
265
0
    `Baggage: enduser.id=YOUR_IDENTITY`
266
0
267
0
",
268
0
                )
269
0
                .into_http())
270
0
            });
271
0
        }
272
273
0
        debug!("Baggage enduser.id: {identity}");
274
275
0
        let tracer = global::tracer("origin_middleware");
276
0
        let span = tracer
277
0
            .span_builder("origin_request")
278
0
            .with_kind(opentelemetry::trace::SpanKind::Server)
279
0
            .start_with_context(&tracer, &parent_cx);
280
281
0
        let mut cx = parent_cx.with_span(span);
282
283
0
        if let Some(bazel_header) = req.headers().get(BAZEL_REQUESTMETADATA_HEADER) {
  Branch (283:16): [Folded - Ignored]
  Branch (283:16): [Folded - Ignored]
284
0
            if let Ok(decoded) = BASE64_STANDARD_NO_PAD.decode(bazel_header.as_bytes()) {
  Branch (284:20): [Folded - Ignored]
  Branch (284:20): [Folded - Ignored]
285
0
                if let Ok(metadata) = RequestMetadata::decode(decoded.as_slice()) {
  Branch (285:24): [Folded - Ignored]
  Branch (285:24): [Folded - Ignored]
286
0
                    let metadata_str = format!("{metadata:?}");
287
0
                    debug!("Baggage Bazel request metadata: {metadata_str}");
288
0
                    cx = cx.with_baggage(vec![
289
0
                        KeyValue::new(BAZEL_METADATA_KEY, metadata_str),
290
0
                        KeyValue::new(ENDUSER_ID, identity),
291
                    ]);
292
0
                }
293
0
            }
294
0
        }
295
296
0
        Box::pin(async move { inner.call(req).with_context(cx).await })
297
0
    }
298
}
299
300
/// Tower layer that wraps services in an [`OtlpMiddleware`].
#[derive(Debug, Clone, Copy)]
pub struct OtlpLayer {
    /// Passed on to every middleware this layer creates.
    identity_required: bool,
}

impl OtlpLayer {
    /// Creates a layer; `identity_required` is handed to each wrapped service.
    pub const fn new(identity_required: bool) -> Self {
        Self { identity_required }
    }
}
310
311
impl<S> tower::Layer<S> for OtlpLayer {
312
    type Service = OtlpMiddleware<S>;
313
314
0
    fn layer(&self, service: S) -> Self::Service {
315
0
        OtlpMiddleware::new(service, self.identity_required)
316
0
    }
317
}