Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/evicting_map.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::Borrow;
16
use core::cmp::Eq;
17
use core::fmt::Debug;
18
use core::future::Future;
19
use core::hash::Hash;
20
use core::marker::PhantomData;
21
use core::ops::RangeBounds;
22
use core::pin::Pin;
23
use std::collections::BTreeSet;
24
use std::sync::Arc;
25
26
use futures::StreamExt;
27
use futures::stream::FuturesUnordered;
28
use lru::LruCache;
29
use nativelink_config::stores::EvictionPolicy;
30
use nativelink_metric::MetricsComponent;
31
use parking_lot::Mutex;
32
use serde::{Deserialize, Serialize};
33
use tracing::{debug, info};
34
35
use crate::instant_wrapper::InstantWrapper;
36
use crate::metrics_utils::{Counter, CounterWithTime};
37
38
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
39
pub struct SerializedLRU<K> {
40
    pub data: Vec<(K, i32)>,
41
    pub anchor_time: u64,
42
}
43
44
#[derive(Debug)]
45
struct EvictionItem<T: LenEntry + Debug> {
46
    seconds_since_anchor: i32,
47
    data: T,
48
}
49
50
pub trait LenEntry: 'static {
51
    /// Length of referenced data.
52
    fn len(&self) -> u64;
53
54
    /// Returns `true` if `self` has zero length.
55
    fn is_empty(&self) -> bool;
56
57
    /// This will be called when object is removed from map.
58
    /// Note: There may still be a reference to it held somewhere else, which
59
    /// is why it can't be mutable. This is a good place to mark the item
60
    /// to be deleted and then in the Drop call actually do the deleting.
61
    /// This will ensure nowhere else in the program still holds a reference
62
    /// to this object.
63
    /// You should not rely only on the Drop trait. Doing so might result in the
64
    /// program safely shutting down and calling the Drop method on each object,
65
    /// which if you are deleting items you may not want to do.
66
    /// It is undefined behavior to have `unref()` called more than once.
67
    /// During the execution of `unref()` no items can be added or removed to/from
68
    /// the `EvictionMap` globally (including inside `unref()`).
69
    #[inline]
70
52
    fn unref(&self) -> impl Future<Output = ()> + Send {
71
52
        core::future::ready(())
72
52
    }
73
}
74
75
impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
76
    #[inline]
77
241
    fn len(&self) -> u64 {
78
241
        T::len(self.as_ref())
79
241
    }
80
81
    #[inline]
82
0
    fn is_empty(&self) -> bool {
83
0
        T::is_empty(self.as_ref())
84
0
    }
85
86
    #[inline]
87
13
    async fn unref(&self) {
88
13
        self.as_ref().unref().await;
89
13
    }
