Coverage Report

Created: 2025-11-14 11:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/evicting_map.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::Borrow;
16
use core::cmp::Eq;
17
use core::fmt::Debug;
18
use core::future::Future;
19
use core::hash::Hash;
20
use core::marker::PhantomData;
21
use core::ops::RangeBounds;
22
use core::pin::Pin;
23
use std::collections::BTreeSet;
24
use std::sync::Arc;
25
26
use futures::StreamExt;
27
use futures::stream::FuturesUnordered;
28
use lru::LruCache;
29
use nativelink_config::stores::EvictionPolicy;
30
use nativelink_metric::MetricsComponent;
31
use parking_lot::Mutex;
32
use serde::{Deserialize, Serialize};
33
use tracing::{debug, info};
34
35
use crate::instant_wrapper::InstantWrapper;
36
use crate::metrics_utils::{Counter, CounterWithTime};
37
38
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
39
pub struct SerializedLRU<K> {
40
    pub data: Vec<(K, i32)>,
41
    pub anchor_time: u64,
42
}
43
44
#[derive(Debug)]
45
struct EvictionItem<T: LenEntry + Debug> {
46
    seconds_since_anchor: i32,
47
    data: T,
48
}
49
50
pub trait LenEntry: 'static {
51
    /// Length of referenced data.
52
    fn len(&self) -> u64;
53
54
    /// Returns `true` if `self` has zero length.
55
    fn is_empty(&self) -> bool;
56
57
    /// This will be called when object is removed from map.
58
    /// Note: There may still be a reference to it held somewhere else, which
59
    /// is why it can't be mutable. This is a good place to mark the item
60
    /// to be deleted and then in the Drop call actually do the deleting.
61
    /// This will ensure nowhere else in the program still holds a reference
62
    /// to this object.
63
    /// You should not rely only on the Drop trait. Doing so might result in the
64
    /// program safely shutting down and calling the Drop method on each object,
65
    /// which if you are deleting items you may not want to do.
66
    /// It is undefined behavior to have `unref()` called more than once.
67
    /// During the execution of `unref()` no items can be added or removed to/from
68
    /// the `EvictionMap` globally (including inside `unref()`).
69
    #[inline]
70
46
    fn unref(&self) -> impl Future<Output = ()> + Send {
71
46
        core::future::ready(())
72
46
    }
73
}
74
75
impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
76
    #[inline]
77
283
    fn len(&self) -> u64 {
78
283
        T::len(self.as_ref())
79
283
    }
80
81
    #[inline]
82
0
    fn is_empty(&self) -> bool {
83
0
        T::is_empty(self.as_ref())
84
0
    }
85
86
    #[inline]
87
28
    async fn unref(&self) {
88
28
        self.as_ref().unref().await;
89
28
    }
90
}
91
92
// Callback to be called when the EvictingMap removes an item
93
// either via eviction or direct deletion. This will be called with
94
// whatever key type the EvictingMap uses.
95
pub trait RemoveItemCallback<Q>: Debug + Send + Sync {
96
    fn callback(&self, store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>>;
97
}
98
99
#[derive(Debug, MetricsComponent)]
100
struct State<
101
    K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>,
102
    Q: Ord + Hash + Eq + Debug,
103
    T: LenEntry + Debug + Send,
104
    C: RemoveItemCallback<Q>,
105
> {
106
    lru: LruCache<K, EvictionItem<T>>,
107
    btree: Option<BTreeSet<K>>,
108
    #[metric(help = "Total size of all items in the store")]
109
    sum_store_size: u64,
110
111
    #[metric(help = "Number of bytes evicted from the store")]
112
    evicted_bytes: Counter,
113
    #[metric(help = "Number of items evicted from the store")]
114
    evicted_items: CounterWithTime,
115
    #[metric(help = "Number of bytes replaced in the store")]
116
    replaced_bytes: Counter,
117
    #[metric(help = "Number of items replaced in the store")]
118
    replaced_items: CounterWithTime,
119
    #[metric(help = "Number of bytes inserted into the store since it was created")]
120
    lifetime_inserted_bytes: Counter,
121
122
    _key_type: PhantomData<Q>,
123
    remove_callbacks: Vec<C>,
124
}
125
126
type RemoveFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
127
128
impl<
129
    K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>,
130
    Q: Ord + Hash + Eq + Debug + Sync,
131
    T: LenEntry + Debug + Sync + Send,
132
    C: RemoveItemCallback<Q>,
133
> State<K, Q, T, C>
134
{
135
    /// Removes an item from the cache and returns the data for deferred cleanup.
136
    /// The caller is responsible for calling `unref()` on the returned data outside of the lock.
137
    #[must_use]
138
73
    fn remove(
139
73
        &mut self,
140
73
        key: &Q,
141
73
        eviction_item: &EvictionItem<T>,
142
73
        replaced: bool,
143
73
    ) -> (T, Vec<RemoveFuture>)
144
73
    where
145
73
        T: Clone,
146
    {
147
73
        if let Some(
btree0
) = &mut self.btree {
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [True: 0, False: 20]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [Folded - Ignored]
  Branch (147:16): [True: 0, False: 6]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 14]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 3]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 1]
  Branch (147:16): [True: 0, False: 5]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [Folded - Ignored]
  Branch (147:16): [True: 0, False: 16]
  Branch (147:16): [True: 0, False: 0]
  Branch (147:16): [True: 0, False: 1]
148
0
            btree.remove(key);
149
73
        }
150
73
        self.sum_store_size -= eviction_item.data.len();
151
73
        if replaced {
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [True: 18, False: 2]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [Folded - Ignored]
  Branch (151:12): [True: 0, False: 6]
  Branch (151:12): [True: 0, False: 1]
  Branch (151:12): [True: 1, False: 0]
  Branch (151:12): [True: 0, False: 14]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [True: 0, False: 1]
  Branch (151:12): [True: 0, False: 3]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [True: 0, False: 1]
  Branch (151:12): [True: 0, False: 1]
  Branch (151:12): [True: 1, False: 0]
  Branch (151:12): [True: 1, False: 0]
  Branch (151:12): [True: 0, False: 1]
  Branch (151:12): [True: 3, False: 2]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [Folded - Ignored]
  Branch (151:12): [True: 16, False: 0]
  Branch (151:12): [True: 0, False: 0]
  Branch (151:12): [True: 0, False: 1]
152
40
            self.replaced_items.inc();
153
40
            self.replaced_bytes.add(eviction_item.data.len());
154
40
        } else {
155
33
            self.evicted_items.inc();
156
33
            self.evicted_bytes.add(eviction_item.data.len());
157
33
        }
