Coverage Report

Created: 2025-07-30 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/telemetry.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::default::Default;
16
use std::env;
17
use std::sync::OnceLock;
18
19
use base64::Engine;
20
use base64::prelude::BASE64_STANDARD_NO_PAD;
21
use hyper::http::Response;
22
use nativelink_error::{Code, ResultExt, make_err};
23
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
24
use opentelemetry::propagation::TextMapCompositePropagator;
25
use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
26
use opentelemetry::{KeyValue, global};
27
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
28
use opentelemetry_http::HeaderExtractor;
29
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
30
use opentelemetry_sdk::Resource;
31
use opentelemetry_sdk::logs::SdkLoggerProvider;
32
use opentelemetry_sdk::metrics::SdkMeterProvider;
33
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
34
use opentelemetry_sdk::trace::SdkTracerProvider;
35
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
36
use prost::Message;
37
use tracing::debug;
38
use tracing::metadata::LevelFilter;
39
use tracing_opentelemetry::{MetricsLayer, layer};
40
use tracing_subscriber::filter::Directive;
41
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
42
use tracing_subscriber::util::SubscriberInitExt;
43
use tracing_subscriber::{EnvFilter, Layer, Registry, fmt, registry};
44
use uuid::Uuid;
45
46
/// Value reported as the OTLP `service.name` resource attribute; every
/// nativelink service shares this single name.
const NATIVELINK_SERVICE_NAME: &str = "nativelink";
48
49
// An `EnvFilter` to filter out non-nativelink information.
50
//
51
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/2877
52
//
53
// Note that `EnvFilter` doesn't implement `clone`, so create a new one for
54
// each telemetry kind.
55
0
fn otlp_filter() -> EnvFilter {
56
0
    fn expect_parse(directive: &str) -> Directive {
57
0
        directive
58
0
            .parse()
59
0
            .unwrap_or_else(|_| panic!("Static directive '{directive}' failed to parse"))
60
0
    }
61
62
0
    EnvFilter::builder()
63
0
        .with_default_directive(LevelFilter::INFO.into())
64
0
        .from_env_lossy()
65
0
        .add_directive(expect_parse("hyper=off"))
66
0
        .add_directive(expect_parse("tonic=off"))
67
0
        .add_directive(expect_parse("h2=off"))
68
0
        .add_directive(expect_parse("reqwest=off"))
69
0
        .add_directive(expect_parse("tower=off"))
70
0
}
71
72
// Create a tracing layer intended for stdout printing.
73
//
74
// The output of this layer is configurable via the `NL_LOG_FMT` environment
75
// variable.
76
0
fn tracing_stdout_layer() -> impl Layer<Registry> {
77
0
    let nl_log_fmt = env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string());
78
79
0
    let stdout_filter = otlp_filter();
80
81
0
    match nl_log_fmt.as_str() {
82
0
        "compact" => fmt::layer()
83
0
            .compact()
84
0
            .with_timer(fmt::time::time())
85
0
            .with_filter(stdout_filter)
86
0
            .boxed(),
87
0
        "json" => fmt::layer()
88
0
            .json()
89
0
            .with_timer(fmt::time::time())
90
0
            .with_filter(stdout_filter)
91
0
            .boxed(),
92
0
        _ => fmt::layer()
93
0
            .pretty()
94
0
            .with_timer(fmt::time::time())
95
0
            .with_filter(stdout_filter)
96
0
            .boxed(),
97
    }
98
0
}
99
100
/// Initialize tracing with OpenTelemetry support.
101
///
102
/// # Errors
103
///
104
/// Returns `Err` if logging was already initialized or if the exporters can't
105
/// be initialized.
106
0
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
107
    static INITIALIZED: OnceLock<()> = OnceLock::new();
108
109
0
    if INITIALIZED.get().is_some() {
  Branch (109:8): [True: 0, False: 0]
  Branch (109:8): [Folded - Ignored]
110
0
        return Err(make_err!(Code::Internal, "Logging already initialized"));
111
0
    }
112
113
    // We currently use a UUIDv4 for "service.instance.id" as per:
114
    // https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
115
    // This might change as we get a better understanding of its usecases in the
116
    // context of broader observability infrastructure.
117
0
    let resource = Resource::builder()
118
0
        .with_service_name(NATIVELINK_SERVICE_NAME)
119
0
        .with_attribute(KeyValue::new(
120
            "service.instance.id",
121
0
            Uuid::new_v4().to_string(),
122
        ))
123
0
        .build();