90
}
91
92
// Callback to be called when the EvictingMap removes an item
93
// either via eviction or direct deletion. This will be called with
94
// whatever key type the EvictingMap uses.
95
pub trait RemoveItemCallback<Q>: Debug + Send + Sync {
96
    fn callback(&self, store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>>;
97
}
98
99
#[derive(Debug, MetricsComponent)]
100
struct State<
101
    K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>,
102
    Q: Ord + Hash + Eq + Debug,
103
    T: LenEntry + Debug + Send,
104
    C: RemoveItemCallback<Q>,
105
> {
106
    lru: LruCache<K, EvictionItem<T>>,
107
    btree: Option<BTreeSet<K>>,
108
    #[metric(help = "Total size of all items in the store")]
109
    sum_store_size: u64,
110
111
    #[metric(help = "Number of bytes evicted from the store")]
112
    evicted_bytes: Counter,
113
    #[metric(help = "Number of items evicted from the store")]
114
    evicted_items: CounterWithTime,
115
    #[metric(help = "Number of bytes replaced in the store")]
116
    replaced_bytes: Counter,
117
    #[metric(help = "Number of items replaced in the store")]
118
    replaced_items: CounterWithTime,
119
    #[metric(help = "Number of bytes inserted into the store since it was created")]
120
    lifetime_inserted_bytes: Counter,
121
122
    _key_type: PhantomData<Q>,
123
    remove_callbacks: Vec<C>,
124
}
125
126
type RemoveFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
127
128
impl<
129
    K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>,
130
    Q: Ord + Hash + Eq + Debug + Sync,
131
    T: LenEntry + Debug + Sync + Send,
132
    C: RemoveItemCallback<Q>,
133
> State<K, Q, T, C>
134
{
135
    /// Removes an item from the cache and returns the data for deferred cleanup.
136
    /// The caller is responsible for calling `unref()` on the returned data outside of the lock.
137
    #[must_use]
138
65
    fn remove(
139
65
        &mut self,
140
65
        key: &Q,
141
65
        eviction_item: &EvictionItem<T>,
142
65
        replaced: bool,
143
65
    ) -> (T, Vec<RemoveFuture>)
144
65
    where
145
65
        T: Clone,
146
    {
147
65
        if let Some(
btree0
) = &mut self.btree {
148
0
            btree.remove(key);
149
65
        }
150
65
        self.sum_store_size -= eviction_item.data.len();
151
65
        if replaced {
152
30
            self.replaced_items.inc();
153
30
            self.replaced_bytes.add(eviction_item.data.len());
154
35
        } else {
155
35
            self.evicted_items.inc();
156
35
            self.evicted_bytes.add(eviction_item.data.len());
157
35
        }
158
159
65
        let callbacks = self
160
65
            .remove_callbacks
161
65
            .iter()
162
65
            .map(|callback| 
callback2
.
callback2
(
key2
))
163
65
            .collect();
164
165
        // Return the data for deferred unref outside of lock
166
65
        (eviction_item.data.clone(), callbacks)
167
65
    }
168
169
    /// Inserts a new item into the cache. If the key already exists, the old item is returned
170
    /// for deferred cleanup.
171
    #[must_use]
172
6.01k
    fn put(&mut self, key: &K, eviction_item: EvictionItem<T>) -> Option<(T, Vec<RemoveFuture>)>
173
6.01k
    where
174
6.01k
        K: Clone,
175
6.01k
        T: Clone,
176
    {
177
        // If we are maintaining a btree index, we need to update it.
178
6.01k
        if let Some(
btree0
) = &mut self.btree {
179
0
            btree.insert(key.clone());
180
6.01k
        }
181
6.01k
        self.lru
182
6.01k
            .put(key.clone(), eviction_item)
183
6.01k
            .map(|old_item| 
self30
.
remove30
(
key.borrow()30
,
&old_item30
, true))
184
6.01k
    }
185
186
5
    fn add_remove_callback(&mut self, callback: C) {
187
5
        self.remove_callbacks.push(callback);
188
5
    }
189
}
190
191
#[derive(Debug, Clone, Copy)]
192
pub struct NoopRemove;
193
194
impl<Q> RemoveItemCallback<Q> for NoopRemove {
195
0
    fn callback(&self, _store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>> {
196
0
        Box::pin(async {})
197
0
    }
198
}
199
200
#[derive(Debug, MetricsComponent)]
201
pub struct EvictingMap<
202
    K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>,
203
    Q: Ord + Hash + Eq + Debug,
204
    T: LenEntry + Debug + Send,
205
    I: InstantWrapper,
206
    C: RemoveItemCallback<Q> = NoopRemove,
207
> {
208
    #[metric]
209
    state: Mutex<State<K, Q, T, C>>,
210
    anchor_time: I,
211
    #[metric(help = "Maximum size of the store in bytes")]
212
    max_bytes: u64,
213
    #[metric(help = "Number of bytes to evict when the store is full")]
214
    evict_bytes: u64,
215
    #[metric(help = "Maximum number of seconds to keep an item in the store")]
216
    max_seconds: i32,
217
    #[metric(help = "Maximum number of items to keep in the store")]
218
    max_count: u64,
219
}
220
221
impl<K, Q, T, I, C> EvictingMap<K, Q, T, I, C>
222
where
223
    K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>,
224
    Q: Ord + Hash + Eq + Debug + Sync,
225
    T: LenEntry + Debug + Clone + Send + Sync,
226
    I: InstantWrapper,
227
    C: RemoveItemCallback<Q>,
228
{
229
483
    pub fn new(config: &EvictionPolicy, anchor_time: I) -> Self {
230
483
        Self {
231
483
            // We use unbounded because if we use the bounded version we can't call the delete
232
483
            // function on the LenEntry properly.
233
483
            state: Mutex::new(State {
234
483
                lru: LruCache::unbounded(),
235
483
                btree: None,
236
483
                sum_store_size: 0,
237
483
                evicted_bytes: Counter::default(),
238
483
                evicted_items: CounterWithTime::default(),
239
483
                replaced_bytes: Counter::default(),
240
483
                replaced_items: CounterWithTime::default(),
241
483
                lifetime_inserted_bytes: Counter::default(),
242
483
                _key_type: PhantomData,
243
483
                remove_callbacks: Vec::new(),
244
483
            }),
245
483
            anchor_time,
246
483
            max_bytes: config.max_bytes as u64,
247
483
            evict_bytes: config.evict_bytes as u64,
248
483
            max_seconds: config.max_seconds as i32,
249
483
            max_count: config.max_count,
250
483
        }
251
483
    }
252
253
0
    pub async fn enable_filtering(&self) {
254
0
        let mut state = self.state.lock();
255
0
        if state.btree.is_none() {
256
0
            Self::rebuild_btree_index(&mut state);
257
0
        }
258
0
    }
259
260
2
    fn rebuild_btree_index(state: &mut State<K, Q, T, C>) {
261
2
        state.btree = Some(state.lru.iter().map(|(k, _)| k).cloned().collect());
262
2
    }
263
264
    /// Run the `handler` function on each key-value pair that matches the `prefix_range`
265
    /// and return the number of items that were processed.
266
    /// The `handler` function should return `true` to continue processing the next item
267
    /// or `false` to stop processing.
268
11
    pub fn range<F>(&self, prefix_range: impl RangeBounds<Q> + Send, mut handler: F) -> u64
269
11
    where
270
11
        F: FnMut(&K, &T) -> bool + Send,
271
11
        K: Ord,
272
    {
273
11
        let mut state = self.state.lock();
274
11
        let btree = if let Some(
ref btree9
) = state.btree {
275
9
            btree
276
        } else {
277
2
            Self::rebuild_btree_index(&mut state);
278
2
            state.btree.as_ref().unwrap()
279
        };
280
11
        let mut continue_count = 0;
281
22
        for key in 
btree11
.
range11
(
prefix_range11
) {
282
22
            let value = &state.lru.peek(key.borrow()).unwrap().data;
283
22
            let should_continue = handler(key, value);
284
22
            if !should_continue {
285
0
                break;
286
22
            }
287
22
            continue_count += 1;
288
        }
289
11
        continue_count
290
11
    }
291
292
    /// Returns the number of key-value pairs that are currently in the the cache.
293
    /// Function is not for production code paths.
294
30
    pub fn len_for_test(&self) -> usize {
295
30
        self.state.lock().lru.len()
296
30
    }
297
298
10.9k
    fn should_evict(
299
10.9k
        &self,
300
10.9k
        lru_len: usize,
301
10.9k
        peek_entry: &EvictionItem<T>,
302
10.9k
        sum_store_size: u64,
303
10.9k
        max_bytes: u64,
304
10.9k
    ) -> bool {
305
10.9k
        let is_over_size = max_bytes != 0 && 
sum_store_size >= max_bytes4.92k
;
306
307
10.9k
        let elapsed_seconds =
308
10.9k
            i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
309
10.9k
        let evict_older_than_seconds = elapsed_seconds.saturating_sub(self.max_seconds);
310
10.9k
        let old_item_exists =
311
10.9k
            self.max_seconds != 0 && 
peek_entry.seconds_since_anchor < evict_older_than_seconds128
;
312
313
10.9k
        let is_over_count =
314
10.9k
            self.max_count != 0 && 
u64::try_from26
(
lru_len26
).unwrap_or(u64::MAX) > self.max_count;
315
316
10.9k
        is_over_size || 
old_item_exists10.9k
||
is_over_count10.9k
317
10.9k
    }
318
319
    #[must_use]
320
6.03k
    fn evict_items(&self, state: &mut State<K, Q, T, C>) -> (Vec<T>, Vec<RemoveFuture>) {
321
6.03k
        let Some((_, mut peek_entry)) = state.lru.peek_lru() else {
322
0
            return (Vec::new(), Vec::new());
323
        };
324
325
6.03k
        let max_bytes = if self.max_bytes != 0
326
19
            && self.evict_bytes != 0
327
4
            && self.should_evict(
328
4
                state.lru.len(),
329
4
                peek_entry,
330
4
                state.sum_store_size,
331
4
                self.max_bytes,
332
            ) {
333
1
            self.max_bytes.saturating_sub(self.evict_bytes)
334
        } else {
335
6.03k
            self.max_bytes
336
        };
337
338
6.03k
        let mut items_to_unref = Vec::new();
339
6.03k
        let mut removal_futures = Vec::new();
340
341
6.04k
        while self.should_evict(state.lru.len(), peek_entry, state.sum_store_size, max_bytes) {
342
16
            let (key, eviction_item) = state
343
16
                .lru
344
16
                .pop_lru()
345
16
                .expect("Tried to peek() then pop() but failed");
346
16
            debug!(?key, "Evicting",);
347
16
            let (data, futures) = state.remove(key.borrow(), &eviction_item, false);
348
16
            items_to_unref.push(data);
349
16
            removal_futures.extend(futures.into_iter());
350
351
16
            peek_entry = if let Some((_, 
entry12
)) = state.lru.peek_lru() {
352
12
                entry
353
            } else {
354
4
                break;
355
            };
356
        }
357
358
6.03k
        (items_to_unref, removal_futures)
359
6.03k
    }
360
361
    /// Return the size of a `key`, if not found `None` is returned.
362
25
    pub async fn size_for_key(&self, key: &Q) -> Option<u64> {
363
25
        let mut results = [None];
364
25
        self.sizes_for_keys([key], &mut results[..], false).await;
365
25
        results[0]
366
25
    }
367
368
    /// Return the sizes of a collection of `keys`. Expects `results` collection
369
    /// to be provided for storing the resulting key sizes. Each index value in
370
    /// `keys` maps directly to the size value for the key in `results`.
371
    /// If no key is found in the internal map, `None` is filled in its place.
372
    /// If `peek` is set to `true`, the items are not promoted to the front of the
373
    /// LRU cache. Note: peek may still evict, but won't promote.
374
404
    pub async fn sizes_for_keys<It, R>(&self, keys: It, results: &mut [Option<u64>], peek: bool)
375
404
    where
376
404
        It: IntoIterator<Item = R> + Send,
377
404
        // Note: It's not enough to have the inserts themselves be Send. The
378
404
        // returned iterator should be Send as well.
379
404
        <It as IntoIterator>::IntoIter: Send,
380
404
        // This may look strange, but what we are doing is saying:
381
404
        // * `K` must be able to borrow `Q`
382
404
        // * `R` (the input stream item type) must also be able to borrow `Q`
383
404
        // Note: That K and R do not need to be the same type, they just both need
384
404
        // to be able to borrow a `Q`.
385
404
        R: Borrow<Q> + Send,
386
404
    {
387
404
        let (removal_futures, data_to_unref) = {
388
404
            let mut state = self.state.lock();
389
390
404
            let lru_len = state.lru.len();
391
404
            let mut data_to_unref = Vec::new();
392
404
            let mut removal_futures = Vec::new();
393
447
            for (key, result) in 
keys404
.
into_iter404
().
zip404
(
results404
.
iter_mut404
()) {
394
447
                let maybe_entry = if peek {
395
11
                    state.lru.peek_mut(key.borrow())
396
                } else {
397
436
                    state.lru.get_mut(key.borrow())
398
                };
399
447
                match maybe_entry {
400
203
                    Some(entry) => {
401
                        // Note: We need to check eviction because the item might be expired
402
                        // based on the current time. In such case, we remove the item while
403
                        // we are here.
404
203
                        if self.should_evict(lru_len, entry, 0, u64::MAX) {
405
1
                            *result = None;
406
1
                            if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) {
407
1
                                info!(?key, "Item expired, evicting");
408
1
                                let (data, futures) =
409
1
                                    state.remove(key.borrow(), &eviction_item, false);
410
                                // Store data for later unref - we can't drop state here as we're still iterating
411
1
                                data_to_unref.push(data);
412
1
                                removal_futures.extend(futures.into_iter());
413
0
                            }
414
                        } else {
415
202
                            if !peek {
416
199
                                entry.seconds_since_anchor =
417
199
                                    i32::try_from(self.anchor_time.elapsed().as_secs())
418
199
                                        .unwrap_or(i32::MAX);
419
199
                            
}3
420
202
                            *result = Some(entry.data.len());
421
                        }
422
                    }
423
244
                    None => *result = None,
424
                }
425
            }
426
404
            (removal_futures, data_to_unref)
427
        };
428
429
        // Perform the async callbacks outside of the lock
430
404
        let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
431
404
        while callbacks.next().await.is_some() 
{}0
432
404
        let mut callbacks: FuturesUnordered<_> =
433
404
            data_to_unref.iter().map(LenEntry::unref).collect();
434
405
        while callbacks.next().await.is_some() 
{}1
435
404
    }
436
437
    /// Returns the value for `key` if present and not expired, refreshing
438
    /// its LRU/atime position. If the entry is present but TTL- or
439
    /// count-expired, it is reaped and `None` is returned.
440
    ///
441
    /// A read never cascades into other entries — only the queried key is
442
    /// ever touched. Global eviction (size/count overflow trim) runs on
443
    /// inserts; it is not driven by reads, since `sum_store_size` cannot
444
    /// grow without an insert.
445
4.70k
    pub async fn get(&self, key: &Q) -> Option<T> {
446
        // Lazily reap *only* the requested entry if it is itself expired;
447
        // leave the rest for inserts (which already run the global eviction
448
        // loop).
449
4.69k
        let (data, expired_data, removal_futures) = {
450
4.70k
            let mut state = self.state.lock();
451
4.70k
            let lru_len = state.lru.len();
452
4.70k
            let 
entry4.69k
= state.lru.get_mut(key.borrow())
?13
;
453
            // Pass `sum_store_size=0` and `max_bytes=u64::MAX` so we only
454
            // consult TTL / count predicates — never the global byte budget.
455
            // Mirrors the per-key reap path in `sizes_for_keys`.
456
4.69k
            if self.should_evict(lru_len, entry, 0, u64::MAX) {
457
3
                let (popped_key, eviction_item) = state
458
3
                    .lru
459
3
                    .pop_entry(key.borrow())
460
3
                    .expect("entry was just observed via get_mut");
461
3
                info!(?popped_key, "Item expired, evicting");
462
3
                let (data, futures) = state.remove(popped_key.borrow(), &eviction_item, false);
463
3
                (None, Some(data), futures)
464
            } else {
465
4.68k
                entry.seconds_since_anchor =
466
4.68k
                    i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
467
4.68k
                (Some(entry.data.clone()), None, Vec::new())
468
            }
469
        };
470
471
        // Drain remove_callbacks and unref the reaped entry outside the lock.
472
4.69k
        let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
473
4.69k
        while callbacks.next().await.is_some() 
{}0
474
4.69k
        if let Some(
d3
) = expired_data {
475
3
            d.unref().await;
476
4.68k
        }
477
478
4.69k
        data
479
4.70k
    }
480
481
    /// Returns the replaced item if any.
482
6.01k
    pub async fn insert(&self, key: K, data: T) -> Option<T>
483
6.01k
    where
484
6.01k
        K: 'static,
485
6.01k
    {
486
6.01k
        self.insert_with_time(
487
6.01k
            key,
488
6.01k
            data,
489
6.01k
            i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX),
490
6.01k
        )
491
6.01k
        .await
492
6.01k
    }
493
494
    /// Returns the replaced item if any.
495
6.01k
    pub async fn insert_with_time(&self, key: K, data: T, seconds_since_anchor: i32) -> Option<T> {
496
6.01k
        let (items_to_unref, removal_futures) = {
497
6.01k
            let mut state = self.state.lock();
498
6.01k
            self.inner_insert_many(&mut state, [(key, data)], seconds_since_anchor)
499
6.01k
        };
500
501
6.01k
        let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect();
502
6.01k
        while futures.next().await.is_some() 
{}1
503
504
        // Unref items outside of lock
505
6.01k
        let futures: FuturesUnordered<_> = items_to_unref
506
6.01k
            .into_iter()
507
6.01k
            .map(|item| async move 
{45
508
45
                item.unref().await;
509
45
                item
510
90
            })
511
6.01k
            .collect();
512
6.01k
        futures.collect::<Vec<_>>().await.into_iter().next()
513
6.01k
    }
514
515
    /// Same as `insert()`, but optimized for multiple inserts.
516
    /// Returns the replaced items if any.
517
7
    pub async fn insert_many<It>(&self, inserts: It) -> Vec<T>
518
7
    where
519
7
        It: IntoIterator<Item = (K, T)> + Send,
520
7
        // Note: It's not enough to have the inserts themselves be Send. The
521
7
        // returned iterator should be Send as well.
522
7
        <It as IntoIterator>::IntoIter: Send,
523
7
        K: 'static,
524
7
    {
525
7
        let mut inserts = inserts.into_iter().peekable();
526
        // Shortcut for cases where there are no inserts, so we don't need to lock.
527
7
        if inserts.peek().is_none() {
528
5
            return Vec::new();
529
2
        }
530
531
2
        let (items_to_unref, removal_futures) = {
532
2
            let mut state = self.state.lock();
533
2
            self.inner_insert_many(
534
2
                &mut state,
535
2
                inserts,
536
2
                i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX),
537
2
            )
538
2
        };
539
540
2
        let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect();
541
2
        while futures.next().await.is_some() 
{}0
542
543
        // Unref items outside of lock
544
2
        items_to_unref
545
2
            .into_iter()
546
2
            .map(|item| async move 
{0
547
0
                item.unref().await;
548
0
                item
549
0
            })
550
2
            .collect::<FuturesUnordered<_>>()
551
2
            .collect::<Vec<_>>()
552
2
            .await
553
7
    }
