Coverage Report

Created: 2025-10-20 10:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/existence_cache_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use std::borrow::Cow;
17
use std::sync::{Arc, Weak};
18
use std::time::SystemTime;
19
20
use async_trait::async_trait;
21
use futures::StreamExt;
22
use futures::stream::FuturesUnordered;
23
use nativelink_config::stores::{EvictionPolicy, ExistenceCacheSpec};
24
use nativelink_error::{Error, ResultExt, error_if};
25
use nativelink_metric::MetricsComponent;
26
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
27
use nativelink_util::common::DigestInfo;
28
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
29
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
30
use nativelink_util::instant_wrapper::InstantWrapper;
31
use nativelink_util::store_trait::{
32
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
33
};
34
use parking_lot::Mutex;
35
use tracing::{debug, info, trace};
36
37
#[derive(Clone, Debug)]
38
struct ExistenceItem(u64);
39
40
impl LenEntry for ExistenceItem {
41
    #[inline]
42
15
    fn len(&self) -> u64 {
43
15
        self.0
44
15
    }
45
46
    #[inline]
47
0
    fn is_empty(&self) -> bool {
48
0
        false
49
0
    }
50
}
51
52
#[derive(Debug, MetricsComponent)]
53
pub struct ExistenceCacheStore<I: InstantWrapper> {
54
    #[metric(group = "inner_store")]
55
    inner_store: Store,
56
    existence_cache: EvictingMap<DigestInfo, DigestInfo, ExistenceItem, I>,
57
58
    // We need to pause them temporarily when inserting into the inner store
59
    // as if it immediately expires them, we should only apply the remove callbacks
60
    // afterwards. If this is None, we're not pausing; if it's Some it's the location to
61
    // store them in temporarily
62
    pause_remove_callbacks: Mutex<Option<Vec<StoreKey<'static>>>>,
63
}
64
65
impl ExistenceCacheStore<SystemTime> {
66
4
    pub fn new(spec: &ExistenceCacheSpec, inner_store: Store) -> Arc<Self> {
67
4
        Self::new_with_time(spec, inner_store, SystemTime::now())
68
4
    }
69
}
70
71
impl<I: InstantWrapper> RemoveItemCallback for ExistenceCacheStore<I> {
72
2
    fn callback<'a>(
73
2
        &'a self,
74
2
        store_key: StoreKey<'a>,
75
2
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
76
2
        debug!(?store_key, "Removing item from cache due to callback");
77
2
        let digest = store_key.borrow().into_digest();
78
2
        Box::pin(async move {
79
2
            let deleted_key = self.existence_cache.remove(&digest).await;
80
2
            if !deleted_key {
  Branch (80:16): [True: 0, False: 0]
  Branch (80:16): [True: 0, False: 1]
  Branch (80:16): [True: 0, False: 1]
  Branch (80:16): [Folded - Ignored]
81
0
                info!(?store_key, "Failed to delete key from cache on callback");
82
2
            }
83
2
        })
84
2
    }
85
}
86
87
#[derive(Debug)]
88
struct ExistenceCacheCallback<I: InstantWrapper> {
89
    cache: Weak<ExistenceCacheStore<I>>,
90
}
91
92
impl<I: InstantWrapper> RemoveItemCallback for ExistenceCacheCallback<I> {
93
2
    fn callback<'a>(
94
2
        &'a self,
95
2
        store_key: StoreKey<'a>,
96
2
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
97
2
        let cache = self.cache.upgrade();
98
2
        if let Some(local_cache) = cache {
  Branch (98:16): [True: 1, False: 0]
  Branch (98:16): [True: 1, False: 0]
  Branch (98:16): [Folded - Ignored]
99
2
            if let Some(
callbacks1
) = local_cache.pause_remove_callbacks.lock().as_mut() {
  Branch (99:20): [True: 1, False: 0]
  Branch (99:20): [True: 0, False: 1]
  Branch (99:20): [Folded - Ignored]
100
1
                callbacks.push(store_key.into_owned());
101
1
            } else {
102
1
                let store_key = store_key.into_owned();
103
1
                return Box::pin(async move {
104
1
                    local_cache.callback(store_key).await;
105
1
                });
106
            }
107
        } else {
108
0
            debug!("Cache dropped, so not doing callback");
109
        }
110
1
        Box::pin(async {})
111
2
    }
