Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-util/src/metrics_utils.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::mem::forget;
16
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17
use std::thread_local;
18
use std::time::{Instant, SystemTime, UNIX_EPOCH};
19
20
use futures::Future;
21
use nativelink_metric::{
22
    group, publish, MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
23
};
24
25
thread_local! {
    /// Per-thread switch for hot-path (runtime) metric collection.
    ///
    /// Only the calling thread is affected. Clearing the flag stops the
    /// collectors that consult it (through `metrics_enabled`) from recording
    /// on this thread. Other threads keep recording, and metrics can still be
    /// pulled from the registry.
    ///
    /// Tokio spreads work over a pool of threads, so turning collection off
    /// everywhere means flipping this flag on every worker thread. That is
    /// usually done with `tokio::runtime::Builder::on_thread_start`.
    ///
    /// Starts out `true` on every thread.
    pub static METRICS_ENABLED: AtomicBool = const { AtomicBool::new(true) };
}
37
38
#[inline]
39
6.19k
pub fn metrics_enabled() -> bool {
40
6.19k
    METRICS_ENABLED.with(
41
6.19k
        #[inline]
42
6.19k
        |v| v.load(Ordering::Acquire),
43
6.19k
    )
44
6.19k
}
45
46
/// This function will enable or disable metrics for the current thread.
47
/// WARNING: This will only happen for this thread. Tokio uses thread pools
48
/// so you'd need to run this function on every thread in the thread pool in
49
/// order to enable it everywhere.
50
0
pub fn set_metrics_enabled_for_this_thread(enabled: bool) {
51
0
    METRICS_ENABLED.with(|v| v.store(enabled, Ordering::Release));
52
0
}
53
54
#[derive(Default)]
55
pub struct FuncCounterWrapper {
56
    pub successes: AtomicU64,
57
    pub failures: AtomicU64,
58
}
59
60
impl FuncCounterWrapper {
61
    #[inline]
62
29
    pub fn wrap<T, E>(&self, func: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
63
29
        let result = (func)();
64
29
        if result.is_ok() {
  Branch (64:12): [True: 1, False: 0]
  Branch (64:12): [True: 27, False: 1]
  Branch (64:12): [Folded - Ignored]
  Branch (64:12): [Folded - Ignored]
65
28
            self.successes.fetch_add(1, Ordering::Acquire);
66
28
        } else {
67
1
            self.failures.fetch_add(1, Ordering::Acquire);
68
1
        }
69
29
        result
70
29
    }
71
}
72
73
// Derive-macros have no way to tell the collector that the parent
// is now a group with the name of the group as the field so we
// can attach multiple values on the same group, so we need to
// manually implement the `MetricsComponent` trait to do so.
impl MetricsComponent for FuncCounterWrapper {
    /// Publishes `successes` and `failures` inside a group named after the
    /// field that holds this wrapper (`field_metadata.name`).
    ///
    /// `_kind` is ignored. Both children are always published as
    /// `MetricKind::Counter`, with help text built from `field_metadata.name`.
    /// The function ends by returning `Ok(MetricPublishKnownKindData::Component)`.
    ///
    /// NOTE(review): `group!` and `publish!` come from `nativelink_metric`.
    /// Their expansion, including whether `publish!` returns early on error,
    /// is not visible here; confirm there.
    fn publish(
        &self,
        _kind: MetricKind,
        field_metadata: MetricFieldData,
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
        // The guard returned by `.entered()` is bound to `_enter` (not `_`).
        // It therefore lives until the end of the function, so the publishes
        // below happen while the group is entered.
        let _enter = group!(field_metadata.name).entered();

        publish!(
            "successes",
            &self.successes,
            MetricKind::Counter,
            format!(
                "The number of times {} was successful.",
                field_metadata.name
            )
        );
        publish!(
            "failures",
            &self.failures,
            MetricKind::Counter,
            format!("The number of times {} failed.", field_metadata.name)
        );

        Ok(MetricPublishKnownKindData::Component)
    }
}
104
105
/// This is a utility that will only increment the referenced counter when it is dropped.
106
/// This struct is zero cost and has a runtime cost only when it is dropped.
107
/// This struct is very useful for tracking when futures are dropped.
108
struct DropCounter<'a> {
109
    counter: &'a AtomicU64,