158
159
73
        let callbacks = self
160
73
            .remove_callbacks
161
73
            .iter()
162
73
            .map(|callback| 
callback2
.
callback2
(
key2
))
163
73
            .collect();
164
165
        // Return the data for deferred unref outside of lock
166
73
        (eviction_item.data.clone(), callbacks)
167
73
    }
168
169
    /// Inserts a new item into the cache. If the key already exists, the old item is returned
170
    /// for deferred cleanup.
171
    #[must_use]
172
5.95k
    fn put(&mut self, key: &K, eviction_item: EvictionItem<T>) -> Option<(T, Vec<RemoveFuture>)>
173
5.95k
    where
174
5.95k
        K: Clone,
175
5.95k
        T: Clone,
176
    {
177
        // If we are maintaining a btree index, we need to update it.
178
5.95k
        if let Some(
btree0
) = &mut self.btree {
  Branch (178:16): [True: 0, False: 0]
  Branch (178:16): [True: 0, False: 0]
  Branch (178:16): [True: 0, False: 5.75k]
  Branch (178:16): [True: 0, False: 0]
  Branch (178:16): [Folded - Ignored]
  Branch (178:16): [True: 0, False: 2]
  Branch (178:16): [True: 0, False: 27]
  Branch (178:16): [True: 0, False: 3]
  Branch (178:16): [True: 0, False: 6]
  Branch (178:16): [True: 0, False: 2]
  Branch (178:16): [True: 0, False: 2]
  Branch (178:16): [True: 0, False: 2]
  Branch (178:16): [True: 0, False: 2]
  Branch (178:16): [True: 0, False: 2]
  Branch (178:16): [True: 0, False: 1]
  Branch (178:16): [True: 0, False: 16]
  Branch (178:16): [True: 0, False: 0]
  Branch (178:16): [True: 0, False: 2]
  Branch (178:16): [Folded - Ignored]
  Branch (178:16): [True: 0, False: 97]
  Branch (178:16): [True: 0, False: 27]
179
0
            btree.insert(key.clone());
180
5.95k
        }
181
5.95k
        self.lru
182
5.95k
            .put(key.clone(), eviction_item)
183
5.95k
            .map(|old_item| 
self40
.
remove40
(
key.borrow()40
,
&old_item40
, true))
184
5.95k
    }
185
186
5
    fn add_remove_callback(&mut self, callback: C) {
187
5
        self.remove_callbacks.push(callback);
188
5
    }