554
555
6.01k
    fn inner_insert_many<It>(
556
6.01k
        &self,
557
6.01k
        state: &mut State<K, Q, T, C>,
558
6.01k
        inserts: It,
559
6.01k
        seconds_since_anchor: i32,
560
6.01k
    ) -> (Vec<T>, Vec<RemoveFuture>)
561
6.01k
    where
562
6.01k
        It: IntoIterator<Item = (K, T)> + Send,
563
6.01k
        // Note: It's not enough to have the inserts themselves be Send. The
564
6.01k
        // returned iterator should be Send as well.
565
6.01k
        <It as IntoIterator>::IntoIter: Send,
566
    {
567
6.01k
        let mut replaced_items = Vec::new();
568
6.01k
        let mut removal_futures = Vec::new();
569
6.01k
        for (key, data) in inserts {
570
6.01k
            let new_item_size = data.len();
571
6.01k
            let eviction_item = EvictionItem {
572
6.01k
                seconds_since_anchor,
573
6.01k
                data,
574
6.01k
            };
575
576
6.01k
            if let Some((
old_item30
,
futures30
)) = state.put(&key, eviction_item) {
577
30
                removal_futures.extend(futures.into_iter());
578
30
                replaced_items.push(old_item);
579
5.98k
            }
580
6.01k
            state.sum_store_size += new_item_size;
581
6.01k
            state.lifetime_inserted_bytes.add(new_item_size);
582
        }
583
584
        // Perform eviction after all insertions
585
6.01k
        let (items_to_unref, futures) = self.evict_items(state);
586
6.01k
        removal_futures.extend(futures);
587
588
        // Note: We cannot drop the state lock here since we're borrowing it,
589
        // but the caller will handle unreffing these items after releasing the lock
590
6.01k
        replaced_items.extend(items_to_unref);
591
592
6.01k
        (replaced_items, removal_futures)
593
6.01k
    }
