Coverage Report

Created: 2026-06-04 10:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/cache_metrics_store.rs
Line
Count
Source
1
// Copyright 2026 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::BorrowMut;
16
use core::ops::Bound;
17
use core::pin::Pin;
18
use std::ffi::OsString;
19
use std::sync::Arc;
20
use std::time::Instant;
21
22
use async_trait::async_trait;
23
use bytes::Bytes;
24
use nativelink_config::stores::CacheMetricsSpec;
25
use nativelink_error::{Code, Error};
26
use nativelink_metric::{
27
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, group, publish,
28
};
29
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
30
use nativelink_util::fs;
31
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatusIndicator};
32
use nativelink_util::metrics::{CACHE_METRICS, CACHE_TYPE, CacheMetricAttrs};
33
use nativelink_util::store_trait::{
34
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, UploadSizeInfo,
35
};
36
use opentelemetry::KeyValue;
37
38
#[derive(Debug)]
39
pub struct CacheMetricsStore {
40
    backend: Store,
41
    cache_type: String,
42
    attrs: CacheMetricAttrs,
43
}
44
45
impl CacheMetricsStore {
46
2
    pub fn new(spec: &CacheMetricsSpec, backend: Store) -> Arc<Self> {
47
2
        Arc::new(Self {
48
2
            backend,
49
2
            cache_type: spec.cache_type.clone(),
50
2
            attrs: CacheMetricAttrs::new(&[KeyValue::new(CACHE_TYPE, spec.cache_type.clone())]),
51
2
        })
52
2
    }
53
54
4
    fn duration_ms(start: Instant) -> f64 {
55
4
        start.elapsed().as_secs_f64() * 1000.0
56
4
    }
57
58
0
    const fn size_info_bytes(size_info: UploadSizeInfo) -> Option<u64> {
59
0
        match size_info {
60
0
            UploadSizeInfo::ExactSize(size) => Some(size),
61
0
            UploadSizeInfo::MaxSize(_) => None,
62
        }
63
0
    }
64
65
4
    fn record_duration(&self, start: Instant, attrs: &[KeyValue]) {
66
4
        CACHE_METRICS
67
4
            .cache_operation_duration
68
4
            .record(Self::duration_ms(start), attrs);
69
4
    }
70
71
1
    fn record_write_io(&self, bytes: Option<u64>) {
72
1
        if let Some(bytes) = bytes {
73
1
            CACHE_METRICS
74
1
                .cache_io
75
1
                .add(bytes, self.attrs.write_success());
76
1
            CACHE_METRICS
77
1
                .cache_entry_size
78
1
                .record(bytes, self.attrs.write_success());
79
1
        
}0
80
1
    }
81
}
82
83
impl MetricsComponent for CacheMetricsStore {
84
0
    fn publish(
85
0
        &self,
86
0
        _kind: MetricKind,
87
0
        _field_metadata: MetricFieldData,
88
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
89
0
        publish!(
90
0
            "cache_type",
91
0
            &self.cache_type,
92
0
            MetricKind::String,
93
0
            "Low-cardinality cache type label emitted on cache metrics"
94
        );
95
0
        let _enter = group!("backend").entered();
96
0
        self.backend
97
0
            .publish(MetricKind::Component, MetricFieldData::default())?;
98
0
        Ok(MetricPublishKnownKindData::Component)
99
0
    }
100
}
101
102
#[async_trait]
103
impl StoreDriver for CacheMetricsStore {
104
    async fn has_with_results(
105
        self: Pin<&Self>,
106
        keys: &[StoreKey<'_>],
107
        results: &mut [Option<u64>],
108
1
    ) -> Result<(), Error> {
109
        let start = Instant::now();
110
        let result = self.backend.has_with_results(keys, results).await;
111
        if result.is_ok() {
112
1
            let hits = results.iter().filter(|result| result.is_some()).count();
113
            let misses = results.len().saturating_sub(hits);
114
            if hits > 0 {
115
                CACHE_METRICS
116
                    .cache_operations
117
                    .add(hits as u64, self.attrs.read_hit());
118
            }
119
            if misses > 0 {
120
                CACHE_METRICS
121
                    .cache_operations
122
                    .add(misses as u64, self.attrs.read_miss());
123
            }
124
            let duration_attrs = if hits > 0 {
125
                self.attrs.read_hit()
126
            } else {
127
                self.attrs.read_miss()
128
            };
129
            self.record_duration(start, duration_attrs);
130
        } else {
131
            CACHE_METRICS
132
                .cache_operations
133
                .add(keys.len() as u64, self.attrs.read_error());
134
            self.record_duration(start, self.attrs.read_error());
135
        }
136
        result
137
1
    }
138
139
    async fn list(
140
        self: Pin<&Self>,
141
        range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
142
        handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
143
0
    ) -> Result<u64, Error> {
144
        self.backend.list(range, handler).await
145
0
    }
146
147
    async fn update(
148
        self: Pin<&Self>,
149
        key: StoreKey<'_>,
150
        reader: DropCloserReadHalf,
151
        upload_size: UploadSizeInfo,
152
0
    ) -> Result<u64, Error> {
153
        let start = Instant::now();
154
        let result = self.backend.update(key, reader, upload_size).await;
155
        if let Ok(size) = &result {
156
            CACHE_METRICS
157
                .cache_operations
158
                .add(1, self.attrs.write_success());
159
            self.record_write_io(Some(*size));
160
            self.record_duration(start, self.attrs.write_success());
161
        } else {
162
            CACHE_METRICS
163
                .cache_operations
164
                .add(1, self.attrs.write_error());
165
            self.record_duration(start, self.attrs.write_error());
166
        }
167
        result
168
0
    }
169
170
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
171
0
        self.backend.optimized_for(optimization)
172
0
    }
173
174
    async fn update_with_whole_file(
175
        self: Pin<&Self>,
176
        key: StoreKey<'_>,
177
        path: OsString,
178
        file: fs::FileSlot,
179
        upload_size: UploadSizeInfo,
180
0
    ) -> Result<(u64, Option<fs::FileSlot>), Error> {
181
        let start = Instant::now();
182
        let bytes = Self::size_info_bytes(upload_size);
183
        let result = self
184
            .backend
185
            .update_with_whole_file(key, path, file, upload_size)
186
            .await;
187
        if result.is_ok() {
188
            CACHE_METRICS
189
                .cache_operations
190
                .add(1, self.attrs.write_success());
191
            self.record_write_io(bytes);
192
            self.record_duration(start, self.attrs.write_success());
193
        } else {
194
            CACHE_METRICS
195
                .cache_operations
196
                .add(1, self.attrs.write_error());
197
            self.record_duration(start, self.attrs.write_error());
198
        }
199
        result
200
0
    }
201
202
1
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
203
        let start = Instant::now();
204
        let bytes = data.len() as u64;
205
        let result = self.backend.update_oneshot(key, data).await;
206
        if result.is_ok() {
207
            CACHE_METRICS
208
                .cache_operations
209
                .add(1, self.attrs.write_success());
210
            self.record_write_io(Some(bytes));
211
            self.record_duration(start, self.attrs.write_success());
212
        } else {
213
            CACHE_METRICS
214
                .cache_operations
215
                .add(1, self.attrs.write_error());
216
            self.record_duration(start, self.attrs.write_error());
217
        }
218
        result
219
1
    }