189
}
190
191
#[derive(Debug, Clone, Copy)]
192
pub struct NoopRemove;
193
194
impl<Q> RemoveItemCallback<Q> for NoopRemove {
195
0
    fn callback(&self, _store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>> {
196
0
        Box::pin(async {})
197
0
    }
198
}
199
200
#[derive(Debug, MetricsComponent)]
201
pub struct EvictingMap<
202
    K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>,
203
    Q: Ord + Hash + Eq + Debug,
204
    T: LenEntry + Debug + Send,
205
    I: InstantWrapper,
206
    C: RemoveItemCallback<Q> = NoopRemove,
207
> {
208
    #[metric]
209
    state: Mutex<State<K, Q, T, C>>,
210
    anchor_time: I,
211
    #[metric(help = "Maximum size of the store in bytes")]
212
    max_bytes: u64,
213
    #[metric(help = "Number of bytes to evict when the store is full")]
214
    evict_bytes: u64,
215
    #[metric(help = "Maximum number of seconds to keep an item in the store")]
216
    max_seconds: i32,
217
    #[metric(help = "Maximum number of items to keep in the store")]
218
    max_count: u64,
219
}
220
221
impl<K, Q, T, I, C> EvictingMap<K, Q, T, I, C>
222
where
223
    K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>,
224
    Q: Ord + Hash + Eq + Debug + Sync,
225
    T: LenEntry + Debug + Clone + Send + Sync,
226
    I: InstantWrapper,
227
    C: RemoveItemCallback<Q>,
228
{
229
422
    pub fn new(config: &EvictionPolicy, anchor_time: I) -> Self {
230
422
        Self {
231
422
            // We use unbounded because if we use the bounded version we can't call the delete
232
422
            // function on the LenEntry properly.
233
422
            state: Mutex::new(State {
234
422
                lru: LruCache::unbounded(),
235
422
                btree: None,
236
422
                sum_store_size: 0,
237
422
                evicted_bytes: Counter::default(),
238
422
                evicted_items: CounterWithTime::default(),
239
422
                replaced_bytes: Counter::default(),
240
422
                replaced_items: CounterWithTime::default(),
241
422
                lifetime_inserted_bytes: Counter::default(),
242
422
                _key_type: PhantomData,
243
422
                remove_callbacks: Vec::new(),
244
422
            }),
245
422
            anchor_time,
246
422
            max_bytes: config.max_bytes as u64,
247
422
            evict_bytes: config.evict_bytes as u64,
248
422
            max_seconds: config.max_seconds as i32,
249
422
            max_count: config.max_count,
250
422
        }
251
422
    }
252
253
0
    pub async fn enable_filtering(&self) {
254
0
        let mut state = self.state.lock();
255
0
        if state.btree.is_none() {
  Branch (255:12): [Folded - Ignored]
  Branch (255:12): [Folded - Ignored]
256
0
            Self::rebuild_btree_index(&mut state);
257
0
        }
258
0
    }
259
260
2
    fn rebuild_btree_index(state: &mut State<K, Q, T, C>) {
261
2
        state.btree = Some(state.lru.iter().map(|(k, _)| k).cloned().collect());
262
2
    }
263
264
    /// Run the `handler` function on each key-value pair that matches the `prefix_range`
265
    /// and return the number of items that were processed.
266
    /// The `handler` function should return `true` to continue processing the next item
267
    /// or `false` to stop processing.
268
11
    pub fn range<F>(&self, prefix_range: impl RangeBounds<Q> + Send, mut handler: F) -> u64
269
11
    where
270
11
        F: FnMut(&K, &T) -> bool + Send,
271
11
        K: Ord,
272
    {
273
11
        let mut state = self.state.lock();
274
11
        let btree = if let Some(
ref btree9
) = state.btree {
  Branch (274:28): [True: 6, False: 1]
  Branch (274:28): [Folded - Ignored]
  Branch (274:28): [True: 1, False: 0]
  Branch (274:28): [True: 2, False: 0]
  Branch (274:28): [True: 0, False: 1]
  Branch (274:28): [Folded - Ignored]
275
9
            btree
276
        } else {
277
2
            Self::rebuild_btree_index(&mut state);
278
2
            state.btree.as_ref().unwrap()
279
        };
280
11
        let mut continue_count = 0;
281
22
        for key in 
btree11
.
range11
(
prefix_range11
) {
282
22
            let value = &state.lru.peek(key.borrow()).unwrap().data;
283
22
            let should_continue = handler(key, value);
284
22
            if !should_continue {
  Branch (284:16): [True: 0, False: 13]
  Branch (284:16): [Folded - Ignored]
  Branch (284:16): [True: 0, False: 1]
  Branch (284:16): [True: 0, False: 5]
  Branch (284:16): [True: 0, False: 3]
  Branch (284:16): [Folded - Ignored]
285
0
                break;
286
22
            }
287
22
            continue_count += 1;
288
        }
289
11
        continue_count
290
11
    }
291
292
    /// Returns the number of key-value pairs that are currently in the the cache.
293
    /// Function is not for production code paths.
294
30
    pub fn len_for_test(&self) -> usize {
295
30
        self.state.lock().lru.len()
296
30
    }
297
298
10.6k
    fn should_evict(
299
10.6k
        &self,
300
10.6k
        lru_len: usize,
301
10.6k
        peek_entry: &EvictionItem<T>,
302
10.6k
        sum_store_size: u64,
303
10.6k
        max_bytes: u64,
304
10.6k
    ) -> bool {
305
10.6k
        let is_over_size = max_bytes != 0 && 
sum_store_size >= max_bytes216
;
  Branch (305:28): [True: 0, False: 0]
  Branch (305:28): [True: 0, False: 0]
  Branch (305:28): [True: 108, False: 10.1k]
  Branch (305:28): [True: 0, False: 0]
  Branch (305:28): [Folded - Ignored]
  Branch (305:28): [True: 0, False: 6]
  Branch (305:28): [True: 0, False: 1]
  Branch (305:28): [True: 0, False: 3]
  Branch (305:28): [True: 30, False: 33]
  Branch (305:28): [True: 0, False: 3]
  Branch (305:28): [True: 0, False: 1]
  Branch (305:28): [True: 0, False: 2]
  Branch (305:28): [True: 3, False: 7]
  Branch (305:28): [True: 0, False: 2]
  Branch (305:28): [True: 3, False: 0]
  Branch (305:28): [True: 0, False: 4]
  Branch (305:28): [True: 0, False: 2]
  Branch (305:28): [True: 0, False: 3]
  Branch (305:28): [True: 0, False: 2]
  Branch (305:28): [True: 1, False: 27]
  Branch (305:28): [True: 0, False: 0]
  Branch (305:28): [True: 0, False: 4]
  Branch (305:28): [True: 0, False: 3]
  Branch (305:28): [Folded - Ignored]
  Branch (305:28): [True: 71, False: 138]
  Branch (305:28): [True: 0, False: 0]
  Branch (305:28): [True: 0, False: 85]
306
307
10.6k
        let elapsed_seconds =
308
10.6k
            i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
309
10.6k
        let evict_older_than_seconds = elapsed_seconds.saturating_sub(self.max_seconds);
310
10.6k
        let old_item_exists =
311
10.6k
            self.max_seconds != 0 && 
peek_entry.seconds_since_anchor < evict_older_than_seconds115
;
  Branch (311:13): [True: 0, False: 0]
  Branch (311:13): [True: 0, False: 0]
  Branch (311:13): [True: 0, False: 10.2k]
  Branch (311:13): [True: 0, False: 0]
  Branch (311:13): [Folded - Ignored]
  Branch (311:13): [True: 0, False: 6]
  Branch (311:13): [True: 0, False: 1]
  Branch (311:13): [True: 0, False: 3]
  Branch (311:13): [True: 30, False: 33]
  Branch (311:13): [True: 0, False: 3]
  Branch (311:13): [True: 0, False: 1]
  Branch (311:13): [True: 0, False: 2]
  Branch (311:13): [True: 0, False: 10]
  Branch (311:13): [True: 0, False: 2]
  Branch (311:13): [True: 0, False: 3]
  Branch (311:13): [True: 0, False: 4]
  Branch (311:13): [True: 0, False: 2]
  Branch (311:13): [True: 0, False: 3]
  Branch (311:13): [True: 0, False: 2]
  Branch (311:13): [True: 0, False: 28]
  Branch (311:13): [True: 0, False: 0]
  Branch (311:13): [True: 0, False: 4]
  Branch (311:13): [True: 0, False: 3]
  Branch (311:13): [Folded - Ignored]
  Branch (311:13): [True: 0, False: 209]
  Branch (311:13): [True: 0, False: 0]
  Branch (311:13): [True: 85, False: 0]
312
313
10.6k
        let is_over_count =
314
10.6k
            self.max_count != 0 && 
u64::try_from20
(
lru_len20
).unwrap_or(u64::MAX) > self.max_count;
  Branch (314:13): [True: 0, False: 0]
  Branch (314:13): [True: 0, False: 0]
  Branch (314:13): [True: 0, False: 10.2k]
  Branch (314:13): [True: 0, False: 0]
  Branch (314:13): [Folded - Ignored]
  Branch (314:13): [True: 0, False: 6]
  Branch (314:13): [True: 0, False: 1]
  Branch (314:13): [True: 3, False: 0]
  Branch (314:13): [True: 8, False: 55]
  Branch (314:13): [True: 0, False: 3]
  Branch (314:13): [True: 0, False: 1]
  Branch (314:13): [True: 0, False: 2]
  Branch (314:13): [True: 0, False: 10]
  Branch (314:13): [True: 0, False: 2]
  Branch (314:13): [True: 0, False: 3]
  Branch (314:13): [True: 4, False: 0]
  Branch (314:13): [True: 2, False: 0]
  Branch (314:13): [True: 3, False: 0]
  Branch (314:13): [True: 0, False: 2]
  Branch (314:13): [True: 0, False: 28]
  Branch (314:13): [True: 0, False: 0]
  Branch (314:13): [True: 0, False: 4]
  Branch (314:13): [True: 0, False: 3]
  Branch (314:13): [Folded - Ignored]
  Branch (314:13): [True: 0, False: 209]
  Branch (314:13): [True: 0, False: 0]
  Branch (314:13): [True: 0, False: 85]
315
316
10.6k
        is_over_size || 
old_item_exists10.6k
||
is_over_count10.6k
  Branch (316:9): [True: 0, False: 0]
  Branch (316:25): [True: 0, False: 0]
  Branch (316:9): [True: 0, False: 0]
  Branch (316:25): [True: 0, False: 0]
  Branch (316:9): [True: 2, False: 10.2k]
  Branch (316:25): [True: 0, False: 10.2k]
  Branch (316:9): [True: 0, False: 0]
  Branch (316:25): [True: 0, False: 0]
  Branch (316:9): [Folded - Ignored]
  Branch (316:25): [Folded - Ignored]
  Branch (316:9): [True: 0, False: 6]
  Branch (316:25): [True: 0, False: 6]
  Branch (316:9): [True: 0, False: 1]
  Branch (316:25): [True: 0, False: 1]
  Branch (316:9): [True: 0, False: 3]
  Branch (316:25): [True: 0, False: 3]
  Branch (316:9): [True: 6, False: 57]
  Branch (316:25): [True: 9, False: 48]
  Branch (316:9): [True: 0, False: 3]
  Branch (316:25): [True: 0, False: 3]
  Branch (316:9): [True: 0, False: 1]
  Branch (316:25): [True: 0, False: 1]
  Branch (316:9): [True: 0, False: 2]
  Branch (316:25): [True: 0, False: 2]
  Branch (316:9): [True: 0, False: 10]
  Branch (316:25): [True: 0, False: 10]
  Branch (316:9): [True: 0, False: 2]
  Branch (316:25): [True: 0, False: 2]
  Branch (316:9): [True: 1, False: 2]
  Branch (316:25): [True: 0, False: 2]
  Branch (316:9): [True: 0, False: 4]
  Branch (316:25): [True: 0, False: 4]
  Branch (316:9): [True: 0, False: 2]
  Branch (316:25): [True: 0, False: 2]
  Branch (316:9): [True: 0, False: 3]
  Branch (316:25): [True: 0, False: 3]
  Branch (316:9): [True: 0, False: 2]
  Branch (316:25): [True: 0, False: 2]
  Branch (316:9): [True: 0, False: 28]
  Branch (316:25): [True: 0, False: 28]
  Branch (316:9): [True: 0, False: 0]
  Branch (316:25): [True: 0, False: 0]
  Branch (316:9): [True: 0, False: 4]
  Branch (316:25): [True: 0, False: 4]
  Branch (316:9): [True: 0, False: 3]
  Branch (316:25): [True: 0, False: 3]
  Branch (316:9): [Folded - Ignored]
  Branch (316:25): [Folded - Ignored]
  Branch (316:9): [True: 0, False: 209]
  Branch (316:25): [True: 0, False: 209]
  Branch (316:9): [True: 0, False: 0]
  Branch (316:25): [True: 0, False: 0]
  Branch (316:9): [True: 0, False: 85]
  Branch (316:25): [True: 1, False: 84]
317
10.6k
    }
318
319
    #[must_use]
320
5.96k
    fn evict_items(&self, state: &mut State<K, Q, T, C>) -> (Vec<T>, Vec<RemoveFuture>) {
321
5.96k
        let Some((_, mut peek_entry)) = state.lru.peek_lru() else {
  Branch (321:13): [True: 0, False: 0]
  Branch (321:13): [True: 0, False: 0]
  Branch (321:13): [True: 5.75k, False: 0]
  Branch (321:13): [True: 0, False: 0]
  Branch (321:13): [Folded - Ignored]
  Branch (321:13): [True: 6, False: 0]
  Branch (321:13): [True: 1, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 31, False: 0]
  Branch (321:13): [True: 3, False: 0]
  Branch (321:13): [True: 1, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 7, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [True: 18, False: 0]
  Branch (321:13): [True: 0, False: 0]
  Branch (321:13): [True: 0, False: 0]
  Branch (321:13): [True: 2, False: 0]
  Branch (321:13): [Folded - Ignored]
  Branch (321:13): [True: 97, False: 0]
  Branch (321:13): [True: 0, False: 0]
  Branch (321:13): [True: 27, False: 0]
322
0
            return (Vec::new(), Vec::new());
323
        };
324
325
5.96k
        let max_bytes = if self.max_bytes != 0
  Branch (325:28): [True: 0, False: 0]
  Branch (325:28): [True: 0, False: 0]
  Branch (325:28): [True: 8, False: 5.75k]
  Branch (325:28): [True: 0, False: 0]
  Branch (325:28): [Folded - Ignored]
  Branch (325:28): [True: 0, False: 6]
  Branch (325:28): [True: 0, False: 1]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [True: 8, False: 23]
  Branch (325:28): [True: 0, False: 3]
  Branch (325:28): [True: 0, False: 1]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [True: 0, False: 7]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [True: 2, False: 0]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [True: 0, False: 18]
  Branch (325:28): [True: 0, False: 0]
  Branch (325:28): [True: 0, False: 0]
  Branch (325:28): [True: 0, False: 2]
  Branch (325:28): [Folded - Ignored]
  Branch (325:28): [True: 0, False: 97]
  Branch (325:28): [True: 0, False: 0]
  Branch (325:28): [True: 0, False: 27]
326
18
            && self.evict_bytes != 0
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 8]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [Folded - Ignored]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 4, False: 4]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 2]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [Folded - Ignored]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
  Branch (326:16): [True: 0, False: 0]
327
4
            && self.should_evict(
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [Folded - Ignored]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 1, False: 3]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [Folded - Ignored]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
  Branch (327:16): [True: 0, False: 0]
328
4
                state.lru.len(),
329
4
                peek_entry,
330
4
                state.sum_store_size,
331
4
                self.max_bytes,
332
            ) {
333
1
            self.max_bytes.saturating_sub(self.evict_bytes)
334
        } else {
335
5.96k
            self.max_bytes
336
        };