112
}
113
114
impl<I: InstantWrapper> ExistenceCacheStore<I> {
115
5
    pub fn new_with_time(
116
5
        spec: &ExistenceCacheSpec,
117
5
        inner_store: Store,
118
5
        anchor_time: I,
119
5
    ) -> Arc<Self> {
120
5
        let empty_policy = EvictionPolicy::default();
121
5
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
122
5
        let existence_cache_store = Arc::new(Self {
123
5
            inner_store,
124
5
            existence_cache: EvictingMap::new(eviction_policy, anchor_time),
125
5
            pause_remove_callbacks: Mutex::new(None),
126
5
        });
127
5
        let other_ref = Arc::downgrade(&existence_cache_store);
128
5
        existence_cache_store
129
5
            .inner_store
130
5
            .register_remove_callback(Arc::new(ExistenceCacheCallback { cache: other_ref }))
131
5
            .expect("Register remove callback should work");
132
5
        existence_cache_store
133
5
    }
134
135
4
    pub async fn exists_in_cache(&self, digest: &DigestInfo) -> bool {
136
4
        let mut results = [None];
137
4
        self.existence_cache
138
4
            .sizes_for_keys([digest], &mut results[..], true /* peek */)
139
4
            .await;
140
4
        results[0].is_some()
141
4
    }
142
143
1
    pub async fn remove_from_cache(&self, digest: &DigestInfo) {
144
1
        self.existence_cache.remove(digest).await;
145
1
    }
146
147
7
    async fn inner_has_with_results(
148
7
        self: Pin<&Self>,
149
7
        keys: &[DigestInfo],
150
7
        results: &mut [Option<u64>],
151
7
    ) -> Result<(), Error> {
152
7
        self.existence_cache
153
7
            .sizes_for_keys(keys, results, true /* peek */)
154
7
            .await;
155
156
7
        let not_cached_keys: Vec<_> = keys
157
7
            .iter()
158
7
            .zip(results.iter())
159
7
            .filter_map(|(digest, result)| result.map_or_else(|| Some(digest.into()), |_| None))
160
7
            .collect();
161
162
        // Hot path optimization when all keys are cached.
163
7
        if not_cached_keys.is_empty() {
  Branch (163:12): [True: 0, False: 0]
  Branch (163:12): [True: 0, False: 2]
  Branch (163:12): [True: 0, False: 5]
  Branch (163:12): [Folded - Ignored]
164
0
            return Ok(());
165
7
        }
166
167
        // Now query only the items not found in the cache.
168
7
        let mut inner_results = vec![None; not_cached_keys.len()];
169
7
        self.inner_store
170
7
            .has_with_results(&not_cached_keys, &mut inner_results)
171
7
            .await
172
7
            .err_tip(|| "In ExistenceCacheStore::inner_has_with_results")
?0
;
173
174
        // Insert found from previous query into our cache.
175
        {
176
            // Note: Sadly due to some weird lifetime issues we need to collect here, but
177
            // in theory we don't actually need to collect.
178
7
            let inserts = not_cached_keys
179
7
                .iter()
180
7
                .zip(inner_results.iter())
181
7
                .filter_map(|(key, result)| {
182
7
                    result.map(|size| (
key2
.
borrow2
().
into_digest2
(),
ExistenceItem(size)2
))
183
7
                })
184
7
                .collect::<Vec<_>>();
185
7
            drop(self.existence_cache.insert_many(inserts).await);
186
        }
187
188
        // Merge the results from the cache and the query.
189
        {
190
7
            let mut inner_results_iter = inner_results.into_iter();
191
            // We know at this point that any None in results was queried and will have
192
            // a result in inner_results_iter, so use this knowledge to fill in the results.
193
7
            for result in results.iter_mut() {
194
7
                if result.is_none() {
  Branch (194:20): [True: 0, False: 0]
  Branch (194:20): [True: 2, False: 0]
  Branch (194:20): [True: 5, False: 0]
  Branch (194:20): [Folded - Ignored]
195
7
                    *result = inner_results_iter
196
7
                        .next()
197
7
                        .expect("has_with_results returned less results than expected");
198
7
                
}0
199
            }
200
            // Ensure that there was no logic error by ensuring our iterator is not empty.
201
0
            error_if!(
202
7
                inner_results_iter.next().is_some(),
  Branch (202:17): [True: 0, False: 0]
  Branch (202:17): [True: 0, False: 2]
  Branch (202:17): [True: 0, False: 5]
  Branch (202:17): [Folded - Ignored]
203
                "has_with_results returned more results than expected"
204
            );
205
        }
206
207
7
        Ok(())
208
7
    }
209
}
210
211
#[async_trait]
212
impl<I: InstantWrapper> StoreDriver for ExistenceCacheStore<I> {
213
    async fn has_with_results(
214
        self: Pin<&Self>,
215
        digests: &[StoreKey<'_>],
216
        results: &mut [Option<u64>],
217
4
    ) -> Result<(), Error> {
218
        // TODO(palfrey) This is a bit of a hack to get around the lifetime issues with the
219
        // existence_cache. We need to convert the digests to owned values to be able to
220
        // insert them into the cache. In theory it should be able to elide this conversion
221
        // but it seems to be a bit tricky to get right.
222
        let digests: Vec<_> = digests
223
            .iter()
224
4
            .map(|key| key.borrow().into_digest())
225
            .collect();
226
        self.inner_has_with_results(&digests, results).await
227
4
    }
228
229
    async fn update(
230
        self: Pin<&Self>,
231
        key: StoreKey<'_>,
232
        mut reader: DropCloserReadHalf,
233
        size_info: UploadSizeInfo,
234
3
    ) -> Result<(), Error> {
235
        let digest = key.into_digest();
236
        let mut exists = [None];
237
        self.inner_has_with_results(&[digest], &mut exists)
238
            .await
239
            .err_tip(|| "In ExistenceCacheStore::update")?;
240
        if exists[0].is_some() {
241
            // We need to drain the reader to avoid the writer complaining that we dropped
242
            // the connection prematurely.
243
            reader
244
                .drain()
245
                .await
246
                .err_tip(|| "In ExistenceCacheStore::update")?;
247
            return Ok(());
248
        }
249
        {
250
            let mut locked_callbacks = self.pause_remove_callbacks.lock();
251
            if locked_callbacks.is_none() {
252
                locked_callbacks.replace(vec![]);
253
            }
254
        }
255
        trace!(?digest, "Inserting into inner cache");
256
        let result = self.inner_store.update(digest, reader, size_info).await;
257
        if result.is_ok() {
258
            trace!(?digest, "Inserting into existence cache");
259
            if let UploadSizeInfo::ExactSize(size) = size_info {
260
                let _ = self
261
                    .existence_cache
262
                    .insert(digest, ExistenceItem(size))
263
                    .await;
264
            }
265
        }
266
        {
267
            let maybe_keys = self.pause_remove_callbacks.lock().take();
268
            if let Some(keys) = maybe_keys {
269
                let mut callbacks: FuturesUnordered<_> = keys
270
                    .into_iter()
271
1
                    .map(|store_key| self.callback(store_key))
272
                    .collect();
273
                while callbacks.next().await.is_some() {}
274
            }
275
        }
276
        result
277
3
    }
278
279
    async fn get_part(
280
        self: Pin<&Self>,
281
        key: StoreKey<'_>,
282
        writer: &mut DropCloserWriteHalf,
283
        offset: u64,
284
        length: Option<u64>,
285
1
    ) -> Result<(), Error> {
286
        let digest = key.into_digest();
287
        let result = self
288
            .inner_store
289
            .get_part(digest, writer, offset, length)
290
            .await;
291
        if result.is_ok() {
292
            let _ = self
293
                .existence_cache
294
                .insert(digest, ExistenceItem(digest.size_bytes()))
295
                .await;
296
        }
297
        result
298
1
    }
299
300
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
301
0
        self
302
0
    }
303
304
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
305
0
        self
306
0
    }
307
308
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
309
0
        self
310
0
    }
311
312
0
    fn register_remove_callback(
313
0
        self: Arc<Self>,
314
0
        callback: Arc<dyn RemoveItemCallback>,
315
0
    ) -> Result<(), Error> {
316
0
        self.inner_store.register_remove_callback(callback)
317
0
    }
318
}
319
320
#[async_trait]
321
impl<I: InstantWrapper> HealthStatusIndicator for ExistenceCacheStore<I> {
322
0
    fn get_name(&self) -> &'static str {
323
0
        "ExistenceCacheStore"
324
0
    }
325
326
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
327
        StoreDriver::check_health(Pin::new(self), namespace).await
328
0
    }
329
}