124
125
0
    let propagator = TextMapCompositePropagator::new(vec![
126
0
        Box::new(BaggagePropagator::new()),
127
0
        Box::new(TraceContextPropagator::new()),
128
    ]);
129
0
    global::set_text_map_propagator(propagator);
130
131
    // Logs
132
0
    let otlp_log_layer = OpenTelemetryTracingBridge::new(
133
0
        &SdkLoggerProvider::builder()
134
0
            .with_resource(resource.clone())
135
0
            .with_batch_exporter(
136
0
                LogExporter::builder()
137
0
                    .with_tonic()
138
0
                    .with_protocol(Protocol::Grpc)
139
0
                    .build()
140
0
                    .map_err(|e| make_err!(Code::Internal, "{e}"))
141
0
                    .err_tip(|| "While creating OpenTelemetry OTLP Log exporter")?,
142
            )
143
0
            .build(),
144
    )
145
0
    .with_filter(otlp_filter());
146
147
    // Traces
148
0
    let otlp_trace_layer = layer()
149
0
        .with_tracer(
150
0
            SdkTracerProvider::builder()
151
0
                .with_resource(resource.clone())
152
0
                .with_batch_exporter(
153
0
                    SpanExporter::builder()
154
0
                        .with_tonic()
155
0
                        .with_protocol(Protocol::Grpc)
156
0
                        .build()
157
0
                        .map_err(|e| make_err!(Code::Internal, "{e}"))
158
0
                        .err_tip(|| "While creating OpenTelemetry OTLP Span exporter")?,
159
                )
160
0
                .build()
161
0
                .tracer(NATIVELINK_SERVICE_NAME),
162
        )
163
0
        .with_filter(otlp_filter());
164
165
    // Metrics
166
0
    let meter_provider = SdkMeterProvider::builder()
167
0
        .with_resource(resource)
168
0
        .with_periodic_exporter(
169
0
            MetricExporter::builder()
170
0
                .with_tonic()
171
0
                .with_protocol(Protocol::Grpc)
172
0
                .build()
173
0
                .map_err(|e| make_err!(Code::Internal, "{e}"))
174
0
                .err_tip(|| "While creating OpenTelemetry OTLP Metric exporter")?,
175
        )
176
0
        .build();
177
178
0
    global::set_meter_provider(meter_provider.clone());
179
180
0
    let otlp_metrics_layer = MetricsLayer::new(meter_provider).with_filter(otlp_filter());
181
182
0
    registry()
183
0
        .with(tracing_stdout_layer())
184
0
        .with(otlp_log_layer)
185
0
        .with(otlp_trace_layer)
186
0
        .with(otlp_metrics_layer)
187
0
        .init();
188
189
0
    INITIALIZED.set(()).unwrap_or(());
190
191
0
    Ok(())
192
0
}
193
194
/// Baggage key under which the decoded Bazel `RequestMetadata` (rendered with
/// its `Debug` representation) is attached to the request context.
const BAZEL_METADATA_KEY: &str = "bazel.metadata";

/// gRPC binary (`-bin`) metadata header holding a base64-encoded
/// `RequestMetadata` protobuf. This is the header that bazel sends when using
/// the `--remote_header` flag.
///
/// TODO(aaronmondal): There are various other headers that bazel supports.
///                    Optimize their usage.
const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requestmetadata-bin";
201
202
use opentelemetry::baggage::BaggageExt;
203
use opentelemetry::context::FutureExt;
204
205
/// Tower middleware that runs each request inside an OpenTelemetry context.
///
/// The context is extracted from the request headers and extended with a
/// server span and, when present, Bazel request metadata. When
/// `identity_required` is set, requests lacking an `enduser.id` baggage entry
/// are answered with `FAILED_PRECONDITION` instead of reaching `inner`.
#[derive(Debug, Clone)]
pub struct OtlpMiddleware<S> {
    /// Wrapped service that accepted requests are forwarded to.
    inner: S,
    /// Whether an `enduser.id` baggage entry is mandatory.
    identity_required: bool,
}

impl<S> OtlpMiddleware<S> {
    /// Wraps `inner`, optionally enforcing the `enduser.id` identity check.
    const fn new(inner: S, identity_required: bool) -> Self {
        Self {
            identity_required,
            inner,
        }
    }
}
219
220
impl<S, ReqBody, ResBody> tower::Service<hyper::http::Request<ReqBody>> for OtlpMiddleware<S>
221
where
222
    S: tower::Service<hyper::http::Request<ReqBody>, Response = Response<ResBody>>
