/build/source/nativelink-util/src/metrics_utils.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::mem::forget; |
16 | | use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; |
17 | | use std::thread_local; |
18 | | use std::time::{Instant, SystemTime, UNIX_EPOCH}; |
19 | | |
20 | | use futures::Future; |
21 | | use nativelink_metric::{ |
22 | | group, publish, MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
23 | | }; |
24 | | |
thread_local! {
    /// Per-thread switch that enables or disables runtime metrics gathering
    /// for the current thread. It starts out as `true` on every thread.
    ///
    /// Turning it off does not mean that metrics are "disabled" everywhere;
    /// only metrics gathering performed by this specific thread is skipped.
    /// Because tokio uses thread pools, if you change this value you'll need
    /// to change it on every thread tokio is using, often using the
    /// `tokio::runtime::Builder::on_thread_start` function.
    ///
    /// This flag also does not prevent metrics from being pulled from the
    /// registry. It only stops metrics that are collected at runtime (hot
    /// path) from being collected.
    pub static METRICS_ENABLED: AtomicBool = const { AtomicBool::new(true) };
}
37 | | |
38 | | #[inline] |
39 | 6.19k | pub fn metrics_enabled() -> bool { |
40 | 6.19k | METRICS_ENABLED.with( |
41 | 6.19k | #[inline] |
42 | 6.19k | |v| v.load(Ordering::Acquire), |
43 | 6.19k | ) |
44 | 6.19k | } |
45 | | |
46 | | /// This function will enable or disable metrics for the current thread. |
47 | | /// WARNING: This will only happen for this thread. Tokio uses thread pools |
48 | | /// so you'd need to run this function on every thread in the thread pool in |
49 | | /// order to enable it everywhere. |
50 | 0 | pub fn set_metrics_enabled_for_this_thread(enabled: bool) { |
51 | 0 | METRICS_ENABLED.with(|v| v.store(enabled, Ordering::Release)); |
52 | 0 | } |
53 | | |
/// Counts how many wrapped fallible calls returned `Ok` versus `Err`.
#[derive(Default)]
pub struct FuncCounterWrapper {
    /// Number of wrapped calls that returned `Ok`.
    pub successes: AtomicU64,
    /// Number of wrapped calls that returned `Err`.
    pub failures: AtomicU64,
}

