Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/telemetry.rs
Line
Count
Source
1
// Copyright 2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::default::Default;
16
use std::env;
17
use std::sync::OnceLock;
18
19
use base64::Engine;
20
use base64::prelude::BASE64_STANDARD_NO_PAD;
21
use hyper::http::Response;
22
use nativelink_error::{Code, ResultExt, make_err};
23
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
24
use opentelemetry::propagation::TextMapCompositePropagator;
25
use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
26
use opentelemetry::{KeyValue, global};
27
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
28
use opentelemetry_http::HeaderExtractor;
29
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
30
use opentelemetry_sdk::Resource;
31
use opentelemetry_sdk::logs::SdkLoggerProvider;
32
use opentelemetry_sdk::metrics::SdkMeterProvider;
33
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
34
use opentelemetry_sdk::trace::SdkTracerProvider;
35
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
36
use prost::Message;
37
use tracing::debug;
38
use tracing::metadata::LevelFilter;
39
use tracing_opentelemetry::{MetricsLayer, layer};
40
use tracing_subscriber::filter::Directive;
41
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
42
use tracing_subscriber::util::SubscriberInitExt;
43
use tracing_subscriber::{EnvFilter, Layer, Registry, fmt, registry};
44
use uuid::Uuid;
45
46
/// The OTLP "service.name" field for all nativelink services.
47
const NATIVELINK_SERVICE_NAME: &str = "nativelink";
48
49
// An `EnvFilter` to filter out non-nativelink information.
50
//
51
// See: https://github.com/open-telemetry/opentelemetry-rust/issues/2877
52
//
53
// Note that `EnvFilter` doesn't implement `clone`, so create a new one for
54
// each telemetry kind.
55
42
fn otlp_filter() -> EnvFilter {
56
210
    fn expect_parse(directive: &str) -> Directive {
57
210
        directive
58
210
            .parse()
59
210
            .unwrap_or_else(|_| panic!(
"Static directive '{directive}' failed to parse"0
))
60
210
    }
61
62
42
    EnvFilter::builder()
63
42
        .with_default_directive(LevelFilter::INFO.into())
64
42
        .from_env_lossy()
65
42
        .add_directive(expect_parse("hyper=off"))
66
42
        .add_directive(expect_parse("tonic=off"))
67
42
        .add_directive(expect_parse("h2=off"))
68
42
        .add_directive(expect_parse("reqwest=off"))
69
42
        .add_directive(expect_parse("tower=off"))
70
42
}
71
72
// Create a tracing layer intended for stdout printing.
73
//
74
// The output of this layer is configurable via the `NL_LOG_FMT` environment
75
// variable.
76
42
fn tracing_stdout_layer() -> impl Layer<Registry> {
77
42
    let nl_log_fmt = env::var("NL_LOG").unwrap_or_else(|_| "pretty".to_string());
78
79
42
    let stdout_filter = otlp_filter();
80
81
42
    match nl_log_fmt.as_str() {
82
42
        "compact" => 
fmt::layer0
()
83
0
            .compact()
84
0
            .with_timer(fmt::time::time())
85
0
            .with_filter(stdout_filter)
86
0
            .boxed(),
87
42
        "json" => 
fmt::layer0
()
88
0
            .json()
89
0
            .with_timer(fmt::time::time())
90
0
            .with_filter(stdout_filter)
91
0
            .boxed(),
92
42
        _ => fmt::layer()
93
42
            .pretty()
94
42
            .with_timer(fmt::time::time())
95
42
            .with_filter(stdout_filter)
96
42
            .boxed(),
97
    }
98
42
}
99
100
/// Initialize a minimal tracing configuration for tests.
101
///
102
/// The OTLP logic in the main tracing loop causes issues with the tokio runtime
103
/// in tests, so we use a more naive logger implementation here. This function
104
/// is idempotent and can be called multiple times safely.
105
343
pub fn init_tracing_for_tests() {
106
    static INITIALIZED: OnceLock<()> = OnceLock::new();
107
343
    INITIALIZED.get_or_init(|| 
{42
108
42
        registry().with(tracing_stdout_layer()).init();
109
42
    });
110
343
}
111
112
/// Initialize tracing with OpenTelemetry support.
113
///
114
/// # Errors
115
///
116
/// Returns `Err` if logging was already initialized or if the exporters can't
117
/// be initialized.
118
0
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
119
    static INITIALIZED: OnceLock<()> = OnceLock::new();