220
221
    async fn get_part(
222
        self: Pin<&Self>,
223
        key: StoreKey<'_>,
224
        writer: &mut DropCloserWriteHalf,
225
        offset: u64,
226
        length: Option<u64>,
227
2
    ) -> Result<(), Error> {
228
        let start = Instant::now();
229
        let result = self
230
            .backend
231
            .get_part(key, writer.borrow_mut(), offset, length)
232
            .await;
233
        match &result {
234
            Ok(()) => {
235
                CACHE_METRICS.cache_operations.add(1, self.attrs.read_hit());
236
                CACHE_METRICS
237
                    .cache_io
238
                    .add(writer.get_bytes_written(), self.attrs.read_hit());
239
                self.record_duration(start, self.attrs.read_hit());
240
            }
241
            Err(err) if err.code == Code::NotFound => {
242
                CACHE_METRICS
243
                    .cache_operations
244
                    .add(1, self.attrs.read_miss());
245
                self.record_duration(start, self.attrs.read_miss());
246
            }
247
            Err(_) => {
248
                CACHE_METRICS
249
                    .cache_operations
250
                    .add(1, self.attrs.read_error());
251
                self.record_duration(start, self.attrs.read_error());
252
            }
253
        }
254
        result
255
2
    }
256
257
1
    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
258
1
        self
259
1
    }
260
261
1
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
262
1
        self
263
1
    }
264
265
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
266
0
        self
267
0
    }
268
269
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
270
0
        self.backend.clone().register_health(registry);
271
0
    }
272
273
0
    fn register_remove_callback(
274
0
        self: Arc<Self>,
275
0
        callback: Arc<dyn RemoveItemCallback>,
276
0
    ) -> Result<(), Error> {
277
0
        self.backend.register_remove_callback(callback)
278
0
    }
279
}
280
281
#[async_trait]
282
impl HealthStatusIndicator for CacheMetricsStore {
283
0
    fn get_name(&self) -> &'static str {
284
0
        "CacheMetricsStore"
285
0
    }
286
287
    async fn check_health(
288
        &self,
289
        namespace: std::borrow::Cow<'static, str>,
290
0
    ) -> nativelink_util::health_utils::HealthStatus {
291
        self.backend
292
            .as_store_driver_pin()
293
            .check_health(namespace)
294
            .await
295
0
    }
296
}