337
338
5.96k
        let mut items_to_unref = Vec::new();
339
5.96k
        let mut removal_futures = Vec::new();
340
341
5.98k
        while self.should_evict(state.lru.len(), peek_entry, state.sum_store_size, max_bytes) {
  Branch (341:15): [True: 0, False: 0]
  Branch (341:15): [True: 0, False: 0]
  Branch (341:15): [True: 2, False: 5.75k]
  Branch (341:15): [True: 0, False: 0]
  Branch (341:15): [Folded - Ignored]
  Branch (341:15): [True: 0, False: 6]
  Branch (341:15): [True: 0, False: 1]
  Branch (341:15): [True: 0, False: 2]
  Branch (341:15): [True: 12, False: 28]
  Branch (341:15): [True: 0, False: 3]
  Branch (341:15): [True: 0, False: 1]
  Branch (341:15): [True: 0, False: 2]
  Branch (341:15): [True: 0, False: 7]
  Branch (341:15): [True: 0, False: 2]
  Branch (341:15): [True: 1, False: 2]
  Branch (341:15): [True: 1, False: 2]
  Branch (341:15): [True: 0, False: 2]
  Branch (341:15): [True: 0, False: 2]
  Branch (341:15): [True: 0, False: 2]
  Branch (341:15): [True: 0, False: 18]
  Branch (341:15): [True: 0, False: 0]
  Branch (341:15): [True: 0, False: 0]
  Branch (341:15): [True: 0, False: 2]
  Branch (341:15): [Folded - Ignored]
  Branch (341:15): [True: 0, False: 97]
  Branch (341:15): [True: 0, False: 0]
  Branch (341:15): [True: 1, False: 27]
342
17
            let (key, eviction_item) = state
343
17
                .lru
344
17
                .pop_lru()
345
17
                .expect("Tried to peek() then pop() but failed");
346
17
            debug!(?key, "Evicting",);
347
17
            let (data, futures) = state.remove(key.borrow(), &eviction_item, false);
348
17
            items_to_unref.push(data);
349
17
            removal_futures.extend(futures.into_iter());
350
351
17
            peek_entry = if let Some((_, 
entry13
)) = state.lru.peek_lru() {
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 1, False: 1]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [Folded - Ignored]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 9, False: 3]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 1, False: 0]
  Branch (351:33): [True: 1, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [Folded - Ignored]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 0, False: 0]
  Branch (351:33): [True: 1, False: 0]