594
595
15
    pub async fn remove(&self, key: &Q) -> bool {
596
15
        let (items_to_unref, removed_item, removal_futures) = {
597
15
            let mut state = self.state.lock();
598
599
            // First perform eviction
600
15
            let (evicted_items, mut removal_futures) = self.evict_items(&mut *state);
601
602
            // Then try to remove the requested item
603
15
            let removed = if let Some(
entry14
) = state.lru.pop(key.borrow()) {
604
14
                let (removed_item, more_removal_futures) = state.remove(key, &entry, false);
605
14
                removal_futures.extend(more_removal_futures.into_iter());
606
14
                Some(removed_item)
607
            } else {
608
1
                None
609
            };
610
611
15
            (evicted_items, removed, removal_futures)
612
        };
613
614
15
        let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
615
16
        while callbacks.next().await.is_some() 
{}1
616
617
        // Unref evicted items outside of lock
618
15
        let mut callbacks: FuturesUnordered<_> =
619
15
            items_to_unref.iter().map(LenEntry::unref).collect();
620
16
        while callbacks.next().await.is_some() 
{}1
621
622
        // Unref removed item if any
623
15
        if let Some(
item14
) = removed_item {
624
14
            item.unref().await;
625
14
            return true;
626
1
        }
627
628
1
        false
629
15
    }