120
121
0
    if INITIALIZED.get().is_some() {
  Branch (121:8): [True: 0, False: 0]
  Branch (121:8): [Folded - Ignored]
122
0
        return Err(make_err!(Code::Internal, "Logging already initialized"));
123
0
    }
124
125
    // We currently use a UUIDv4 for "service.instance.id" as per:
126
    // https://opentelemetry.io/docs/specs/semconv/attributes-registry/service/
127
    // This might change as we get a better understanding of its usecases in the
128
    // context of broader observability infrastructure.
129
0
    let resource = Resource::builder()
130
0
        .with_service_name(NATIVELINK_SERVICE_NAME)
131
0
        .with_attribute(KeyValue::new(
132
            "service.instance.id",
133
0
            Uuid::new_v4().to_string(),
134
        ))
135
0
        .build();
136
137
0
    let propagator = TextMapCompositePropagator::new(vec![
138
0
        Box::new(BaggagePropagator::new()),
139
0
        Box::new(TraceContextPropagator::new()),
140
    ]);
141
0
    global::set_text_map_propagator(propagator);
142
143
    // Logs
144
0
    let otlp_log_layer = OpenTelemetryTracingBridge::new(
145
0
        &SdkLoggerProvider::builder()
146
0
            .with_resource(resource.clone())
147
0
            .with_batch_exporter(
148
0
                LogExporter::builder()
149
0
                    .with_tonic()
150
0
                    .with_protocol(Protocol::Grpc)
151
0
                    .build()
152
0
                    .map_err(|e| make_err!(Code::Internal, "{e}"))
153
0
                    .err_tip(|| "While creating OpenTelemetry OTLP Log exporter")?,
154
            )
155
0
            .build(),
156
    )
157
0
    .with_filter(otlp_filter());
158
159
    // Traces
160
0
    let otlp_trace_layer = layer()
161
0
        .with_tracer(
162
0
            SdkTracerProvider::builder()
163
0
                .with_resource(resource.clone())
164
0
                .with_batch_exporter(
165
0
                    SpanExporter::builder()
166
0
                        .with_tonic()
167
0
                        .with_protocol(Protocol::Grpc)
168
0
                        .build()
169
0
                        .map_err(|e| make_err!(Code::Internal, "{e}"))
170
0
                        .err_tip(|| "While creating OpenTelemetry OTLP Span exporter")?,
171
                )
172
0
                .build()
173
0
                .tracer(NATIVELINK_SERVICE_NAME),
174
        )
175
0
        .with_filter(otlp_filter());
176
177
    // Metrics
178
0
    let meter_provider = SdkMeterProvider::builder()
179
0
        .with_resource(resource)
180
0
        .with_periodic_exporter(
181
0
            MetricExporter::builder()
182
0
                .with_tonic()
183
0
                .with_protocol(Protocol::Grpc)
184
0
                .build()
185
0
                .map_err(|e| make_err!(Code::Internal, "{e}"))
186
0
                .err_tip(|| "While creating OpenTelemetry OTLP Metric exporter")?,
187
        )
188
0
        .build();
189
190
0
    global::set_meter_provider(meter_provider.clone());
191
192
0
    let otlp_metrics_layer = MetricsLayer::new(meter_provider).with_filter(otlp_filter());
193
194
0
    registry()
195
0
        .with(tracing_stdout_layer())
196
0
        .with(otlp_log_layer)
197
0
        .with(otlp_trace_layer)
198
0
        .with(otlp_metrics_layer)
199
0
        .init();
200
201
0
    INITIALIZED.set(()).unwrap_or(());
202
203
0
    Ok(())
204
0
}
205
206
/// Custom metadata key field for Bazel metadata.
207
const BAZEL_METADATA_KEY: &str = "bazel.metadata";
208
209
/// This is the header that bazel sends when using the `--remote_header` flag.
210
/// TODO(aaronmondal): There are various other headers that bazel supports.
211
///                    Optimize their usage.
212
const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requestmetadata-bin";
213
214
use opentelemetry::baggage::BaggageExt;
215
use opentelemetry::context::FutureExt;
216
217
#[derive(Debug, Clone)]
218
pub struct OtlpMiddleware<S> {
219
    inner: S,
220
    identity_required: bool,
221
}
222
223
impl<S> OtlpMiddleware<S> {
224
0
    const fn new(inner: S, identity_required: bool) -> Self {
225
0
        Self {
226
0
            inner,
227
0
            identity_required,
228
0
        }
229
0
    }
230
}
231
232
impl<S, ReqBody, ResBody> tower::Service<hyper::http::Request<ReqBody>> for OtlpMiddleware<S>
233
where
234
    S: tower::Service<hyper::http::Request<ReqBody>, Response = Response<ResBody>>