223
        + Clone
224
        + Send
225
        + 'static,
226
    S::Future: Send + 'static,
227
    ReqBody: core::fmt::Debug + Send + 'static,
228
    ResBody: From<String> + Send + 'static + Default,
229
{
230
    type Response = S::Response;
231
    type Error = S::Error;
232
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
233
234
0
    fn poll_ready(
235
0
        &mut self,
236
0
        cx: &mut core::task::Context<'_>,
237
0
    ) -> core::task::Poll<Result<(), Self::Error>> {
238
0
        self.inner.poll_ready(cx)
239
0
    }
240
241
0
    fn call(&mut self, req: hyper::http::Request<ReqBody>) -> Self::Future {
242
        // We must take the current `inner` and not the clone.
243
        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
244
0
        let clone = self.inner.clone();
245
0
        let mut inner = core::mem::replace(&mut self.inner, clone);
246
247
0
        let parent_cx = global::get_text_map_propagator(|propagator| {
248
0
            propagator.extract(&HeaderExtractor(req.headers()))
249
0
        });
250
251
0
        let identity = parent_cx
252
0
            .baggage()
253
0
            .get(ENDUSER_ID)
254
0
            .map(|value| value.as_str().to_string())
255
0
            .unwrap_or_default();
256
257
0
        if identity.is_empty() && self.identity_required {
  Branch (257:12): [Folded - Ignored]
  Branch (257:35): [Folded - Ignored]
  Branch (257:12): [Folded - Ignored]
  Branch (257:35): [Folded - Ignored]
258
0
            return Box::pin(async move {
259
0
                Ok(tonic::Status::failed_precondition(
260
0
                    r"
261
0
262
0
NativeLink instance configured to require this OpenTelemetry Baggage header:
263
0
264
0
    `Baggage: enduser.id=YOUR_IDENTITY`
265
0
266
0
",
267
0
                )
268
0
                .into_http())
269
0
            });
270
0
        }
271
272
0
        debug!("Baggage enduser.id: {identity}");
273
274
0
        let tracer = global::tracer("origin_middleware");
275
0
        let span = tracer
276
0
            .span_builder("origin_request")
277
0
            .with_kind(opentelemetry::trace::SpanKind::Server)
278
0
            .start_with_context(&tracer, &parent_cx);
279
280
0
        let mut cx = parent_cx.with_span(span);
281
282
0
        if let Some(bazel_header) = req.headers().get(BAZEL_REQUESTMETADATA_HEADER) {
  Branch (282:16): [Folded - Ignored]
  Branch (282:16): [Folded - Ignored]
283
0
            if let Ok(decoded) = BASE64_STANDARD_NO_PAD.decode(bazel_header.as_bytes()) {
  Branch (283:20): [Folded - Ignored]
  Branch (283:20): [Folded - Ignored]
284
0
                if let Ok(metadata) = RequestMetadata::decode(decoded.as_slice()) {
  Branch (284:24): [Folded - Ignored]
  Branch (284:24): [Folded - Ignored]
285
0
                    let metadata_str = format!("{metadata:?}");
286
0
                    debug!("Baggage Bazel request metadata: {metadata_str}");
287
0
                    cx = cx.with_baggage(vec![
288
0
                        KeyValue::new(BAZEL_METADATA_KEY, metadata_str),
289
0
                        KeyValue::new(ENDUSER_ID, identity),
290
                    ]);
291
0
                }
292
0
            }
293
0
        }
294
295
0
        Box::pin(async move { inner.call(req).with_context(cx).await })
296
0
    }
297
}
298
299
/// `tower::Layer` producing an `OtlpMiddleware` around each wrapped service.
///
/// The layer only carries the `identity_required` setting, which it hands to
/// every middleware it creates.
#[derive(Debug, Clone, Copy)]
pub struct OtlpLayer {
    /// Forwarded unchanged to each created `OtlpMiddleware`.
    identity_required: bool,
}

impl OtlpLayer {
    /// Builds a layer. Pass `true` to make the produced middlewares reject
    /// requests that carry no `enduser.id` baggage entry.
    pub const fn new(identity_required: bool) -> Self {
        Self {
            identity_required,
        }
    }
}
309
310
impl<S> tower::Layer<S> for OtlpLayer {
311
    type Service = OtlpMiddleware<S>;
312
313
0
    fn layer(&self, service: S) -> Self::Service {
314
0
        OtlpMiddleware::new(service, self.identity_required)
315
0
    }
316
}