352
13
                entry
353
            } else {
354
4
                break;
355
            };
356
        }
357
358
5.96k
        (items_to_unref, removal_futures)
359
5.96k
    }
360
361
    /// Return the size of a `key`, if not found `None` is returned.
362
25
    pub async fn size_for_key(&self, key: &Q) -> Option<u64> {
363
25
        let mut results = [None];
364
25
        self.sizes_for_keys([key], &mut results[..], false).await;
365
25
        results[0]
366
25
    }
367
368
    /// Return the sizes of a collection of `keys`. Expects `results` collection
369
    /// to be provided for storing the resulting key sizes. Each index value in
370
    /// `keys` maps directly to the size value for the key in `results`.
371
    /// If no key is found in the internal map, `None` is filled in its place.
372
    /// If `peek` is set to `true`, the items are not promoted to the front of the
373
    /// LRU cache. Note: peek may still evict, but won't promote.
374
311
    pub async fn sizes_for_keys<It, R>(&self, keys: It, results: &mut [Option<u64>], peek: bool)
375
311
    where
376
311
        It: IntoIterator<Item = R> + Send,
377
311
        // Note: It's not enough to have the inserts themselves be Send. The
378
311
        // returned iterator should be Send as well.
379
311
        <It as IntoIterator>::IntoIter: Send,
380
311
        // This may look strange, but what we are doing is saying:
381
311
        // * `K` must be able to borrow `Q`
382
311
        // * `R` (the input stream item type) must also be able to borrow `Q`
383
311
        // Note: That K and R do not need to be the same type, they just both need
384
311
        // to be able to borrow a `Q`.
385
311
        R: Borrow<Q> + Send,
386
311
    {
387
311
        let (removal_futures, data_to_unref) = {
388
311
            let mut state = self.state.lock();
389
390
311
            let lru_len = state.lru.len();
391
311
            let mut data_to_unref = Vec::new();
392
311
            let mut removal_futures = Vec::new();
393
346
            for (key, result) in 
keys311
.
into_iter311
().
zip311
(
results311
.
iter_mut311
()) {
394
346
                let maybe_entry = if peek {
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [True: 0, False: 226]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [Folded - Ignored]
  Branch (394:38): [True: 0, False: 25]
  Branch (394:38): [True: 2, False: 0]
  Branch (394:38): [True: 5, False: 0]
  Branch (394:38): [True: 4, False: 0]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [True: 0, False: 1]
  Branch (394:38): [True: 0, False: 3]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [True: 0, False: 0]
  Branch (394:38): [Folded - Ignored]
  Branch (394:38): [True: 0, False: 80]
395
11
                    state.lru.peek_mut(key.borrow())
396
                } else {
397
335
                    state.lru.get_mut(key.borrow())
398
                };
399
346
                match maybe_entry {
400
188
                    Some(entry) => {
401
                        // Note: We need to check eviction because the item might be expired
402
                        // based on the current time. In such case, we remove the item while
403
                        // we are here.
404
188
                        if self.should_evict(lru_len, entry, 0, u64::MAX) {
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 99]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [Folded - Ignored]
  Branch (404:28): [True: 1, False: 13]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 3]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 1]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [True: 0, False: 0]
  Branch (404:28): [Folded - Ignored]
  Branch (404:28): [True: 0, False: 71]
405
1
                            *result = None;
406
1
                            if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) {
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [Folded - Ignored]
  Branch (406:36): [True: 1, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [True: 0, False: 0]
  Branch (406:36): [Folded - Ignored]
  Branch (406:36): [True: 0, False: 0]
407
1
                                info!(?key, "Item expired, evicting");
408
1
                                let (data, futures) =
409
1
                                    state.remove(key.borrow(), &eviction_item, false);
410
                                // Store data for later unref - we can't drop state here as we're still iterating
411
1
                                data_to_unref.push(data);
412
1
                                removal_futures.extend(futures.into_iter());
413
0
                            }
414
                        } else {
415
187
                            if !peek {
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 99, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [Folded - Ignored]
  Branch (415:32): [True: 13, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 3]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 1, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [True: 0, False: 0]
  Branch (415:32): [Folded - Ignored]
  Branch (415:32): [True: 71, False: 0]
416
184
                                entry.seconds_since_anchor =
417
184
                                    i32::try_from(self.anchor_time.elapsed().as_secs())
418
184
                                        .unwrap_or(i32::MAX);
419
184
                            
}3
420
187
                            *result = Some(entry.data.len());
421
                        }
422
                    }
423
158
                    None => *result = None,
424
                }