235
        + Clone
236
        + Send
237
        + 'static,
238
    S::Future: Send + 'static,
239
    ReqBody: core::fmt::Debug + Send + 'static,
240
    ResBody: From<String> + Send + 'static + Default,
241
{
242
    type Response = S::Response;
243
    type Error = S::Error;
244
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
245
246
0
    fn poll_ready(
247
0
        &mut self,
248
0
        cx: &mut core::task::Context<'_>,
249
0
    ) -> core::task::Poll<Result<(), Self::Error>> {
250
0
        self.inner.poll_ready(cx)
251
0
    }
252
253
0
    fn call(&mut self, req: hyper::http::Request<ReqBody>) -> Self::Future {
254
        // We must take the current `inner` and not the clone.
255
        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
256
0
        let clone = self.inner.clone();
257
0
        let mut inner = core::mem::replace(&mut self.inner, clone);
258
259
0
        let parent_cx = global::get_text_map_propagator(|propagator| {
260
0
            propagator.extract(&HeaderExtractor(req.headers()))
261
0
        });
262
263
0
        let identity = parent_cx
264
0
            .baggage()
265
0
            .get(ENDUSER_ID)
266
0
            .map(|value| value.as_str().to_string())
267
0
            .unwrap_or_default();
268
269
0
        if identity.is_empty() && self.identity_required {
  Branch (269:12): [Folded - Ignored]
  Branch (269:35): [Folded - Ignored]
  Branch (269:12): [Folded - Ignored]
  Branch (269:35): [Folded - Ignored]
270
0
            return Box::pin(async move {
271
0
                Ok(tonic::Status::failed_precondition(
272
0
                    r"
273
0
274
0
NativeLink instance configured to require this OpenTelemetry Baggage header:
275
0
276
0
    `Baggage: enduser.id=YOUR_IDENTITY`
277
0
278
0
",
279
0
                )
280
0
                .into_http())
281
0
            });
282
0
        }
283
284
0
        debug!("Baggage enduser.id: {identity}");
285
286
0
        let tracer = global::tracer("origin_middleware");
287
0
        let span = tracer
288
0
            .span_builder("origin_request")
289
0
            .with_kind(opentelemetry::trace::SpanKind::Server)
290
0
            .start_with_context(&tracer, &parent_cx);
291
292
0
        let mut cx = parent_cx.with_span(span);
293
294
0
        if let Some(bazel_header) = req.headers().get(BAZEL_REQUESTMETADATA_HEADER) {
  Branch (294:16): [Folded - Ignored]
  Branch (294:16): [Folded - Ignored]
295
0
            if let Ok(decoded) = BASE64_STANDARD_NO_PAD.decode(bazel_header.as_bytes()) {
  Branch (295:20): [Folded - Ignored]
  Branch (295:20): [Folded - Ignored]
296
0
                if let Ok(metadata) = RequestMetadata::decode(decoded.as_slice()) {
  Branch (296:24): [Folded - Ignored]
  Branch (296:24): [Folded - Ignored]
297
0
                    let metadata_str = format!("{metadata:?}");
298
0
                    debug!("Baggage Bazel request metadata: {metadata_str}");
299
0
                    cx = cx.with_baggage(vec![
300
0
                        KeyValue::new(BAZEL_METADATA_KEY, metadata_str),
301
0
                        KeyValue::new(ENDUSER_ID, identity),
302
                    ]);
303
0
                }
304
0
            }
305
0
        }
306
307
0
        Box::pin(async move { inner.call(req).with_context(cx).await })
308
0
    }
309
}
310
311
#[derive(Debug, Clone, Copy)]
312
pub struct OtlpLayer {
313
    identity_required: bool,
314
}
315
316
impl OtlpLayer {
317
0
    pub const fn new(identity_required: bool) -> Self {
318
0
        Self { identity_required }
319
0
    }
320
}
321
322
impl<S> tower::Layer<S> for OtlpLayer {
323
    type Service = OtlpMiddleware<S>;
324
325
0
    fn layer(&self, service: S) -> Self::Service {
326
0
        OtlpMiddleware::new(service, self.identity_required)
327
0
    }
328
}