630
631
    /// Same as `remove()`, but allows for a conditional to be applied to the
632
    /// entry before removal in an atomic fashion.
633
1
    pub async fn remove_if<F>(&self, key: &Q, cond: F) -> bool
634
1
    where
635
1
        F: FnOnce(&T) -> bool + Send,
636
1
    {
637
1
        let (evicted_items, removal_futures, removed_item) = {
638
1
            let mut state = self.state.lock();
639
1
            if let Some(entry) = state.lru.get(key.borrow()) {
640
1
                if !cond(&entry.data) {
641
0
                    return false;
642
1
                }
643
                // First perform eviction
644
1
                let (evicted_items, mut removal_futures) = self.evict_items(&mut state);
645
646
                // Then try to remove the requested item
647
1
                let removed_item = if let Some(entry) = state.lru.pop(key.borrow()) {
648
1
                    let (item, more_removal_futures) = state.remove(key, &entry, false);
649
1
                    removal_futures.extend(more_removal_futures.into_iter());
650
1
                    Some(item)
651
                } else {
652
0
                    None
653
                };
654
655
1
                (evicted_items, removal_futures, removed_item)
656
            } else {
657
0
                (vec![], vec![].into_iter().collect(), None)
658
            }
659
        };
660
661
        // Perform the async callbacks outside of the lock
662
1
        let mut removal_futures: FuturesUnordered<_> = removal_futures.into_iter().collect();
663
1
        while removal_futures.next().await.is_some() 
{}0
664
665
        // Unref evicted items
666
1
        let mut callbacks: FuturesUnordered<_> =
667
1
            evicted_items.iter().map(LenEntry::unref).collect();
668
1
        while callbacks.next().await.is_some() 
{}0
669
670
        // Unref removed item if any
671
1
        if let Some(item) = removed_item {
672
1
            item.unref().await;
673
1
            true
674
        } else {
675
0
            false
676
        }
677
1
    }
678
679
5
    pub fn add_remove_callback(&self, callback: C) {
680
5
        self.state.lock().add_remove_callback(callback);
681
5
    }
682
}