425
            }
426
311
            (removal_futures, data_to_unref)
427
        };
428
429
        // Perform the async callbacks outside of the lock
430
311
        let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
431
311
        while callbacks.next().await.is_some() 
{}0
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [True: 0, False: 191]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [Folded - Ignored]
  Branch (431:15): [True: 0, False: 25]
  Branch (431:15): [True: 0, False: 2]
  Branch (431:15): [True: 0, False: 5]
  Branch (431:15): [True: 0, False: 4]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [True: 0, False: 1]
  Branch (431:15): [True: 0, False: 3]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [True: 0, False: 0]
  Branch (431:15): [Folded - Ignored]
  Branch (431:15): [True: 0, False: 80]
432
311
        let mut callbacks: FuturesUnordered<_> =
433
311
            data_to_unref.iter().map(LenEntry::unref).collect();
434
312
        while callbacks.next().await.is_some() 
{}1
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [True: 0, False: 191]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [Folded - Ignored]
  Branch (434:15): [True: 1, False: 25]
  Branch (434:15): [True: 0, False: 2]
  Branch (434:15): [True: 0, False: 5]
  Branch (434:15): [True: 0, False: 4]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [True: 0, False: 1]
  Branch (434:15): [True: 0, False: 3]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [True: 0, False: 0]
  Branch (434:15): [Folded - Ignored]
  Branch (434:15): [True: 0, False: 80]
435
311
    }
436
437
4.49k
    pub async fn get(&self, key: &Q) -> Option<T> {
438
        // Fast path: Check if we need eviction before acquiring lock for eviction
439
4.49k
        let needs_eviction = {
440
4.49k
            let state = self.state.lock();
441
4.49k
            if let Some((_, 
peek_entry4.48k
)) = state.lru.peek_lru() {
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 4.36k, False: 4]
  Branch (441:20): [Folded - Ignored]
  Branch (441:20): [True: 1, False: 0]
  Branch (441:20): [True: 5, False: 0]
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 1, False: 0]
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 1, False: 0]
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 9, False: 0]
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 4, False: 0]
  Branch (441:20): [True: 1, False: 1]
  Branch (441:20): [Folded - Ignored]
  Branch (441:20): [True: 41, False: 0]
  Branch (441:20): [True: 0, False: 0]
  Branch (441:20): [True: 57, False: 0]
442
4.48k
                self.should_evict(
443
4.48k
                    state.lru.len(),
444
4.48k
                    peek_entry,
445
4.48k
                    state.sum_store_size,
446
4.48k
                    self.max_bytes,
447
                )
448
            } else {
449
5
                false
450
            }
451
        };
452
453
        // Perform eviction if needed
454
4.49k
        if needs_eviction {
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 4.37k]
  Branch (454:12): [Folded - Ignored]
  Branch (454:12): [True: 0, False: 1]
  Branch (454:12): [True: 2, False: 3]
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 1]
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 1]
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 9]
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 4]
  Branch (454:12): [True: 0, False: 2]
  Branch (454:12): [Folded - Ignored]
  Branch (454:12): [True: 0, False: 41]
  Branch (454:12): [True: 0, False: 0]
  Branch (454:12): [True: 0, False: 57]
455
2
            let (items_to_unref, removal_futures) = {
456
2
                let mut state = self.state.lock();
457
2
                self.evict_items(&mut *state)
458
2
            };
459
            // Unref items outside of lock
460
2
            let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
461
2
            while callbacks.next().await.is_some() 
{}0
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [Folded - Ignored]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 2]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [Folded - Ignored]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
  Branch (461:19): [True: 0, False: 0]
462
2
            let mut callbacks: FuturesUnordered<_> =
463
2
                items_to_unref.iter().map(LenEntry::unref).collect();
464
4
            while callbacks.next().await.is_some() 
{}2
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [Folded - Ignored]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 2, False: 2]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [Folded - Ignored]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
  Branch (464:19): [True: 0, False: 0]
465
4.49k
        }
466
467
        // Now get the item
468
4.49k
        let mut state = self.state.lock();
469
4.49k
        let 
entry4.48k
= state.lru.get_mut(key.borrow())
?10
;
470
4.48k
        entry.seconds_since_anchor =
471
4.48k
            i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
472
4.48k
        Some(entry.data.clone())
473
4.49k
    }
474
475
    /// Returns the replaced item if any.
476
5.94k
    pub async fn insert(&self, key: K, data: T) -> Option<T>
477
5.94k
    where
478
5.94k
        K: 'static,
479
5.94k
    {
480
5.94k
        self.insert_with_time(
481
5.94k
            key,
482
5.94k
            data,
483
5.94k
            i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX),
484
5.94k
        )
485
5.94k
        .await
486
5.94k
    }
487
488
    /// Returns the replaced item if any.
