/build/source/nativelink-util/src/evicting_map.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::borrow::Borrow; |
16 | | use core::cmp::Eq; |
17 | | use core::fmt::Debug; |
18 | | use core::future::Future; |
19 | | use core::hash::Hash; |
20 | | use core::marker::PhantomData; |
21 | | use core::ops::RangeBounds; |
22 | | use core::pin::Pin; |
23 | | use std::collections::BTreeSet; |
24 | | use std::sync::Arc; |
25 | | |
26 | | use futures::StreamExt; |
27 | | use futures::stream::FuturesUnordered; |
28 | | use lru::LruCache; |
29 | | use nativelink_config::stores::EvictionPolicy; |
30 | | use nativelink_metric::MetricsComponent; |
31 | | use parking_lot::Mutex; |
32 | | use serde::{Deserialize, Serialize}; |
33 | | use tracing::{debug, info}; |
34 | | |
35 | | use crate::instant_wrapper::InstantWrapper; |
36 | | use crate::metrics_utils::{Counter, CounterWithTime}; |
37 | | |
38 | | #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)] |
39 | | pub struct SerializedLRU<K> { |
40 | | pub data: Vec<(K, i32)>, |
41 | | pub anchor_time: u64, |
42 | | } |
43 | | |
44 | | #[derive(Debug)] |
45 | | struct EvictionItem<T: LenEntry + Debug> { |
46 | | seconds_since_anchor: i32, |
47 | | data: T, |
48 | | } |
49 | | |
50 | | pub trait LenEntry: 'static { |
51 | | /// Length of referenced data. |
52 | | fn len(&self) -> u64; |
53 | | |
54 | | /// Returns `true` if `self` has zero length. |
55 | | fn is_empty(&self) -> bool; |
56 | | |
57 | | /// This will be called when object is removed from map. |
58 | | /// Note: There may still be a reference to it held somewhere else, which |
59 | | /// is why it can't be mutable. This is a good place to mark the item |
60 | | /// to be deleted and then in the Drop call actually do the deleting. |
61 | | /// This will ensure nowhere else in the program still holds a reference |
62 | | /// to this object. |
63 | | /// You should not rely only on the Drop trait. Doing so might result in the |
64 | | /// program safely shutting down and calling the Drop method on each object, |
65 | | /// which if you are deleting items you may not want to do. |
66 | | /// It is undefined behavior to have `unref()` called more than once. |
67 | | /// During the execution of `unref()` no items can be added or removed to/from |
68 | | /// the `EvictionMap` globally (including inside `unref()`). |
69 | | #[inline] |
70 | 46 | fn unref(&self) -> impl Future<Output = ()> + Send { |
71 | 46 | core::future::ready(()) |
72 | 46 | } |
73 | | } |
74 | | |
75 | | impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> { |
76 | | #[inline] |
77 | 283 | fn len(&self) -> u64 { |
78 | 283 | T::len(self.as_ref()) |
79 | 283 | } |
80 | | |
81 | | #[inline] |
82 | 0 | fn is_empty(&self) -> bool { |
83 | 0 | T::is_empty(self.as_ref()) |
84 | 0 | } |
85 | | |
86 | | #[inline] |
87 | 28 | async fn unref(&self) { |
88 | 28 | self.as_ref().unref().await; |
89 | 28 | } |
90 | | } |
91 | | |
92 | | // Callback to be called when the EvictingMap removes an item |
93 | | // either via eviction or direct deletion. This will be called with |
94 | | // whatever key type the EvictingMap uses. |
95 | | pub trait RemoveItemCallback<Q>: Debug + Send + Sync { |
96 | | fn callback(&self, store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>>; |
97 | | } |
98 | | |
99 | | #[derive(Debug, MetricsComponent)] |
100 | | struct State< |
101 | | K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>, |
102 | | Q: Ord + Hash + Eq + Debug, |
103 | | T: LenEntry + Debug + Send, |
104 | | C: RemoveItemCallback<Q>, |
105 | | > { |
106 | | lru: LruCache<K, EvictionItem<T>>, |
107 | | btree: Option<BTreeSet<K>>, |
108 | | #[metric(help = "Total size of all items in the store")] |
109 | | sum_store_size: u64, |
110 | | |
111 | | #[metric(help = "Number of bytes evicted from the store")] |
112 | | evicted_bytes: Counter, |
113 | | #[metric(help = "Number of items evicted from the store")] |
114 | | evicted_items: CounterWithTime, |
115 | | #[metric(help = "Number of bytes replaced in the store")] |
116 | | replaced_bytes: Counter, |
117 | | #[metric(help = "Number of items replaced in the store")] |
118 | | replaced_items: CounterWithTime, |
119 | | #[metric(help = "Number of bytes inserted into the store since it was created")] |
120 | | lifetime_inserted_bytes: Counter, |
121 | | |
122 | | _key_type: PhantomData<Q>, |
123 | | remove_callbacks: Vec<C>, |
124 | | } |
125 | | |
126 | | type RemoveFuture = Pin<Box<dyn Future<Output = ()> + Send>>; |
127 | | |
128 | | impl< |
129 | | K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>, |
130 | | Q: Ord + Hash + Eq + Debug + Sync, |
131 | | T: LenEntry + Debug + Sync + Send, |
132 | | C: RemoveItemCallback<Q>, |
133 | | > State<K, Q, T, C> |
134 | | { |
135 | | /// Removes an item from the cache and returns the data for deferred cleanup. |
136 | | /// The caller is responsible for calling `unref()` on the returned data outside of the lock. |
137 | | #[must_use] |
138 | 73 | fn remove( |
139 | 73 | &mut self, |
140 | 73 | key: &Q, |
141 | 73 | eviction_item: &EvictionItem<T>, |
142 | 73 | replaced: bool, |
143 | 73 | ) -> (T, Vec<RemoveFuture>) |
144 | 73 | where |
145 | 73 | T: Clone, |
146 | | { |
147 | 73 | if let Some(btree0 ) = &mut self.btree { Branch (147:16): [True: 0, False: 0]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [True: 0, False: 20]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [Folded - Ignored]
Branch (147:16): [True: 0, False: 6]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 14]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 3]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 1]
Branch (147:16): [True: 0, False: 5]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [Folded - Ignored]
Branch (147:16): [True: 0, False: 16]
Branch (147:16): [True: 0, False: 0]
Branch (147:16): [True: 0, False: 1]
|
148 | 0 | btree.remove(key); |
149 | 73 | } |
150 | 73 | self.sum_store_size -= eviction_item.data.len(); |
151 | 73 | if replaced { Branch (151:12): [True: 0, False: 0]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [True: 18, False: 2]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [Folded - Ignored]
Branch (151:12): [True: 0, False: 6]
Branch (151:12): [True: 0, False: 1]
Branch (151:12): [True: 1, False: 0]
Branch (151:12): [True: 0, False: 14]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [True: 0, False: 1]
Branch (151:12): [True: 0, False: 3]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [True: 0, False: 1]
Branch (151:12): [True: 0, False: 1]
Branch (151:12): [True: 1, False: 0]
Branch (151:12): [True: 1, False: 0]
Branch (151:12): [True: 0, False: 1]
Branch (151:12): [True: 3, False: 2]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [Folded - Ignored]
Branch (151:12): [True: 16, False: 0]
Branch (151:12): [True: 0, False: 0]
Branch (151:12): [True: 0, False: 1]
|
152 | 40 | self.replaced_items.inc(); |
153 | 40 | self.replaced_bytes.add(eviction_item.data.len()); |
154 | 40 | } else { |
155 | 33 | self.evicted_items.inc(); |
156 | 33 | self.evicted_bytes.add(eviction_item.data.len()); |
157 | 33 | } |
158 | | |
159 | 73 | let callbacks = self |
160 | 73 | .remove_callbacks |
161 | 73 | .iter() |
162 | 73 | .map(|callback| callback2 .callback2 (key2 )) |
163 | 73 | .collect(); |
164 | | |
165 | | // Return the data for deferred unref outside of lock |
166 | 73 | (eviction_item.data.clone(), callbacks) |
167 | 73 | } |
168 | | |
169 | | /// Inserts a new item into the cache. If the key already exists, the old item is returned |
170 | | /// for deferred cleanup. |
171 | | #[must_use] |
172 | 5.95k | fn put(&mut self, key: &K, eviction_item: EvictionItem<T>) -> Option<(T, Vec<RemoveFuture>)> |
173 | 5.95k | where |
174 | 5.95k | K: Clone, |
175 | 5.95k | T: Clone, |
176 | | { |
177 | | // If we are maintaining a btree index, we need to update it. |
178 | 5.95k | if let Some(btree0 ) = &mut self.btree { Branch (178:16): [True: 0, False: 0]
Branch (178:16): [True: 0, False: 0]
Branch (178:16): [True: 0, False: 5.75k]
Branch (178:16): [True: 0, False: 0]
Branch (178:16): [Folded - Ignored]
Branch (178:16): [True: 0, False: 2]
Branch (178:16): [True: 0, False: 27]
Branch (178:16): [True: 0, False: 3]
Branch (178:16): [True: 0, False: 6]
Branch (178:16): [True: 0, False: 2]
Branch (178:16): [True: 0, False: 2]
Branch (178:16): [True: 0, False: 2]
Branch (178:16): [True: 0, False: 2]
Branch (178:16): [True: 0, False: 2]
Branch (178:16): [True: 0, False: 1]
Branch (178:16): [True: 0, False: 16]
Branch (178:16): [True: 0, False: 0]
Branch (178:16): [True: 0, False: 2]
Branch (178:16): [Folded - Ignored]
Branch (178:16): [True: 0, False: 97]
Branch (178:16): [True: 0, False: 27]
|
179 | 0 | btree.insert(key.clone()); |
180 | 5.95k | } |
181 | 5.95k | self.lru |
182 | 5.95k | .put(key.clone(), eviction_item) |
183 | 5.95k | .map(|old_item| self40 .remove40 (key.borrow()40 , &old_item40 , true)) |
184 | 5.95k | } |
185 | | |
186 | 5 | fn add_remove_callback(&mut self, callback: C) { |
187 | 5 | self.remove_callbacks.push(callback); |
188 | 5 | } |
189 | | } |
190 | | |
191 | | #[derive(Debug, Clone, Copy)] |
192 | | pub struct NoopRemove; |
193 | | |
194 | | impl<Q> RemoveItemCallback<Q> for NoopRemove { |
195 | 0 | fn callback(&self, _store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>> { |
196 | 0 | Box::pin(async {}) |
197 | 0 | } |
198 | | } |
199 | | |
200 | | #[derive(Debug, MetricsComponent)] |
201 | | pub struct EvictingMap< |
202 | | K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>, |
203 | | Q: Ord + Hash + Eq + Debug, |
204 | | T: LenEntry + Debug + Send, |
205 | | I: InstantWrapper, |
206 | | C: RemoveItemCallback<Q> = NoopRemove, |
207 | | > { |
208 | | #[metric] |
209 | | state: Mutex<State<K, Q, T, C>>, |
210 | | anchor_time: I, |
211 | | #[metric(help = "Maximum size of the store in bytes")] |
212 | | max_bytes: u64, |
213 | | #[metric(help = "Number of bytes to evict when the store is full")] |
214 | | evict_bytes: u64, |
215 | | #[metric(help = "Maximum number of seconds to keep an item in the store")] |
216 | | max_seconds: i32, |
217 | | #[metric(help = "Maximum number of items to keep in the store")] |
218 | | max_count: u64, |
219 | | } |
220 | | |
221 | | impl<K, Q, T, I, C> EvictingMap<K, Q, T, I, C> |
222 | | where |
223 | | K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>, |
224 | | Q: Ord + Hash + Eq + Debug + Sync, |
225 | | T: LenEntry + Debug + Clone + Send + Sync, |
226 | | I: InstantWrapper, |
227 | | C: RemoveItemCallback<Q>, |
228 | | { |
229 | 422 | pub fn new(config: &EvictionPolicy, anchor_time: I) -> Self { |
230 | 422 | Self { |
231 | 422 | // We use unbounded because if we use the bounded version we can't call the delete |
232 | 422 | // function on the LenEntry properly. |
233 | 422 | state: Mutex::new(State { |
234 | 422 | lru: LruCache::unbounded(), |
235 | 422 | btree: None, |
236 | 422 | sum_store_size: 0, |
237 | 422 | evicted_bytes: Counter::default(), |
238 | 422 | evicted_items: CounterWithTime::default(), |
239 | 422 | replaced_bytes: Counter::default(), |
240 | 422 | replaced_items: CounterWithTime::default(), |
241 | 422 | lifetime_inserted_bytes: Counter::default(), |
242 | 422 | _key_type: PhantomData, |
243 | 422 | remove_callbacks: Vec::new(), |
244 | 422 | }), |
245 | 422 | anchor_time, |
246 | 422 | max_bytes: config.max_bytes as u64, |
247 | 422 | evict_bytes: config.evict_bytes as u64, |
248 | 422 | max_seconds: config.max_seconds as i32, |
249 | 422 | max_count: config.max_count, |
250 | 422 | } |
251 | 422 | } |
252 | | |
253 | 0 | pub async fn enable_filtering(&self) { |
254 | 0 | let mut state = self.state.lock(); |
255 | 0 | if state.btree.is_none() { Branch (255:12): [Folded - Ignored]
Branch (255:12): [Folded - Ignored]
|
256 | 0 | Self::rebuild_btree_index(&mut state); |
257 | 0 | } |
258 | 0 | } |
259 | | |
260 | 2 | fn rebuild_btree_index(state: &mut State<K, Q, T, C>) { |
261 | 2 | state.btree = Some(state.lru.iter().map(|(k, _)| k).cloned().collect()); |
262 | 2 | } |
263 | | |
264 | | /// Run the `handler` function on each key-value pair that matches the `prefix_range` |
265 | | /// and return the number of items that were processed. |
266 | | /// The `handler` function should return `true` to continue processing the next item |
267 | | /// or `false` to stop processing. |
268 | 11 | pub fn range<F>(&self, prefix_range: impl RangeBounds<Q> + Send, mut handler: F) -> u64 |
269 | 11 | where |
270 | 11 | F: FnMut(&K, &T) -> bool + Send, |
271 | 11 | K: Ord, |
272 | | { |
273 | 11 | let mut state = self.state.lock(); |
274 | 11 | let btree = if let Some(ref btree9 ) = state.btree { Branch (274:28): [True: 6, False: 1]
Branch (274:28): [Folded - Ignored]
Branch (274:28): [True: 1, False: 0]
Branch (274:28): [True: 2, False: 0]
Branch (274:28): [True: 0, False: 1]
Branch (274:28): [Folded - Ignored]
|
275 | 9 | btree |
276 | | } else { |
277 | 2 | Self::rebuild_btree_index(&mut state); |
278 | 2 | state.btree.as_ref().unwrap() |
279 | | }; |
280 | 11 | let mut continue_count = 0; |
281 | 22 | for key in btree11 .range11 (prefix_range11 ) { |
282 | 22 | let value = &state.lru.peek(key.borrow()).unwrap().data; |
283 | 22 | let should_continue = handler(key, value); |
284 | 22 | if !should_continue { Branch (284:16): [True: 0, False: 13]
Branch (284:16): [Folded - Ignored]
Branch (284:16): [True: 0, False: 1]
Branch (284:16): [True: 0, False: 5]
Branch (284:16): [True: 0, False: 3]
Branch (284:16): [Folded - Ignored]
|
285 | 0 | break; |
286 | 22 | } |
287 | 22 | continue_count += 1; |
288 | | } |
289 | 11 | continue_count |
290 | 11 | } |
291 | | |
292 | | /// Returns the number of key-value pairs that are currently in the the cache. |
293 | | /// Function is not for production code paths. |
294 | 30 | pub fn len_for_test(&self) -> usize { |
295 | 30 | self.state.lock().lru.len() |
296 | 30 | } |
297 | | |
298 | 10.6k | fn should_evict( |
299 | 10.6k | &self, |
300 | 10.6k | lru_len: usize, |
301 | 10.6k | peek_entry: &EvictionItem<T>, |
302 | 10.6k | sum_store_size: u64, |
303 | 10.6k | max_bytes: u64, |
304 | 10.6k | ) -> bool { |
305 | 10.6k | let is_over_size = max_bytes != 0 && sum_store_size >= max_bytes216 ; Branch (305:28): [True: 0, False: 0]
Branch (305:28): [True: 0, False: 0]
Branch (305:28): [True: 108, False: 10.1k]
Branch (305:28): [True: 0, False: 0]
Branch (305:28): [Folded - Ignored]
Branch (305:28): [True: 0, False: 6]
Branch (305:28): [True: 0, False: 1]
Branch (305:28): [True: 0, False: 3]
Branch (305:28): [True: 30, False: 33]
Branch (305:28): [True: 0, False: 3]
Branch (305:28): [True: 0, False: 1]
Branch (305:28): [True: 0, False: 2]
Branch (305:28): [True: 3, False: 7]
Branch (305:28): [True: 0, False: 2]
Branch (305:28): [True: 3, False: 0]
Branch (305:28): [True: 0, False: 4]
Branch (305:28): [True: 0, False: 2]
Branch (305:28): [True: 0, False: 3]
Branch (305:28): [True: 0, False: 2]
Branch (305:28): [True: 1, False: 27]
Branch (305:28): [True: 0, False: 0]
Branch (305:28): [True: 0, False: 4]
Branch (305:28): [True: 0, False: 3]
Branch (305:28): [Folded - Ignored]
Branch (305:28): [True: 71, False: 138]
Branch (305:28): [True: 0, False: 0]
Branch (305:28): [True: 0, False: 85]
|
306 | | |
307 | 10.6k | let elapsed_seconds = |
308 | 10.6k | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); |
309 | 10.6k | let evict_older_than_seconds = elapsed_seconds.saturating_sub(self.max_seconds); |
310 | 10.6k | let old_item_exists = |
311 | 10.6k | self.max_seconds != 0 && peek_entry.seconds_since_anchor < evict_older_than_seconds115 ; Branch (311:13): [True: 0, False: 0]
Branch (311:13): [True: 0, False: 0]
Branch (311:13): [True: 0, False: 10.2k]
Branch (311:13): [True: 0, False: 0]
Branch (311:13): [Folded - Ignored]
Branch (311:13): [True: 0, False: 6]
Branch (311:13): [True: 0, False: 1]
Branch (311:13): [True: 0, False: 3]
Branch (311:13): [True: 30, False: 33]
Branch (311:13): [True: 0, False: 3]
Branch (311:13): [True: 0, False: 1]
Branch (311:13): [True: 0, False: 2]
Branch (311:13): [True: 0, False: 10]
Branch (311:13): [True: 0, False: 2]
Branch (311:13): [True: 0, False: 3]
Branch (311:13): [True: 0, False: 4]
Branch (311:13): [True: 0, False: 2]
Branch (311:13): [True: 0, False: 3]
Branch (311:13): [True: 0, False: 2]
Branch (311:13): [True: 0, False: 28]
Branch (311:13): [True: 0, False: 0]
Branch (311:13): [True: 0, False: 4]
Branch (311:13): [True: 0, False: 3]
Branch (311:13): [Folded - Ignored]
Branch (311:13): [True: 0, False: 209]
Branch (311:13): [True: 0, False: 0]
Branch (311:13): [True: 85, False: 0]
|
312 | | |
313 | 10.6k | let is_over_count = |
314 | 10.6k | self.max_count != 0 && u64::try_from20 (lru_len20 ).unwrap_or(u64::MAX) > self.max_count; Branch (314:13): [True: 0, False: 0]
Branch (314:13): [True: 0, False: 0]
Branch (314:13): [True: 0, False: 10.2k]
Branch (314:13): [True: 0, False: 0]
Branch (314:13): [Folded - Ignored]
Branch (314:13): [True: 0, False: 6]
Branch (314:13): [True: 0, False: 1]
Branch (314:13): [True: 3, False: 0]
Branch (314:13): [True: 8, False: 55]
Branch (314:13): [True: 0, False: 3]
Branch (314:13): [True: 0, False: 1]
Branch (314:13): [True: 0, False: 2]
Branch (314:13): [True: 0, False: 10]
Branch (314:13): [True: 0, False: 2]
Branch (314:13): [True: 0, False: 3]
Branch (314:13): [True: 4, False: 0]
Branch (314:13): [True: 2, False: 0]
Branch (314:13): [True: 3, False: 0]
Branch (314:13): [True: 0, False: 2]
Branch (314:13): [True: 0, False: 28]
Branch (314:13): [True: 0, False: 0]
Branch (314:13): [True: 0, False: 4]
Branch (314:13): [True: 0, False: 3]
Branch (314:13): [Folded - Ignored]
Branch (314:13): [True: 0, False: 209]
Branch (314:13): [True: 0, False: 0]
Branch (314:13): [True: 0, False: 85]
|
315 | | |
316 | 10.6k | is_over_size || old_item_exists10.6k || is_over_count10.6k Branch (316:9): [True: 0, False: 0]
Branch (316:25): [True: 0, False: 0]
Branch (316:9): [True: 0, False: 0]
Branch (316:25): [True: 0, False: 0]
Branch (316:9): [True: 2, False: 10.2k]
Branch (316:25): [True: 0, False: 10.2k]
Branch (316:9): [True: 0, False: 0]
Branch (316:25): [True: 0, False: 0]
Branch (316:9): [Folded - Ignored]
Branch (316:25): [Folded - Ignored]
Branch (316:9): [True: 0, False: 6]
Branch (316:25): [True: 0, False: 6]
Branch (316:9): [True: 0, False: 1]
Branch (316:25): [True: 0, False: 1]
Branch (316:9): [True: 0, False: 3]
Branch (316:25): [True: 0, False: 3]
Branch (316:9): [True: 6, False: 57]
Branch (316:25): [True: 9, False: 48]
Branch (316:9): [True: 0, False: 3]
Branch (316:25): [True: 0, False: 3]
Branch (316:9): [True: 0, False: 1]
Branch (316:25): [True: 0, False: 1]
Branch (316:9): [True: 0, False: 2]
Branch (316:25): [True: 0, False: 2]
Branch (316:9): [True: 0, False: 10]
Branch (316:25): [True: 0, False: 10]
Branch (316:9): [True: 0, False: 2]
Branch (316:25): [True: 0, False: 2]
Branch (316:9): [True: 1, False: 2]
Branch (316:25): [True: 0, False: 2]
Branch (316:9): [True: 0, False: 4]
Branch (316:25): [True: 0, False: 4]
Branch (316:9): [True: 0, False: 2]
Branch (316:25): [True: 0, False: 2]
Branch (316:9): [True: 0, False: 3]
Branch (316:25): [True: 0, False: 3]
Branch (316:9): [True: 0, False: 2]
Branch (316:25): [True: 0, False: 2]
Branch (316:9): [True: 0, False: 28]
Branch (316:25): [True: 0, False: 28]
Branch (316:9): [True: 0, False: 0]
Branch (316:25): [True: 0, False: 0]
Branch (316:9): [True: 0, False: 4]
Branch (316:25): [True: 0, False: 4]
Branch (316:9): [True: 0, False: 3]
Branch (316:25): [True: 0, False: 3]
Branch (316:9): [Folded - Ignored]
Branch (316:25): [Folded - Ignored]
Branch (316:9): [True: 0, False: 209]
Branch (316:25): [True: 0, False: 209]
Branch (316:9): [True: 0, False: 0]
Branch (316:25): [True: 0, False: 0]
Branch (316:9): [True: 0, False: 85]
Branch (316:25): [True: 1, False: 84]
|
317 | 10.6k | } |
318 | | |
319 | | #[must_use] |
320 | 5.96k | fn evict_items(&self, state: &mut State<K, Q, T, C>) -> (Vec<T>, Vec<RemoveFuture>) { |
321 | 5.96k | let Some((_, mut peek_entry)) = state.lru.peek_lru() else { Branch (321:13): [True: 0, False: 0]
Branch (321:13): [True: 0, False: 0]
Branch (321:13): [True: 5.75k, False: 0]
Branch (321:13): [True: 0, False: 0]
Branch (321:13): [Folded - Ignored]
Branch (321:13): [True: 6, False: 0]
Branch (321:13): [True: 1, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 31, False: 0]
Branch (321:13): [True: 3, False: 0]
Branch (321:13): [True: 1, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 7, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [True: 18, False: 0]
Branch (321:13): [True: 0, False: 0]
Branch (321:13): [True: 0, False: 0]
Branch (321:13): [True: 2, False: 0]
Branch (321:13): [Folded - Ignored]
Branch (321:13): [True: 97, False: 0]
Branch (321:13): [True: 0, False: 0]
Branch (321:13): [True: 27, False: 0]
|
322 | 0 | return (Vec::new(), Vec::new()); |
323 | | }; |
324 | | |
325 | 5.96k | let max_bytes = if self.max_bytes != 0 Branch (325:28): [True: 0, False: 0]
Branch (325:28): [True: 0, False: 0]
Branch (325:28): [True: 8, False: 5.75k]
Branch (325:28): [True: 0, False: 0]
Branch (325:28): [Folded - Ignored]
Branch (325:28): [True: 0, False: 6]
Branch (325:28): [True: 0, False: 1]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [True: 8, False: 23]
Branch (325:28): [True: 0, False: 3]
Branch (325:28): [True: 0, False: 1]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [True: 0, False: 7]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [True: 2, False: 0]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [True: 0, False: 18]
Branch (325:28): [True: 0, False: 0]
Branch (325:28): [True: 0, False: 0]
Branch (325:28): [True: 0, False: 2]
Branch (325:28): [Folded - Ignored]
Branch (325:28): [True: 0, False: 97]
Branch (325:28): [True: 0, False: 0]
Branch (325:28): [True: 0, False: 27]
|
326 | 18 | && self.evict_bytes != 0 Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 8]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [Folded - Ignored]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 4, False: 4]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 2]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [Folded - Ignored]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
Branch (326:16): [True: 0, False: 0]
|
327 | 4 | && self.should_evict( Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [Folded - Ignored]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 1, False: 3]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [Folded - Ignored]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
Branch (327:16): [True: 0, False: 0]
|
328 | 4 | state.lru.len(), |
329 | 4 | peek_entry, |
330 | 4 | state.sum_store_size, |
331 | 4 | self.max_bytes, |
332 | | ) { |
333 | 1 | self.max_bytes.saturating_sub(self.evict_bytes) |
334 | | } else { |
335 | 5.96k | self.max_bytes |
336 | | }; |
337 | | |
338 | 5.96k | let mut items_to_unref = Vec::new(); |
339 | 5.96k | let mut removal_futures = Vec::new(); |
340 | | |
341 | 5.98k | while self.should_evict(state.lru.len(), peek_entry, state.sum_store_size, max_bytes) { Branch (341:15): [True: 0, False: 0]
Branch (341:15): [True: 0, False: 0]
Branch (341:15): [True: 2, False: 5.75k]
Branch (341:15): [True: 0, False: 0]
Branch (341:15): [Folded - Ignored]
Branch (341:15): [True: 0, False: 6]
Branch (341:15): [True: 0, False: 1]
Branch (341:15): [True: 0, False: 2]
Branch (341:15): [True: 12, False: 28]
Branch (341:15): [True: 0, False: 3]
Branch (341:15): [True: 0, False: 1]
Branch (341:15): [True: 0, False: 2]
Branch (341:15): [True: 0, False: 7]
Branch (341:15): [True: 0, False: 2]
Branch (341:15): [True: 1, False: 2]
Branch (341:15): [True: 1, False: 2]
Branch (341:15): [True: 0, False: 2]
Branch (341:15): [True: 0, False: 2]
Branch (341:15): [True: 0, False: 2]
Branch (341:15): [True: 0, False: 18]
Branch (341:15): [True: 0, False: 0]
Branch (341:15): [True: 0, False: 0]
Branch (341:15): [True: 0, False: 2]
Branch (341:15): [Folded - Ignored]
Branch (341:15): [True: 0, False: 97]
Branch (341:15): [True: 0, False: 0]
Branch (341:15): [True: 1, False: 27]
|
342 | 17 | let (key, eviction_item) = state |
343 | 17 | .lru |
344 | 17 | .pop_lru() |
345 | 17 | .expect("Tried to peek() then pop() but failed"); |
346 | 17 | debug!(?key, "Evicting",); |
347 | 17 | let (data, futures) = state.remove(key.borrow(), &eviction_item, false); |
348 | 17 | items_to_unref.push(data); |
349 | 17 | removal_futures.extend(futures.into_iter()); |
350 | | |
351 | 17 | peek_entry = if let Some((_, entry13 )) = state.lru.peek_lru() { Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 1, False: 1]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [Folded - Ignored]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 9, False: 3]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 1, False: 0]
Branch (351:33): [True: 1, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [Folded - Ignored]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 0, False: 0]
Branch (351:33): [True: 1, False: 0]
|
352 | 13 | entry |
353 | | } else { |
354 | 4 | break; |
355 | | }; |
356 | | } |
357 | | |
358 | 5.96k | (items_to_unref, removal_futures) |
359 | 5.96k | } |
360 | | |
361 | | /// Return the size of a `key`, if not found `None` is returned. |
362 | 25 | pub async fn size_for_key(&self, key: &Q) -> Option<u64> { |
363 | 25 | let mut results = [None]; |
364 | 25 | self.sizes_for_keys([key], &mut results[..], false).await; |
365 | 25 | results[0] |
366 | 25 | } |
367 | | |
368 | | /// Return the sizes of a collection of `keys`. Expects `results` collection |
369 | | /// to be provided for storing the resulting key sizes. Each index value in |
370 | | /// `keys` maps directly to the size value for the key in `results`. |
371 | | /// If no key is found in the internal map, `None` is filled in its place. |
372 | | /// If `peek` is set to `true`, the items are not promoted to the front of the |
373 | | /// LRU cache. Note: peek may still evict, but won't promote. |
374 | 311 | pub async fn sizes_for_keys<It, R>(&self, keys: It, results: &mut [Option<u64>], peek: bool) |
375 | 311 | where |
376 | 311 | It: IntoIterator<Item = R> + Send, |
377 | 311 | // Note: It's not enough to have the inserts themselves be Send. The |
378 | 311 | // returned iterator should be Send as well. |
379 | 311 | <It as IntoIterator>::IntoIter: Send, |
380 | 311 | // This may look strange, but what we are doing is saying: |
381 | 311 | // * `K` must be able to borrow `Q` |
382 | 311 | // * `R` (the input stream item type) must also be able to borrow `Q` |
383 | 311 | // Note: That K and R do not need to be the same type, they just both need |
384 | 311 | // to be able to borrow a `Q`. |
385 | 311 | R: Borrow<Q> + Send, |
386 | 311 | { |
387 | 311 | let (removal_futures, data_to_unref) = { |
388 | 311 | let mut state = self.state.lock(); |
389 | | |
390 | 311 | let lru_len = state.lru.len(); |
391 | 311 | let mut data_to_unref = Vec::new(); |
392 | 311 | let mut removal_futures = Vec::new(); |
393 | 346 | for (key, result) in keys311 .into_iter311 ().zip311 (results311 .iter_mut311 ()) { |
394 | 346 | let maybe_entry = if peek { Branch (394:38): [True: 0, False: 0]
Branch (394:38): [True: 0, False: 226]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [Folded - Ignored]
Branch (394:38): [True: 0, False: 25]
Branch (394:38): [True: 2, False: 0]
Branch (394:38): [True: 5, False: 0]
Branch (394:38): [True: 4, False: 0]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [True: 0, False: 1]
Branch (394:38): [True: 0, False: 3]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [True: 0, False: 0]
Branch (394:38): [Folded - Ignored]
Branch (394:38): [True: 0, False: 80]
|
395 | 11 | state.lru.peek_mut(key.borrow()) |
396 | | } else { |
397 | 335 | state.lru.get_mut(key.borrow()) |
398 | | }; |
399 | 346 | match maybe_entry { |
400 | 188 | Some(entry) => { |
401 | | // Note: We need to check eviction because the item might be expired |
402 | | // based on the current time. In such case, we remove the item while |
403 | | // we are here. |
404 | 188 | if self.should_evict(lru_len, entry, 0, u64::MAX) { Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 99]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [Folded - Ignored]
Branch (404:28): [True: 1, False: 13]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 3]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 1]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [True: 0, False: 0]
Branch (404:28): [Folded - Ignored]
Branch (404:28): [True: 0, False: 71]
|
405 | 1 | *result = None; |
406 | 1 | if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) { Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [Folded - Ignored]
Branch (406:36): [True: 1, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [True: 0, False: 0]
Branch (406:36): [Folded - Ignored]
Branch (406:36): [True: 0, False: 0]
|
407 | 1 | info!(?key, "Item expired, evicting"); |
408 | 1 | let (data, futures) = |
409 | 1 | state.remove(key.borrow(), &eviction_item, false); |
410 | | // Store data for later unref - we can't drop state here as we're still iterating |
411 | 1 | data_to_unref.push(data); |
412 | 1 | removal_futures.extend(futures.into_iter()); |
413 | 0 | } |
414 | | } else { |
415 | 187 | if !peek { Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 99, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [Folded - Ignored]
Branch (415:32): [True: 13, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 3]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 1, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [True: 0, False: 0]
Branch (415:32): [Folded - Ignored]
Branch (415:32): [True: 71, False: 0]
|
416 | 184 | entry.seconds_since_anchor = |
417 | 184 | i32::try_from(self.anchor_time.elapsed().as_secs()) |
418 | 184 | .unwrap_or(i32::MAX); |
419 | 184 | }3 |
420 | 187 | *result = Some(entry.data.len()); |
421 | | } |
422 | | } |
423 | 158 | None => *result = None, |
424 | | } |
425 | | } |
426 | 311 | (removal_futures, data_to_unref) |
427 | | }; |
428 | | |
429 | | // Perform the async callbacks outside of the lock |
430 | 311 | let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
431 | 311 | while callbacks.next().await.is_some() {}0 Branch (431:15): [True: 0, False: 0]
Branch (431:15): [True: 0, False: 191]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [Folded - Ignored]
Branch (431:15): [True: 0, False: 25]
Branch (431:15): [True: 0, False: 2]
Branch (431:15): [True: 0, False: 5]
Branch (431:15): [True: 0, False: 4]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [True: 0, False: 1]
Branch (431:15): [True: 0, False: 3]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [True: 0, False: 0]
Branch (431:15): [Folded - Ignored]
Branch (431:15): [True: 0, False: 80]
|
432 | 311 | let mut callbacks: FuturesUnordered<_> = |
433 | 311 | data_to_unref.iter().map(LenEntry::unref).collect(); |
434 | 312 | while callbacks.next().await.is_some() {}1 Branch (434:15): [True: 0, False: 0]
Branch (434:15): [True: 0, False: 191]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [Folded - Ignored]
Branch (434:15): [True: 1, False: 25]
Branch (434:15): [True: 0, False: 2]
Branch (434:15): [True: 0, False: 5]
Branch (434:15): [True: 0, False: 4]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [True: 0, False: 1]
Branch (434:15): [True: 0, False: 3]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [True: 0, False: 0]
Branch (434:15): [Folded - Ignored]
Branch (434:15): [True: 0, False: 80]
|
435 | 311 | } |
436 | | |
437 | 4.49k | pub async fn get(&self, key: &Q) -> Option<T> { |
438 | | // Fast path: Check if we need eviction before acquiring lock for eviction |
439 | 4.49k | let needs_eviction = { |
440 | 4.49k | let state = self.state.lock(); |
441 | 4.49k | if let Some((_, peek_entry4.48k )) = state.lru.peek_lru() { Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 4.36k, False: 4]
Branch (441:20): [Folded - Ignored]
Branch (441:20): [True: 1, False: 0]
Branch (441:20): [True: 5, False: 0]
Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 1, False: 0]
Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 1, False: 0]
Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 9, False: 0]
Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 4, False: 0]
Branch (441:20): [True: 1, False: 1]
Branch (441:20): [Folded - Ignored]
Branch (441:20): [True: 41, False: 0]
Branch (441:20): [True: 0, False: 0]
Branch (441:20): [True: 57, False: 0]
|
442 | 4.48k | self.should_evict( |
443 | 4.48k | state.lru.len(), |
444 | 4.48k | peek_entry, |
445 | 4.48k | state.sum_store_size, |
446 | 4.48k | self.max_bytes, |
447 | | ) |
448 | | } else { |
449 | 5 | false |
450 | | } |
451 | | }; |
452 | | |
453 | | // Perform eviction if needed |
454 | 4.49k | if needs_eviction { Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 4.37k]
Branch (454:12): [Folded - Ignored]
Branch (454:12): [True: 0, False: 1]
Branch (454:12): [True: 2, False: 3]
Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 1]
Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 1]
Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 9]
Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 4]
Branch (454:12): [True: 0, False: 2]
Branch (454:12): [Folded - Ignored]
Branch (454:12): [True: 0, False: 41]
Branch (454:12): [True: 0, False: 0]
Branch (454:12): [True: 0, False: 57]
|
455 | 2 | let (items_to_unref, removal_futures) = { |
456 | 2 | let mut state = self.state.lock(); |
457 | 2 | self.evict_items(&mut *state) |
458 | 2 | }; |
459 | | // Unref items outside of lock |
460 | 2 | let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
461 | 2 | while callbacks.next().await.is_some() {}0 Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [Folded - Ignored]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 2]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [Folded - Ignored]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
Branch (461:19): [True: 0, False: 0]
|
462 | 2 | let mut callbacks: FuturesUnordered<_> = |
463 | 2 | items_to_unref.iter().map(LenEntry::unref).collect(); |
464 | 4 | while callbacks.next().await.is_some() {}2 Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [Folded - Ignored]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 2, False: 2]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [Folded - Ignored]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
Branch (464:19): [True: 0, False: 0]
|
465 | 4.49k | } |
466 | | |
467 | | // Now get the item |
468 | 4.49k | let mut state = self.state.lock(); |
469 | 4.49k | let entry4.48k = state.lru.get_mut(key.borrow())?10 ; |
470 | 4.48k | entry.seconds_since_anchor = |
471 | 4.48k | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); |
472 | 4.48k | Some(entry.data.clone()) |
473 | 4.49k | } |
474 | | |
475 | | /// Returns the replaced item if any. |
476 | 5.94k | pub async fn insert(&self, key: K, data: T) -> Option<T> |
477 | 5.94k | where |
478 | 5.94k | K: 'static, |
479 | 5.94k | { |
480 | 5.94k | self.insert_with_time( |
481 | 5.94k | key, |
482 | 5.94k | data, |
483 | 5.94k | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX), |
484 | 5.94k | ) |
485 | 5.94k | .await |
486 | 5.94k | } |
487 | | |
488 | | /// Returns the replaced item if any. |
489 | 5.94k | pub async fn insert_with_time(&self, key: K, data: T, seconds_since_anchor: i32) -> Option<T> { |
490 | 5.94k | let (items_to_unref, removal_futures) = { |
491 | 5.94k | let mut state = self.state.lock(); |
492 | 5.94k | self.inner_insert_many(&mut state, [(key, data)], seconds_since_anchor) |
493 | 5.94k | }; |
494 | | |
495 | 5.94k | let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
496 | 5.94k | while futures.next().await.is_some() {}1 Branch (496:15): [True: 0, False: 0]
Branch (496:15): [True: 1, False: 5.75k]
Branch (496:15): [True: 0, False: 0]
Branch (496:15): [True: 0, False: 0]
Branch (496:15): [Folded - Ignored]
Branch (496:15): [True: 0, False: 2]
Branch (496:15): [True: 0, False: 27]
Branch (496:15): [True: 0, False: 3]
Branch (496:15): [True: 0, False: 0]
Branch (496:15): [True: 0, False: 4]
Branch (496:15): [True: 0, False: 2]
Branch (496:15): [True: 0, False: 2]
Branch (496:15): [True: 0, False: 2]
Branch (496:15): [True: 0, False: 2]
Branch (496:15): [True: 0, False: 2]
Branch (496:15): [True: 0, False: 1]
Branch (496:15): [True: 0, False: 16]
Branch (496:15): [True: 0, False: 0]
Branch (496:15): [True: 0, False: 2]
Branch (496:15): [Folded - Ignored]
Branch (496:15): [True: 0, False: 97]
Branch (496:15): [True: 0, False: 27]
|
497 | | |
498 | | // Unref items outside of lock |
499 | 5.94k | let futures: FuturesUnordered<_> = items_to_unref |
500 | 5.94k | .into_iter() |
501 | 5.94k | .map(|item| async move {54 |
502 | 54 | item.unref().await; |
503 | 54 | item |
504 | 108 | }) |
505 | 5.94k | .collect(); |
506 | 5.94k | futures.collect::<Vec<_>>().await.into_iter().next() |
507 | 5.94k | } |
508 | | |
509 | | /// Same as `insert()`, but optimized for multiple inserts. |
510 | | /// Returns the replaced items if any. |
511 | 7 | pub async fn insert_many<It>(&self, inserts: It) -> Vec<T> |
512 | 7 | where |
513 | 7 | It: IntoIterator<Item = (K, T)> + Send, |
514 | 7 | // Note: It's not enough to have the inserts themselves be Send. The |
515 | 7 | // returned iterator should be Send as well. |
516 | 7 | <It as IntoIterator>::IntoIter: Send, |
517 | 7 | K: 'static, |
518 | 7 | { |
519 | 7 | let mut inserts = inserts.into_iter().peekable(); |
520 | | // Shortcut for cases where there are no inserts, so we don't need to lock. |
521 | 7 | if inserts.peek().is_none() { Branch (521:12): [True: 0, False: 0]
Branch (521:12): [Folded - Ignored]
Branch (521:12): [True: 1, False: 1]
Branch (521:12): [True: 4, False: 1]
Branch (521:12): [Folded - Ignored]
|
522 | 5 | return Vec::new(); |
523 | 2 | } |
524 | | |
525 | 2 | let (items_to_unref, removal_futures) = { |
526 | 2 | let mut state = self.state.lock(); |
527 | 2 | self.inner_insert_many( |
528 | 2 | &mut state, |
529 | 2 | inserts, |
530 | 2 | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX), |
531 | 2 | ) |
532 | 2 | }; |
533 | | |
534 | 2 | let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
535 | 2 | while futures.next().await.is_some() {}0 Branch (535:15): [True: 0, False: 0]
Branch (535:15): [Folded - Ignored]
Branch (535:15): [True: 0, False: 1]
Branch (535:15): [True: 0, False: 1]
Branch (535:15): [Folded - Ignored]
|
536 | | |
537 | | // Unref items outside of lock |
538 | 2 | items_to_unref |
539 | 2 | .into_iter() |
540 | 2 | .map(|item| async move {0 |
541 | 0 | item.unref().await; |
542 | 0 | item |
543 | 0 | }) |
544 | 2 | .collect::<FuturesUnordered<_>>() |
545 | 2 | .collect::<Vec<_>>() |
546 | 2 | .await |
547 | 7 | } |
548 | | |
549 | 5.95k | fn inner_insert_many<It>( |
550 | 5.95k | &self, |
551 | 5.95k | state: &mut State<K, Q, T, C>, |
552 | 5.95k | inserts: It, |
553 | 5.95k | seconds_since_anchor: i32, |
554 | 5.95k | ) -> (Vec<T>, Vec<RemoveFuture>) |
555 | 5.95k | where |
556 | 5.95k | It: IntoIterator<Item = (K, T)> + Send, |
557 | 5.95k | // Note: It's not enough to have the inserts themselves be Send. The |
558 | 5.95k | // returned iterator should be Send as well. |
559 | 5.95k | <It as IntoIterator>::IntoIter: Send, |
560 | | { |
561 | 5.95k | let mut replaced_items = Vec::new(); |
562 | 5.95k | let mut removal_futures = Vec::new(); |
563 | 11.9k | for (key5.95k , data5.95k ) in inserts { |
564 | 5.95k | let new_item_size = data.len(); |
565 | 5.95k | let eviction_item = EvictionItem { |
566 | 5.95k | seconds_since_anchor, |
567 | 5.95k | data, |
568 | 5.95k | }; |
569 | | |
570 | 5.95k | if let Some((old_item40 , futures40 )) = state.put(&key, eviction_item) { Branch (570:20): [True: 0, False: 0]
Branch (570:20): [True: 0, False: 0]
Branch (570:20): [True: 18, False: 5.74k]
Branch (570:20): [True: 0, False: 0]
Branch (570:20): [True: 0, False: 0]
Branch (570:20): [Folded - Ignored]
Branch (570:20): [True: 1, False: 1]
Branch (570:20): [True: 0, False: 27]
Branch (570:20): [True: 0, False: 3]
Branch (570:20): [True: 0, False: 0]
Branch (570:20): [True: 0, False: 1]
Branch (570:20): [True: 0, False: 4]
Branch (570:20): [True: 0, False: 1]
Branch (570:20): [True: 0, False: 2]
Branch (570:20): [True: 0, False: 2]
Branch (570:20): [True: 0, False: 2]
Branch (570:20): [True: 1, False: 1]
Branch (570:20): [True: 1, False: 1]
Branch (570:20): [True: 0, False: 1]
Branch (570:20): [True: 3, False: 13]
Branch (570:20): [True: 0, False: 0]
Branch (570:20): [True: 0, False: 2]
Branch (570:20): [Folded - Ignored]
Branch (570:20): [True: 16, False: 81]
Branch (570:20): [True: 0, False: 27]
|
571 | 40 | removal_futures.extend(futures.into_iter()); |
572 | 40 | replaced_items.push(old_item); |
573 | 5.91k | } |
574 | 5.95k | state.sum_store_size += new_item_size; |
575 | 5.95k | state.lifetime_inserted_bytes.add(new_item_size); |
576 | | } |
577 | | |
578 | | // Perform eviction after all insertions |
579 | 5.95k | let (items_to_unref, futures) = self.evict_items(state); |
580 | 5.95k | removal_futures.extend(futures); |
581 | | |
582 | | // Note: We cannot drop the state lock here since we're borrowing it, |
583 | | // but the caller will handle unreffing these items after releasing the lock |
584 | 5.95k | replaced_items.extend(items_to_unref); |
585 | | |
586 | 5.95k | (replaced_items, removal_futures) |
587 | 5.95k | } |
588 | | |
589 | 15 | pub async fn remove(&self, key: &Q) -> bool { |
590 | 15 | let (items_to_unref, removed_item, removal_futures) = { |
591 | 15 | let mut state = self.state.lock(); |
592 | | |
593 | | // First perform eviction |
594 | 15 | let (evicted_items, mut removal_futures) = self.evict_items(&mut *state); |
595 | | |
596 | | // Then try to remove the requested item |
597 | 15 | let removed = if let Some(entry14 ) = state.lru.pop(key.borrow()) { Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [Folded - Ignored]
Branch (597:34): [True: 6, False: 0]
Branch (597:34): [True: 1, False: 0]
Branch (597:34): [True: 1, False: 1]
Branch (597:34): [True: 1, False: 0]
Branch (597:34): [True: 1, False: 0]
Branch (597:34): [True: 2, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 2, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [True: 0, False: 0]
Branch (597:34): [Folded - Ignored]
Branch (597:34): [True: 0, False: 0]
|
598 | 14 | let (removed_item, more_removal_futures) = state.remove(key, &entry, false); |
599 | 14 | removal_futures.extend(more_removal_futures.into_iter()); |
600 | 14 | Some(removed_item) |
601 | | } else { |
602 | 1 | None |
603 | | }; |
604 | | |
605 | 15 | (evicted_items, removed, removal_futures) |
606 | | }; |
607 | | |
608 | 15 | let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
609 | 16 | while callbacks.next().await.is_some() {}1 Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [Folded - Ignored]
Branch (609:15): [True: 0, False: 6]
Branch (609:15): [True: 0, False: 1]
Branch (609:15): [True: 0, False: 2]
Branch (609:15): [True: 0, False: 1]
Branch (609:15): [True: 1, False: 1]
Branch (609:15): [True: 0, False: 2]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 2]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [True: 0, False: 0]
Branch (609:15): [Folded - Ignored]
Branch (609:15): [True: 0, False: 0]
|
610 | | |
611 | | // Unref evicted items outside of lock |
612 | 15 | let mut callbacks: FuturesUnordered<_> = |
613 | 15 | items_to_unref.iter().map(LenEntry::unref).collect(); |
614 | 16 | while callbacks.next().await.is_some() {}1 Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [Folded - Ignored]
Branch (614:15): [True: 0, False: 6]
Branch (614:15): [True: 0, False: 1]
Branch (614:15): [True: 1, False: 2]
Branch (614:15): [True: 0, False: 1]
Branch (614:15): [True: 0, False: 1]
Branch (614:15): [True: 0, False: 2]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 2]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [True: 0, False: 0]
Branch (614:15): [Folded - Ignored]
Branch (614:15): [True: 0, False: 0]
|
615 | | |
616 | | // Unref removed item if any |
617 | 15 | if let Some(item14 ) = removed_item { Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [Folded - Ignored]
Branch (617:16): [True: 6, False: 0]
Branch (617:16): [True: 1, False: 0]
Branch (617:16): [True: 1, False: 1]
Branch (617:16): [True: 1, False: 0]
Branch (617:16): [True: 1, False: 0]
Branch (617:16): [True: 2, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 2, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [True: 0, False: 0]
Branch (617:16): [Folded - Ignored]
Branch (617:16): [True: 0, False: 0]
|
618 | 14 | item.unref().await; |
619 | 14 | return true; |
620 | 1 | } |
621 | | |
622 | 1 | false |
623 | 15 | } |
624 | | |
625 | | /// Same as `remove()`, but allows for a conditional to be applied to the |
626 | | /// entry before removal in an atomic fashion. |
627 | 1 | pub async fn remove_if<F>(&self, key: &Q, cond: F) -> bool |
628 | 1 | where |
629 | 1 | F: FnOnce(&T) -> bool + Send, |
630 | 1 | { |
631 | 1 | let (evicted_items, removal_futures, removed_item) = { |
632 | 1 | let mut state = self.state.lock(); |
633 | 1 | if let Some(entry) = state.lru.get(key.borrow()) { Branch (633:20): [True: 0, False: 0]
Branch (633:20): [Folded - Ignored]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [True: 1, False: 0]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [True: 0, False: 0]
Branch (633:20): [Folded - Ignored]
Branch (633:20): [True: 0, False: 0]
|
634 | 1 | if !cond(&entry.data) { Branch (634:20): [True: 0, False: 0]
Branch (634:20): [Folded - Ignored]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [True: 0, False: 1]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [True: 0, False: 0]
Branch (634:20): [Folded - Ignored]
Branch (634:20): [True: 0, False: 0]
|
635 | 0 | return false; |
636 | 1 | } |
637 | | // First perform eviction |
638 | 1 | let (evicted_items, mut removal_futures) = self.evict_items(&mut state); |
639 | | |
640 | | // Then try to remove the requested item |
641 | 1 | let removed_item = if let Some(entry) = state.lru.pop(key.borrow()) { Branch (641:43): [True: 0, False: 0]
Branch (641:43): [Folded - Ignored]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [True: 1, False: 0]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [True: 0, False: 0]
Branch (641:43): [Folded - Ignored]
Branch (641:43): [True: 0, False: 0]
|
642 | 1 | let (item, more_removal_futures) = state.remove(key, &entry, false); |
643 | 1 | removal_futures.extend(more_removal_futures.into_iter()); |
644 | 1 | Some(item) |
645 | | } else { |
646 | 0 | None |
647 | | }; |
648 | | |
649 | 1 | (evicted_items, removal_futures, removed_item) |
650 | | } else { |
651 | 0 | (vec![], vec![].into_iter().collect(), None) |
652 | | } |
653 | | }; |
654 | | |
655 | | // Perform the async callbacks outside of the lock |
656 | 1 | let mut removal_futures: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
657 | 1 | while removal_futures.next().await.is_some() {}0 Branch (657:15): [True: 0, False: 0]
Branch (657:15): [Folded - Ignored]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [True: 0, False: 1]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [True: 0, False: 0]
Branch (657:15): [Folded - Ignored]
Branch (657:15): [True: 0, False: 0]
|
658 | | |
659 | | // Unref evicted items |
660 | 1 | let mut callbacks: FuturesUnordered<_> = |
661 | 1 | evicted_items.iter().map(LenEntry::unref).collect(); |
662 | 1 | while callbacks.next().await.is_some() {}0 Branch (662:15): [True: 0, False: 0]
Branch (662:15): [Folded - Ignored]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [True: 0, False: 1]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [True: 0, False: 0]
Branch (662:15): [Folded - Ignored]
Branch (662:15): [True: 0, False: 0]
|
663 | | |
664 | | // Unref removed item if any |
665 | 1 | if let Some(item) = removed_item { Branch (665:16): [True: 0, False: 0]
Branch (665:16): [Folded - Ignored]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [True: 1, False: 0]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [True: 0, False: 0]
Branch (665:16): [Folded - Ignored]
Branch (665:16): [True: 0, False: 0]
|
666 | 1 | item.unref().await; |
667 | 1 | true |
668 | | } else { |
669 | 0 | false |
670 | | } |
671 | 1 | } |
672 | | |
673 | 5 | pub fn add_remove_callback(&self, callback: C) { |
674 | 5 | self.state.lock().add_remove_callback(callback); |
675 | 5 | } |
676 | | } |