110
}
111
112
impl<'a> DropCounter<'a> {
113
    #[inline]
114
0
    pub fn new(counter: &'a AtomicU64) -> Self {
115
0
        Self { counter }
116
0
    }
117
}
118
119
impl<'a> Drop for DropCounter<'a> {
120
    #[inline]
121
0
    fn drop(&mut self) {
122
0
        if !metrics_enabled() {
  Branch (122:12): [Folded - Ignored]
  Branch (122:12): [Folded - Ignored]
123
0
            return;
124
0
        }
125
0
        self.counter.fetch_add(1, Ordering::Acquire);
126
0
    }
127
}
128
129
pub struct AsyncTimer<'a> {
130
    start: Instant,
131
    drop_counter: DropCounter<'a>,
132
    counter: &'a AsyncCounterWrapper,
133
}
134
135
impl<'a> AsyncTimer<'a> {
136
    #[inline]
137
0
    pub fn measure(self) {
138
0
        if !metrics_enabled() {
  Branch (138:12): [Folded - Ignored]
  Branch (138:12): [Folded - Ignored]
139
0
            return;
140
0
        }
141
0
        self.counter
142
0
            .sum_func_duration_ns
143
0
            .fetch_add(self.start.elapsed().as_nanos() as u64, Ordering::Acquire);
144
0
        self.counter.calls.fetch_add(1, Ordering::Acquire);
145
0
        self.counter.successes.fetch_add(1, Ordering::Acquire);
146
0
        // This causes DropCounter's drop to never be called.
147
0
        forget(self.drop_counter);
148
0
    }
149
}
150
151
/// Call statistics for a wrapped function or future.
///
/// Records how often it was called, how each call ended, and how long it
/// took. Call `.wrap(future)` and the stats are tracked automatically; they
/// can then be published to a `CollectorState`.
#[derive(Default)]
pub struct AsyncCounterWrapper {
    /// Number of recorded calls.
    pub calls: AtomicU64,
    /// Number of calls that ended successfully.
    pub successes: AtomicU64,
    /// Number of calls that ended with an error.
    pub failures: AtomicU64,
    /// Number of calls whose future or timer was dropped before completing.
    pub drops: AtomicU64,
    // Time spent in nano seconds in the future.
    // 64 bit address space gives ~584 years of nanoseconds.
    /// Total nanoseconds spent in completed calls.
    pub sum_func_duration_ns: AtomicU64,
}
164
165
// Derive-macros have no way to tell the collector that the parent
// is now a group with the name of the group as the field so we
// can attach multiple values on the same group, so we need to
// manually implement the `MetricsComponent` trait to do so.
impl MetricsComponent for AsyncCounterWrapper {
    /// Publishes `calls`, `successes`, `failures`, `drops` and
    /// `sum_func_duration_ns` inside a group named after the field that holds
    /// this wrapper (`field_metadata.name`).
    ///
    /// `_kind` is ignored. Every child is published as `MetricKind::Counter`,
    /// with help text built from `field_metadata.name`. The function ends by
    /// returning `Ok(MetricPublishKnownKindData::Component)`.
    ///
    /// NOTE(review): `group!` and `publish!` come from `nativelink_metric`.
    /// Their expansion, including whether `publish!` returns early on error,
    /// is not visible here; confirm there.
    fn publish(
        &self,
        _kind: MetricKind,
        field_metadata: MetricFieldData,
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
        // The guard returned by `.entered()` is bound to `_enter` (not `_`).
        // It therefore lives until the end of the function, so every publish
        // below happens while the group is entered.
        let _enter = group!(field_metadata.name).entered();

        publish!(
            "calls",
            &self.calls,
            MetricKind::Counter,
            format!("The number of times {} was called.", field_metadata.name)
        );
        publish!(
            "successes",
            &self.successes,
            MetricKind::Counter,
            format!(
                "The number of times {} was successful.",
                field_metadata.name
            )
        );
        publish!(
            "failures",
            &self.failures,
            MetricKind::Counter,
            format!("The number of times {} failed.", field_metadata.name)
        );
        publish!(
            "drops",
            &self.drops,
            MetricKind::Counter,
            format!("The number of times {} was dropped.", field_metadata.name)
        );
        publish!(
            "sum_func_duration_ns",
            &self.sum_func_duration_ns,
            MetricKind::Counter,
            format!(
                "The sum of the time spent in nanoseconds in {}.",
                field_metadata.name
            )
        );

        Ok(MetricPublishKnownKindData::Component)
    }
}
217
218
impl AsyncCounterWrapper {
219
    #[inline]
220
0
    pub fn wrap_fn<'a, T: 'a, E>(
221
0
        &'a self,
222
0
        func: impl FnOnce() -> Result<T, E> + 'a,
223
0
    ) -> Result<T, E> {
224
0
        self.calls.fetch_add(1, Ordering::Acquire);
225
0
        let result = (func)();
226
0
        if result.is_ok() {
  Branch (226:12): [Folded - Ignored]
  Branch (226:12): [Folded - Ignored]
227
0
            self.successes.fetch_add(1, Ordering::Acquire);
228
0
        } else {
229
0
            self.failures.fetch_add(1, Ordering::Acquire);
230
0
        }
231
0
        result
232
0
    }
233
234
    #[inline]
235
98
    pub async fn wrap<'a, T, E, F: Future<Output = Result<T, E>> + 'a>(
236
98
        &'a self,
237
98
        future: F,
238
98
    ) -> Result<T, E> {
239
98
        if !metrics_enabled() {
  Branch (239:12): [Folded - Ignored]
  Branch (239:12): [True: 0, False: 4]
  Branch (239:12): [Folded - Ignored]
  Branch (239:12): [True: 0, False: 6]
  Branch (239:12): [True: 0, False: 7]
  Branch (239:12): [True: 0, False: 14]
  Branch (239:12): [True: 0, False: 7]
  Branch (239:12): [True: 0, False: 7]
  Branch (239:12): [True: 0, False: 7]
  Branch (239:12): [True: 0, False: 7]
  Branch (239:12): [True: 0, False: 5]
  Branch (239:12): [True: 0, False: 5]
  Branch (239:12): [True: 0, False: 5]
  Branch (239:12): [True: 0, False: 5]
  Branch (239:12): [True: 0, False: 5]
  Branch (239:12): [True: 0, False: 7]
  Branch (239:12): [True: 0, False: 7]
240
0
            return future.await;
241
98
        }
242
554
        let 
result98
=
self.wrap_no_capture_result(future)98
.await;
243
98
        if result.is_ok() {
  Branch (243:12): [Folded - Ignored]
  Branch (243:12): [True: 3, False: 1]
  Branch (243:12): [Folded - Ignored]
  Branch (243:12): [True: 6, False: 0]
  Branch (243:12): [True: 7, False: 0]
  Branch (243:12): [True: 14, False: 0]
  Branch (243:12): [True: 7, False: 0]
  Branch (243:12): [True: 7, False: 0]
  Branch (243:12): [True: 7, False: 0]
  Branch (243:12): [True: 7, False: 0]
  Branch (243:12): [True: 5, False: 0]
  Branch (243:12): [True: 5, False: 0]
  Branch (243:12): [True: 5, False: 0]
  Branch (243:12): [True: 5, False: 0]
  Branch (243:12): [True: 5, False: 0]
  Branch (243:12): [True: 7, False: 0]
  Branch (243:12): [True: 7, False: 0]
244
97
            self.successes.fetch_add(1, Ordering::Acquire);
245
97
        } else {
246
1
            self.failures.fetch_add(1, Ordering::Acquire);
247
1
        }
248
98
        result
249
98
    }
250
251
    #[inline]
252
98
    pub async fn wrap_no_capture_result<'a, T, F: Future<Output = T> + 'a>(
253
98
        &'a self,
254
98
        future: F,
255
98
    ) -> T {
256
98
        if !metrics_enabled() {
  Branch (256:12): [Folded - Ignored]
  Branch (256:12): [True: 0, False: 4]
  Branch (256:12): [Folded - Ignored]
  Branch (256:12): [True: 0, False: 6]
  Branch (256:12): [True: 0, False: 7]
  Branch (256:12): [True: 0, False: 14]
  Branch (256:12): [True: 0, False: 7]
  Branch (256:12): [True: 0, False: 7]
  Branch (256:12): [True: 0, False: 7]
  Branch (256:12): [True: 0, False: 7]
  Branch (256:12): [True: 0, False: 5]
  Branch (256:12): [True: 0, False: 5]
  Branch (256:12): [True: 0, False: 5]
  Branch (256:12): [True: 0, False: 5]
  Branch (256:12): [True: 0, False: 5]
  Branch (256:12): [True: 0, False: 7]
  Branch (256:12): [True: 0, False: 7]
  Branch (256:12): [True: 0, False: 0]
257
0
            return future.await;
258
98
        }
259
98
        self.calls.fetch_add(1, Ordering::Acquire);
260
98
        let drop_counter = DropCounter::new(&self.drops);
261
98
        let instant = Instant::now();
262
554
        let 
result98
=
future98
.await;
263
        // By default `drop_counter` will increment the drop counter when it goes out of scope.
264
        // This will ensure we don't increment the counter if we make it here with a zero cost.
265
98
        forget(drop_counter);
266
98
        self.sum_func_duration_ns
267
98
            .fetch_add(instant.elapsed().as_nanos() as u64, Ordering::Acquire);
268
98
        result
269
98
    }
270
271
    #[inline]
272
0
    pub fn begin_timer(&self) -> AsyncTimer<'_> {
273
0
        AsyncTimer {
274
0
            start: Instant::now(),
275
0
            drop_counter: DropCounter::new(&self.drops),
276
0
            counter: self,
277
0
        }
278
0
    }