489
5.94k
    pub async fn insert_with_time(&self, key: K, data: T, seconds_since_anchor: i32) -> Option<T> {
490
5.94k
        let (items_to_unref, removal_futures) = {
491
5.94k
            let mut state = self.state.lock();
492
5.94k
            self.inner_insert_many(&mut state, [(key, data)], seconds_since_anchor)
493
5.94k
        };
494
495
5.94k
        let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect();
496
5.94k
        while futures.next().await.is_some() 
{}1
  Branch (496:15): [True: 0, False: 0]
  Branch (496:15): [True: 1, False: 5.75k]
  Branch (496:15): [True: 0, False: 0]
  Branch (496:15): [True: 0, False: 0]
  Branch (496:15): [Folded - Ignored]
  Branch (496:15): [True: 0, False: 2]
  Branch (496:15): [True: 0, False: 27]
  Branch (496:15): [True: 0, False: 3]
  Branch (496:15): [True: 0, False: 0]
  Branch (496:15): [True: 0, False: 4]
  Branch (496:15): [True: 0, False: 2]
  Branch (496:15): [True: 0, False: 2]
  Branch (496:15): [True: 0, False: 2]
  Branch (496:15): [True: 0, False: 2]
  Branch (496:15): [True: 0, False: 2]
  Branch (496:15): [True: 0, False: 1]
  Branch (496:15): [True: 0, False: 16]
  Branch (496:15): [True: 0, False: 0]
  Branch (496:15): [True: 0, False: 2]
  Branch (496:15): [Folded - Ignored]
  Branch (496:15): [True: 0, False: 97]
  Branch (496:15): [True: 0, False: 27]
497
498
        // Unref items outside of lock
499
5.94k
        let futures: FuturesUnordered<_> = items_to_unref
500
5.94k
            .into_iter()
501
5.94k
            .map(|item| async move 
{54
502
54
                item.unref().await;
503
54
                item
504
108
            })
505
5.94k
            .collect();
506
5.94k
        futures.collect::<Vec<_>>().await.into_iter().next()
507
5.94k
    }
508
509
    /// Same as `insert()`, but optimized for multiple inserts.
510
    /// Returns the replaced items if any.
511
7
    pub async fn insert_many<It>(&self, inserts: It) -> Vec<T>
512
7
    where
513
7
        It: IntoIterator<Item = (K, T)> + Send,
514
7
        // Note: It's not enough to have the inserts themselves be Send. The
515
7
        // returned iterator should be Send as well.
516
7
        <It as IntoIterator>::IntoIter: Send,
517
7
        K: 'static,
518
7
    {
519
7
        let mut inserts = inserts.into_iter().peekable();
520
        // Shortcut for cases where there are no inserts, so we don't need to lock.
521
7
        if inserts.peek().is_none() {
  Branch (521:12): [True: 0, False: 0]
  Branch (521:12): [Folded - Ignored]
  Branch (521:12): [True: 1, False: 1]
  Branch (521:12): [True: 4, False: 1]
  Branch (521:12): [Folded - Ignored]
522
5
            return Vec::new();
523
2
        }
524
525
2
        let (items_to_unref, removal_futures) = {
526
2
            let mut state = self.state.lock();
527
2
            self.inner_insert_many(
528
2
                &mut state,
529
2
                inserts,
530
2
                i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX),
531
2
            )
532
2
        };
533
534
2
        let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect();
535
2
        while futures.next().await.is_some() 
{}0
  Branch (535:15): [True: 0, False: 0]
  Branch (535:15): [Folded - Ignored]
  Branch (535:15): [True: 0, False: 1]
  Branch (535:15): [True: 0, False: 1]
  Branch (535:15): [Folded - Ignored]
536
537
        // Unref items outside of lock
538
2
        items_to_unref
539
2
            .into_iter()
540
2
            .map(|item| async move 
{0
541
0
                item.unref().await;
542
0
                item
543
0
            })
544
2
            .collect::<FuturesUnordered<_>>()
545
2
            .collect::<Vec<_>>()
546
2
            .await
547
7
    }
548
549
5.95k
    fn inner_insert_many<It>(
550
5.95k
        &self,
551
5.95k
        state: &mut State<K, Q, T, C>,
552
5.95k
        inserts: It,
553
5.95k
        seconds_since_anchor: i32,
554
5.95k
    ) -> (Vec<T>, Vec<RemoveFuture>)
555
5.95k
    where
556
5.95k
        It: IntoIterator<Item = (K, T)> + Send,
557
5.95k
        // Note: It's not enough to have the inserts themselves be Send. The
558
5.95k
        // returned iterator should be Send as well.
559
5.95k
        <It as IntoIterator>::IntoIter: Send,
560
    {
561
5.95k
        let mut replaced_items = Vec::new();
562
5.95k
        let mut removal_futures = Vec::new();
563
11.9k
        for (
key5.95k
,
data5.95k
) in inserts {
564
5.95k
            let new_item_size = data.len();
565
5.95k
            let eviction_item = EvictionItem {
566
5.95k
                seconds_since_anchor,
567
5.95k
                data,
568
5.95k
            };
569
570
5.95k
            if let Some((
old_item40
,
futures40
)) = state.put(&key, eviction_item) {
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 18, False: 5.74k]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [Folded - Ignored]
  Branch (570:20): [True: 1, False: 1]
  Branch (570:20): [True: 0, False: 27]
  Branch (570:20): [True: 0, False: 3]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 0, False: 1]
  Branch (570:20): [True: 0, False: 4]
  Branch (570:20): [True: 0, False: 1]
  Branch (570:20): [True: 0, False: 2]
  Branch (570:20): [True: 0, False: 2]
  Branch (570:20): [True: 0, False: 2]
  Branch (570:20): [True: 1, False: 1]
  Branch (570:20): [True: 1, False: 1]
  Branch (570:20): [True: 0, False: 1]
  Branch (570:20): [True: 3, False: 13]
  Branch (570:20): [True: 0, False: 0]
  Branch (570:20): [True: 0, False: 2]
  Branch (570:20): [Folded - Ignored]
  Branch (570:20): [True: 16, False: 81]
  Branch (570:20): [True: 0, False: 27]
571
40
                removal_futures.extend(futures.into_iter());
572
40
                replaced_items.push(old_item);
573
5.91k
            }
574
5.95k
            state.sum_store_size += new_item_size;
575
5.95k
            state.lifetime_inserted_bytes.add(new_item_size);
576
        }
577
578
        // Perform eviction after all insertions
579
5.95k
        let (items_to_unref, futures) = self.evict_items(state);
580
5.95k
        removal_futures.extend(futures);
581
582
        // Note: We cannot drop the state lock here since we're borrowing it,
583
        // but the caller will handle unreffing these items after releasing the lock
584
5.95k
        replaced_items.extend(items_to_unref);
585
586
5.95k
        (replaced_items, removal_futures)
587
5.95k
    }