impl FuncCounterWrapper {
    /// Runs `func` immediately, bumps `successes` or `failures` depending on
    /// the variant it returned, and hands the result back unchanged.
    #[inline]
    pub fn wrap<T, E>(&self, func: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
        let outcome = func();
        // Pick the counter first so there is a single increment site.
        let tally = match &outcome {
            Ok(_) => &self.successes,
            Err(_) => &self.failures,
        };
        tally.fetch_add(1, Ordering::Acquire);
        outcome
    }
}
72 | | |
// Derive-macros have no way to tell the collector that the parent
// is now a group with the name of the group as the field so we
// can attach multiple values on the same group, so we need to
// manually implement the `MetricsComponent` trait to do so.
impl MetricsComponent for FuncCounterWrapper {
    /// Publishes the `successes` and `failures` values as
    /// `MetricKind::Counter` entries inside a group named after
    /// `field_metadata.name`. The incoming `_kind` is ignored and the result
    /// is always `MetricPublishKnownKindData::Component`.
    fn publish(
        &self,
        _kind: MetricKind,
        field_metadata: MetricFieldData,
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
        // Bound to a name so the value returned by `.entered()` lives until
        // the end of this function.
        // NOTE(review): presumably the group is left when it drops - confirm
        // against `nativelink_metric::group!`.
        let _enter = group!(field_metadata.name).entered();

        publish!(
            "successes",
            &self.successes,
            MetricKind::Counter,
            format!(
                "The number of times {} was successful.",
                field_metadata.name
            )
        );
        publish!(
            "failures",
            &self.failures,
            MetricKind::Counter,
            format!("The number of times {} failed.", field_metadata.name)
        );

        Ok(MetricPublishKnownKindData::Component)
    }
}
104 | | |
105 | | /// This is a utility that will only increment the referenced counter when it is dropped. |
106 | | /// This struct is zero cost and has a runtime cost only when it is dropped. |
107 | | /// This struct is very useful for tracking when futures are dropped. |
108 | | struct DropCounter<'a> { |
109 | | counter: &'a AtomicU64, |
110 | | } |
111 | | |
112 | | impl<'a> DropCounter<'a> { |
113 | | #[inline] |
114 | 103 | pub fn new(counter: &'a AtomicU64) -> Self { |
115 | 103 | Self { counter } |
116 | 103 | } |
117 | | } |
118 | | |
119 | | impl Drop for DropCounter<'_> { |
120 | | #[inline] |
121 | 0 | fn drop(&mut self) { |
122 | 0 | if !metrics_enabled() { Branch (122:12): [Folded - Ignored]
Branch (122:12): [Folded - Ignored]
|
123 | 0 | return; |
124 | 0 | } |
125 | 0 | self.counter.fetch_add(1, Ordering::Acquire); |
126 | 0 | } |
127 | | } |
128 | | |
129 | | pub struct AsyncTimer<'a> { |
130 | | start: Instant, |
131 | | drop_counter: DropCounter<'a>, |
132 | | counter: &'a AsyncCounterWrapper, |
133 | | } |
134 | | |
135 | | impl AsyncTimer<'_> { |
136 | | #[inline] |
137 | 0 | pub fn measure(self) { |
138 | 0 | if !metrics_enabled() { Branch (138:12): [Folded - Ignored]
|
139 | 0 | return; |
140 | 0 | } |
141 | 0 | self.counter |
142 | 0 | .sum_func_duration_ns |
143 | 0 | .fetch_add(self.start.elapsed().as_nanos() as u64, Ordering::Acquire); |
144 | 0 | self.counter.calls.fetch_add(1, Ordering::Acquire); |
145 | 0 | self.counter.successes.fetch_add(1, Ordering::Acquire); |
146 | 0 | // This causes DropCounter's drop to never be called. |
147 | 0 | forget(self.drop_counter); |
148 | 0 | } |
149 | | } |
150 | | |
/// Tracks the number of calls, successes, failures, and drops of an async function.
/// Call `.wrap(future)` to wrap a future; stats about the future are then
/// tracked automatically and can be published to a `CollectorState`.
#[derive(Default)]
pub struct AsyncCounterWrapper {
    /// Number of times a wrapped function or future was started
    /// (or, for `AsyncTimer`, measured).
    pub calls: AtomicU64,
    /// Number of wrapped calls that completed with `Ok`.
    pub successes: AtomicU64,
    /// Number of wrapped calls that completed with `Err`.
    pub failures: AtomicU64,
    /// Number of wrapped futures or timers dropped before completing.
    pub drops: AtomicU64,
    // Total time spent in the wrapped future, in nanoseconds.
    // A `u64` can hold roughly 584 years of nanoseconds.
    pub sum_func_duration_ns: AtomicU64,
}
164 | | |
// Derive-macros have no way to tell the collector that the parent
// is now a group with the name of the group as the field so we
// can attach multiple values on the same group, so we need to
// manually implement the `MetricsComponent` trait to do so.
impl MetricsComponent for AsyncCounterWrapper {
    /// Publishes `calls`, `successes`, `failures`, `drops` and
    /// `sum_func_duration_ns` as `MetricKind::Counter` entries inside a group
    /// named after `field_metadata.name`. The incoming `_kind` is ignored and
    /// the result is always `MetricPublishKnownKindData::Component`.
    fn publish(
        &self,
        _kind: MetricKind,
        field_metadata: MetricFieldData,
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
        // Bound to a name so the value returned by `.entered()` lives until
        // the end of this function.
        // NOTE(review): presumably the group is left when it drops - confirm
        // against `nativelink_metric::group!`.
        let _enter = group!(field_metadata.name).entered();

        publish!(
            "calls",
            &self.calls,
            MetricKind::Counter,
            format!("The number of times {} was called.", field_metadata.name)
        );
        publish!(
            "successes",
            &self.successes,
            MetricKind::Counter,
            format!(
                "The number of times {} was successful.",
                field_metadata.name
            )
        );
        publish!(
            "failures",
            &self.failures,
            MetricKind::Counter,
            format!("The number of times {} failed.", field_metadata.name)
        );
        publish!(
            "drops",
            &self.drops,
            MetricKind::Counter,
            format!("The number of times {} was dropped.", field_metadata.name)
        );
        publish!(
            "sum_func_duration_ns",
            &self.sum_func_duration_ns,
            MetricKind::Counter,
            format!(
                "The sum of the time spent in nanoseconds in {}.",
                field_metadata.name
            )
        );

        Ok(MetricPublishKnownKindData::Component)
    }
}
217 | | |
218 | | impl AsyncCounterWrapper { |
219 | | #[inline] |
220 | 0 | pub fn wrap_fn<'a, T: 'a, E>( |
221 | 0 | &'a self, |
222 | 0 | func: impl FnOnce() -> Result<T, E> + 'a, |
223 | 0 | ) -> Result<T, E> { |
224 | 0 | self.calls.fetch_add(1, Ordering::Acquire); |
225 | 0 | let result = (func)(); |
226 | 0 | if result.is_ok() { Branch (226:12): [Folded - Ignored]
Branch (226:12): [Folded - Ignored]
|
227 | 0 | self.successes.fetch_add(1, Ordering::Acquire); |
228 | 0 | } else { |
229 | 0 | self.failures.fetch_add(1, Ordering::Acquire); |
230 | 0 | } |
231 | 0 | result |
232 | 0 | } |
233 | | |
234 | | #[inline] |
235 | 98 | pub async fn wrap<'a, T, E, F: Future<Output = Result<T, E>> + 'a>( |
236 | 98 | &'a self, |
237 | 98 | future: F, |
238 | 98 | ) -> Result<T, E> { |
239 | 98 | if !metrics_enabled() { Branch (239:12): [Folded - Ignored]
Branch (239:12): [True: 0, False: 4]
Branch (239:12): [Folded - Ignored]
Branch (239:12): [True: 0, False: 6]
Branch (239:12): [True: 0, False: 5]
Branch (239:12): [True: 0, False: 5]
Branch (239:12): [True: 0, False: 5]
Branch (239:12): [True: 0, False: 14]
Branch (239:12): [True: 0, False: 7]
Branch (239:12): [True: 0, False: 7]
Branch (239:12): [True: 0, False: 7]
Branch (239:12): [True: 0, False: 7]
Branch (239:12): [True: 0, False: 7]
Branch (239:12): [True: 0, False: 5]
Branch (239:12): [True: 0, False: 5]
Branch (239:12): [True: 0, False: 7]
Branch (239:12): [True: 0, False: 7]
|
240 | 0 | return future.await; |
241 | 98 | } |
242 | 98 | let result = self.wrap_no_capture_result(future).await; |
243 | 98 | if result.is_ok() { Branch (243:12): [Folded - Ignored]
Branch (243:12): [True: 3, False: 1]
Branch (243:12): [Folded - Ignored]
Branch (243:12): [True: 6, False: 0]
Branch (243:12): [True: 5, False: 0]
Branch (243:12): [True: 5, False: 0]
Branch (243:12): [True: 5, False: 0]
Branch (243:12): [True: 14, False: 0]
Branch (243:12): [True: 7, False: 0]
Branch (243:12): [True: 7, False: 0]
Branch (243:12): [True: 7, False: 0]
Branch (243:12): [True: 7, False: 0]
Branch (243:12): [True: 7, False: 0]
Branch (243:12): [True: 5, False: 0]
Branch (243:12): [True: 5, False: 0]
Branch (243:12): [True: 7, False: 0]
Branch (243:12): [True: 7, False: 0]
|
244 | 97 | self.successes.fetch_add(1, Ordering::Acquire); |
245 | 97 | } else { |
246 | 1 | self.failures.fetch_add(1, Ordering::Acquire); |
247 | 1 | } |
248 | 98 | result |
249 | 98 | } |
250 | | |
251 | | #[inline] |
252 | 98 | pub async fn wrap_no_capture_result<'a, T, F: Future<Output = T> + 'a>( |
253 | 98 | &'a self, |
254 | 98 | future: F, |
255 | 98 | ) -> T { |
256 | 98 | if !metrics_enabled() { Branch (256:12): [Folded - Ignored]
Branch (256:12): [True: 0, False: 4]
Branch (256:12): [Folded - Ignored]
Branch (256:12): [True: 0, False: 6]
Branch (256:12): [True: 0, False: 5]
Branch (256:12): [True: 0, False: 5]
Branch (256:12): [True: 0, False: 5]
Branch (256:12): [True: 0, False: 14]
Branch (256:12): [True: 0, False: 7]
Branch (256:12): [True: 0, False: 7]
Branch (256:12): [True: 0, False: 7]
Branch (256:12): [True: 0, False: 7]
Branch (256:12): [True: 0, False: 7]
Branch (256:12): [True: 0, False: 5]
Branch (256:12): [True: 0, False: 5]
Branch (256:12): [True: 0, False: 7]
Branch (256:12): [True: 0, False: 7]
Branch (256:12): [True: 0, False: 0]
|
257 | 0 | return future.await; |
258 | 98 | } |
259 | 98 | self.calls.fetch_add(1, Ordering::Acquire); |
260 | 98 | let drop_counter = DropCounter::new(&self.drops); |
261 | 98 | let instant = Instant::now(); |
262 | 98 | let result = future.await; |
263 | | // By default `drop_counter` will increment the drop counter when it goes out of scope. |
264 | | // This will ensure we don't increment the counter if we make it here with a zero cost. |
265 | 98 | forget(drop_counter); |
266 | 98 | self.sum_func_duration_ns |
267 | 98 | .fetch_add(instant.elapsed().as_nanos() as u64, Ordering::Acquire); |
268 | 98 | result |
269 | 98 | } |
270 | | |
271 | | #[inline] |
272 | 0 | pub fn begin_timer(&self) -> AsyncTimer<'_> { |
273 | 0 | AsyncTimer { |
274 | 0 | start: Instant::now(), |
275 | 0 | drop_counter: DropCounter::new(&self.drops), |
276 | 0 | counter: self, |
277 | 0 | } |
278 | 0 | } |
279 | | } |
280 | | |
281 | | /// Tracks a number. |
282 | | #[derive(Default)] |
283 | | pub struct Counter(AtomicU64); |
284 | | |
285 | | impl Counter { |
286 | | #[inline] |
287 | 0 | pub fn inc(&self) { |
288 | 0 | self.add(1); |
289 | 0 | } |
290 | | |
291 | | #[inline] |
292 | 5.90k | pub fn add(&self, value: u64) { |
293 | 5.90k | if !metrics_enabled() { Branch (293:12): [True: 0, False: 5.90k]
Branch (293:12): [Folded - Ignored]
|
294 | 0 | return; |
295 | 5.90k | } |
296 | 5.90k | self.0.fetch_add(value, Ordering::Acquire); |
297 | 5.90k | } |
298 | | |
299 | | #[inline] |
300 | 0 | pub fn sub(&self, value: u64) { |
301 | 0 | if !metrics_enabled() { Branch (301:12): [Folded - Ignored]
Branch (301:12): [Folded - Ignored]
|
302 | 0 | return; |
303 | 0 | } |
304 | 0 | self.0.fetch_sub(value, Ordering::Acquire); |
305 | 0 | } |
306 | | } |
307 | | |
308 | | impl MetricsComponent for Counter { |
309 | 0 | fn publish( |
310 | 0 | &self, |
311 | 0 | kind: MetricKind, |
312 | 0 | field_metadata: MetricFieldData, |
313 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
314 | 0 | self.0.publish(kind, field_metadata) |
315 | 0 | } |
316 | | } |
317 | | |
318 | | /// Tracks an counter through time and the last time the counter was changed. |
319 | | #[derive(Default)] |
320 | | pub struct CounterWithTime { |
321 | | pub counter: AtomicU64, |
322 | | pub last_time: AtomicU64, |
323 | | } |
324 | | |
325 | | impl CounterWithTime { |
326 | | #[inline] |
327 | 88 | pub fn inc(&self) { |
328 | 88 | if !metrics_enabled() { Branch (328:12): [True: 0, False: 88]
Branch (328:12): [Folded - Ignored]
|
329 | 0 | return; |
330 | 88 | } |
331 | 88 | self.counter.fetch_add(1, Ordering::Acquire); |
332 | 88 | self.last_time.store( |
333 | 88 | SystemTime::now() |
334 | 88 | .duration_since(UNIX_EPOCH) |
335 | 88 | .unwrap() |
336 | 88 | .as_secs(), |
337 | 88 | Ordering::Release, |
338 | 88 | ); |
339 | 88 | } |
340 | | } |
341 | | |
// Derive-macros have no way to tell the collector that the parent
// is now a group with the name of the group as the field so we
// can attach multiple values on the same group, so we need to
// manually implement the `MetricsComponent` trait to do so.
impl MetricsComponent for CounterWithTime {
    /// Publishes `counter` and `last_time` as `MetricKind::Counter` entries
    /// inside a group named after `field_metadata.name`. The incoming `_kind`
    /// is ignored and the result is always
    /// `MetricPublishKnownKindData::Component`.
    fn publish(
        &self,
        _kind: MetricKind,
        field_metadata: MetricFieldData,
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
        // Bound to a name so the value returned by `.entered()` lives until
        // the end of this function.
        // NOTE(review): presumably the group is left when it drops - confirm
        // against `nativelink_metric::group!`.
        let _enter = group!(field_metadata.name).entered();

        publish!(
            "counter",
            &self.counter,
            MetricKind::Counter,
            format!("Current count of {}.", field_metadata.name)
        );
        // `last_time` holds the seconds-since-epoch value written by `inc`.
        publish!(
            "last_time",
            &self.last_time,
            MetricKind::Counter,
            format!("Last timestamp {} was published.", field_metadata.name)
        );

        Ok(MetricPublishKnownKindData::Component)
    }
}