279
}
280
281
/// Tracks an number.
282
#[derive(Default)]
283
pub struct Counter(AtomicU64);
284
285
impl Counter {
286
    #[inline]
287
0
    pub fn inc(&self) {
288
0
        self.add(1);
289
0
    }
290
291
    #[inline]
292
5.90k
    pub fn add(&self, value: u64) {
293
5.90k
        if !metrics_enabled() {
  Branch (293:12): [True: 0, False: 5.90k]
  Branch (293:12): [Folded - Ignored]
294
0
            return;
295
5.90k
        }
296
5.90k
        self.0.fetch_add(value, Ordering::Acquire);
297
5.90k
    }
298
299
    #[inline]
300
0
    pub fn sub(&self, value: u64) {
301
0
        if !metrics_enabled() {
  Branch (301:12): [Folded - Ignored]
  Branch (301:12): [Folded - Ignored]
302
0
            return;
303
0
        }
304
0
        self.0.fetch_sub(value, Ordering::Acquire);
305
0
    }
306
}
307
308
impl MetricsComponent for Counter {
309
0
    fn publish(
310
0
        &self,
311
0
        kind: MetricKind,
312
0
        field_metadata: MetricFieldData,
313
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
314
0
        self.0.publish(kind, field_metadata)
315
0
    }
316
}
317
318
/// Tracks an counter through time and the last time the counter was changed.
319
#[derive(Default)]
320
pub struct CounterWithTime {
321
    pub counter: AtomicU64,
322
    pub last_time: AtomicU64,
323
}
324
325
impl CounterWithTime {
326
    #[inline]
327
88
    pub fn inc(&self) {
328
88
        if !metrics_enabled() {
  Branch (328:12): [True: 0, False: 88]
  Branch (328:12): [Folded - Ignored]
329
0
            return;
330
88
        }
331
88
        self.counter.fetch_add(1, Ordering::Acquire);
332
88
        self.last_time.store(
333
88
            SystemTime::now()
334
88
                .duration_since(UNIX_EPOCH)
335
88
                .unwrap()
336
88
                .as_secs(),
337
88
            Ordering::Release,
338
88
        );
339
88
    }
340
}
341
342
// Derive-macros have no way to tell the collector that the parent
// is now a group with the name of the group as the field so we
// can attach multiple values on the same group, so we need to
// manually implement the `MetricsComponent` trait to do so.
impl MetricsComponent for CounterWithTime {
    /// Publishes `counter` and `last_time` inside a group named after the
    /// field that holds this value (`field_metadata.name`).
    ///
    /// `_kind` is ignored. Both children are published as
    /// `MetricKind::Counter`, with help text built from `field_metadata.name`.
    /// The function ends by returning `Ok(MetricPublishKnownKindData::Component)`.
    ///
    /// NOTE(review): `group!` and `publish!` come from `nativelink_metric`.
    /// Their expansion, including whether `publish!` returns early on error,
    /// is not visible here; confirm there.
    fn publish(
        &self,
        _kind: MetricKind,
        field_metadata: MetricFieldData,
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
        // The guard returned by `.entered()` is bound to `_enter` (not `_`).
        // It therefore lives until the end of the function, so the publishes
        // below happen while the group is entered.
        let _enter = group!(field_metadata.name).entered();

        publish!(
            "counter",
            &self.counter,
            MetricKind::Counter,
            format!("Current count of {}.", field_metadata.name)
        );
        // NOTE(review): `last_time` is written by `CounterWithTime::inc` and
        // holds seconds since the Unix epoch of the last increment. The help
        // text says "published", and the value is reported as a Counter
        // although it is a timestamp. Confirm both are intended.
        publish!(
            "last_time",
            &self.last_time,
            MetricKind::Counter,
            format!("Last timestamp {} was published.", field_metadata.name)
        );

        Ok(MetricPublishKnownKindData::Component)
    }
}