588
589
15
    pub async fn remove(&self, key: &Q) -> bool {
590
15
        let (items_to_unref, removed_item, removal_futures) = {
591
15
            let mut state = self.state.lock();
592
593
            // First perform eviction
594
15
            let (evicted_items, mut removal_futures) = self.evict_items(&mut *state);
595
596
            // Then try to remove the requested item
597
15
            let removed = if let Some(
entry14
) = state.lru.pop(key.borrow()) {
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [Folded - Ignored]
  Branch (597:34): [True: 6, False: 0]
  Branch (597:34): [True: 1, False: 0]
  Branch (597:34): [True: 1, False: 1]
  Branch (597:34): [True: 1, False: 0]
  Branch (597:34): [True: 1, False: 0]
  Branch (597:34): [True: 2, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 2, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [True: 0, False: 0]
  Branch (597:34): [Folded - Ignored]
  Branch (597:34): [True: 0, False: 0]
598
14
                let (removed_item, more_removal_futures) = state.remove(key, &entry, false);
599
14
                removal_futures.extend(more_removal_futures.into_iter());
600
14
                Some(removed_item)
601
            } else {
602
1
                None
603
            };
604
605
15
            (evicted_items, removed, removal_futures)
606
        };
607
608
15
        let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
609
16
        while callbacks.next().await.is_some() 
{}1
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [Folded - Ignored]
  Branch (609:15): [True: 0, False: 6]
  Branch (609:15): [True: 0, False: 1]
  Branch (609:15): [True: 0, False: 2]
  Branch (609:15): [True: 0, False: 1]
  Branch (609:15): [True: 1, False: 1]
  Branch (609:15): [True: 0, False: 2]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 2]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [True: 0, False: 0]
  Branch (609:15): [Folded - Ignored]
  Branch (609:15): [True: 0, False: 0]
610
611
        // Unref evicted items outside of lock
612
15
        let mut callbacks: FuturesUnordered<_> =
613
15
            items_to_unref.iter().map(LenEntry::unref).collect();
614
16
        while callbacks.next().await.is_some() 
{}1
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [Folded - Ignored]
  Branch (614:15): [True: 0, False: 6]
  Branch (614:15): [True: 0, False: 1]
  Branch (614:15): [True: 1, False: 2]
  Branch (614:15): [True: 0, False: 1]
  Branch (614:15): [True: 0, False: 1]
  Branch (614:15): [True: 0, False: 2]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 2]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [True: 0, False: 0]
  Branch (614:15): [Folded - Ignored]
  Branch (614:15): [True: 0, False: 0]
615
616
        // Unref removed item if any
617
15
        if let Some(
item14
) = removed_item {
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [Folded - Ignored]
  Branch (617:16): [True: 6, False: 0]
  Branch (617:16): [True: 1, False: 0]
  Branch (617:16): [True: 1, False: 1]
  Branch (617:16): [True: 1, False: 0]
  Branch (617:16): [True: 1, False: 0]
  Branch (617:16): [True: 2, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 2, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [True: 0, False: 0]
  Branch (617:16): [Folded - Ignored]
  Branch (617:16): [True: 0, False: 0]
618
14
            item.unref().await;
619
14
            return true;
620
1
        }
621
622
1
        false
623
15
    }
624
625
    /// Same as `remove()`, but allows for a conditional to be applied to the
626
    /// entry before removal in an atomic fashion.
627
1
    pub async fn remove_if<F>(&self, key: &Q, cond: F) -> bool
628
1
    where
629
1
        F: FnOnce(&T) -> bool + Send,
630
1
    {
631
1
        let (evicted_items, removal_futures, removed_item) = {
632
1
            let mut state = self.state.lock();
633
1
            if let Some(entry) = state.lru.get(key.borrow()) {
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [Folded - Ignored]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [True: 1, False: 0]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [True: 0, False: 0]
  Branch (633:20): [Folded - Ignored]
  Branch (633:20): [True: 0, False: 0]
634
1
                if !cond(&entry.data) {
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [Folded - Ignored]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [True: 0, False: 1]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [True: 0, False: 0]
  Branch (634:20): [Folded - Ignored]
  Branch (634:20): [True: 0, False: 0]
635
0
                    return false;
636
1
                }
637
                // First perform eviction
638
1
                let (evicted_items, mut removal_futures) = self.evict_items(&mut state);
639
640
                // Then try to remove the requested item
641
1
                let removed_item = if let Some(entry) = state.lru.pop(key.borrow()) {
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [Folded - Ignored]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [True: 1, False: 0]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [True: 0, False: 0]
  Branch (641:43): [Folded - Ignored]
  Branch (641:43): [True: 0, False: 0]
642
1
                    let (item, more_removal_futures) = state.remove(key, &entry, false);
643
1
                    removal_futures.extend(more_removal_futures.into_iter());
644
1
                    Some(item)
645
                } else {
646
0
                    None
647
                };
648
649
1
                (evicted_items, removal_futures, removed_item)
650
            } else {
651
0
                (vec![], vec![].into_iter().collect(), None)
652
            }
653
        };
654
655
        // Perform the async callbacks outside of the lock
656
1
        let mut removal_futures: FuturesUnordered<_> = removal_futures.into_iter().collect();
657
1
        while removal_futures.next().await.is_some() 
{}0
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [Folded - Ignored]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [True: 0, False: 1]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [True: 0, False: 0]
  Branch (657:15): [Folded - Ignored]
  Branch (657:15): [True: 0, False: 0]
658
659
        // Unref evicted items
660
1
        let mut callbacks: FuturesUnordered<_> =
661
1
            evicted_items.iter().map(LenEntry::unref).collect();
662
1
        while callbacks.next().await.is_some() 
{}0
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [Folded - Ignored]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [True: 0, False: 1]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [True: 0, False: 0]
  Branch (662:15): [Folded - Ignored]
  Branch (662:15): [True: 0, False: 0]
663
664
        // Unref removed item if any
665
1
        if let Some(item) = removed_item {
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [Folded - Ignored]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 1, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [True: 0, False: 0]
  Branch (665:16): [Folded - Ignored]
  Branch (665:16): [True: 0, False: 0]
666
1
            item.unref().await;
667
1
            true
668
        } else {
669
0
            false
670
        }
671
1
    }
672
673
5
    pub fn add_remove_callback(&self, callback: C) {
674
5
        self.state.lock().add_remove_callback(callback);
675
5
    }
676
}