/build/source/nativelink-util/src/evicting_map.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::borrow::Borrow; |
16 | | use core::cmp::Eq; |
17 | | use core::fmt::Debug; |
18 | | use core::future::Future; |
19 | | use core::hash::Hash; |
20 | | use core::marker::PhantomData; |
21 | | use core::ops::RangeBounds; |
22 | | use core::pin::Pin; |
23 | | use std::collections::BTreeSet; |
24 | | use std::sync::Arc; |
25 | | |
26 | | use futures::StreamExt; |
27 | | use futures::stream::FuturesUnordered; |
28 | | use lru::LruCache; |
29 | | use nativelink_config::stores::EvictionPolicy; |
30 | | use nativelink_metric::MetricsComponent; |
31 | | use parking_lot::Mutex; |
32 | | use serde::{Deserialize, Serialize}; |
33 | | use tracing::{debug, info}; |
34 | | |
35 | | use crate::instant_wrapper::InstantWrapper; |
36 | | use crate::metrics_utils::{Counter, CounterWithTime}; |
37 | | |
38 | | #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)] |
39 | | pub struct SerializedLRU<K> { |
40 | | pub data: Vec<(K, i32)>, |
41 | | pub anchor_time: u64, |
42 | | } |
43 | | |
44 | | #[derive(Debug)] |
45 | | struct EvictionItem<T: LenEntry + Debug> { |
46 | | seconds_since_anchor: i32, |
47 | | data: T, |
48 | | } |
49 | | |
50 | | pub trait LenEntry: 'static { |
51 | | /// Length of referenced data. |
52 | | fn len(&self) -> u64; |
53 | | |
54 | | /// Returns `true` if `self` has zero length. |
55 | | fn is_empty(&self) -> bool; |
56 | | |
57 | | /// This will be called when object is removed from map. |
58 | | /// Note: There may still be a reference to it held somewhere else, which |
59 | | /// is why it can't be mutable. This is a good place to mark the item |
60 | | /// to be deleted and then in the Drop call actually do the deleting. |
61 | | /// This will ensure nowhere else in the program still holds a reference |
62 | | /// to this object. |
63 | | /// You should not rely only on the Drop trait. Doing so might result in the |
64 | | /// program safely shutting down and calling the Drop method on each object, |
65 | | /// which if you are deleting items you may not want to do. |
66 | | /// It is undefined behavior to have `unref()` called more than once. |
67 | | /// During the execution of `unref()` no items can be added or removed to/from |
68 | | /// the `EvictionMap` globally (including inside `unref()`). |
69 | | #[inline] |
70 | 52 | fn unref(&self) -> impl Future<Output = ()> + Send { |
71 | 52 | core::future::ready(()) |
72 | 52 | } |
73 | | } |
74 | | |
75 | | impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> { |
76 | | #[inline] |
77 | 241 | fn len(&self) -> u64 { |
78 | 241 | T::len(self.as_ref()) |
79 | 241 | } |
80 | | |
81 | | #[inline] |
82 | 0 | fn is_empty(&self) -> bool { |
83 | 0 | T::is_empty(self.as_ref()) |
84 | 0 | } |
85 | | |
86 | | #[inline] |
87 | 13 | async fn unref(&self) { |
88 | 13 | self.as_ref().unref().await; |
89 | 13 | } |
90 | | } |
91 | | |
92 | | // Callback to be called when the EvictingMap removes an item |
93 | | // either via eviction or direct deletion. This will be called with |
94 | | // whatever key type the EvictingMap uses. |
95 | | pub trait RemoveItemCallback<Q>: Debug + Send + Sync { |
96 | | fn callback(&self, store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>>; |
97 | | } |
98 | | |
99 | | #[derive(Debug, MetricsComponent)] |
100 | | struct State< |
101 | | K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>, |
102 | | Q: Ord + Hash + Eq + Debug, |
103 | | T: LenEntry + Debug + Send, |
104 | | C: RemoveItemCallback<Q>, |
105 | | > { |
106 | | lru: LruCache<K, EvictionItem<T>>, |
107 | | btree: Option<BTreeSet<K>>, |
108 | | #[metric(help = "Total size of all items in the store")] |
109 | | sum_store_size: u64, |
110 | | |
111 | | #[metric(help = "Number of bytes evicted from the store")] |
112 | | evicted_bytes: Counter, |
113 | | #[metric(help = "Number of items evicted from the store")] |
114 | | evicted_items: CounterWithTime, |
115 | | #[metric(help = "Number of bytes replaced in the store")] |
116 | | replaced_bytes: Counter, |
117 | | #[metric(help = "Number of items replaced in the store")] |
118 | | replaced_items: CounterWithTime, |
119 | | #[metric(help = "Number of bytes inserted into the store since it was created")] |
120 | | lifetime_inserted_bytes: Counter, |
121 | | |
122 | | _key_type: PhantomData<Q>, |
123 | | remove_callbacks: Vec<C>, |
124 | | } |
125 | | |
126 | | type RemoveFuture = Pin<Box<dyn Future<Output = ()> + Send>>; |
127 | | |
128 | | impl< |
129 | | K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>, |
130 | | Q: Ord + Hash + Eq + Debug + Sync, |
131 | | T: LenEntry + Debug + Sync + Send, |
132 | | C: RemoveItemCallback<Q>, |
133 | | > State<K, Q, T, C> |
134 | | { |
135 | | /// Removes an item from the cache and returns the data for deferred cleanup. |
136 | | /// The caller is responsible for calling `unref()` on the returned data outside of the lock. |
137 | | #[must_use] |
138 | 65 | fn remove( |
139 | 65 | &mut self, |
140 | 65 | key: &Q, |
141 | 65 | eviction_item: &EvictionItem<T>, |
142 | 65 | replaced: bool, |
143 | 65 | ) -> (T, Vec<RemoveFuture>) |
144 | 65 | where |
145 | 65 | T: Clone, |
146 | | { |
147 | 65 | if let Some(btree0 ) = &mut self.btree { |
148 | 0 | btree.remove(key); |
149 | 65 | } |
150 | 65 | self.sum_store_size -= eviction_item.data.len(); |
151 | 65 | if replaced { |
152 | 30 | self.replaced_items.inc(); |
153 | 30 | self.replaced_bytes.add(eviction_item.data.len()); |
154 | 35 | } else { |
155 | 35 | self.evicted_items.inc(); |
156 | 35 | self.evicted_bytes.add(eviction_item.data.len()); |
157 | 35 | } |
158 | | |
159 | 65 | let callbacks = self |
160 | 65 | .remove_callbacks |
161 | 65 | .iter() |
162 | 65 | .map(|callback| callback2 .callback2 (key2 )) |
163 | 65 | .collect(); |
164 | | |
165 | | // Return the data for deferred unref outside of lock |
166 | 65 | (eviction_item.data.clone(), callbacks) |
167 | 65 | } |
168 | | |
169 | | /// Inserts a new item into the cache. If the key already exists, the old item is returned |
170 | | /// for deferred cleanup. |
171 | | #[must_use] |
172 | 6.01k | fn put(&mut self, key: &K, eviction_item: EvictionItem<T>) -> Option<(T, Vec<RemoveFuture>)> |
173 | 6.01k | where |
174 | 6.01k | K: Clone, |
175 | 6.01k | T: Clone, |
176 | | { |
177 | | // If we are maintaining a btree index, we need to update it. |
178 | 6.01k | if let Some(btree0 ) = &mut self.btree { |
179 | 0 | btree.insert(key.clone()); |
180 | 6.01k | } |
181 | 6.01k | self.lru |
182 | 6.01k | .put(key.clone(), eviction_item) |
183 | 6.01k | .map(|old_item| self30 .remove30 (key.borrow()30 , &old_item30 , true)) |
184 | 6.01k | } |
185 | | |
186 | 5 | fn add_remove_callback(&mut self, callback: C) { |
187 | 5 | self.remove_callbacks.push(callback); |
188 | 5 | } |
189 | | } |
190 | | |
191 | | #[derive(Debug, Clone, Copy)] |
192 | | pub struct NoopRemove; |
193 | | |
194 | | impl<Q> RemoveItemCallback<Q> for NoopRemove { |
195 | 0 | fn callback(&self, _store_key: &Q) -> Pin<Box<dyn Future<Output = ()> + Send>> { |
196 | 0 | Box::pin(async {}) |
197 | 0 | } |
198 | | } |
199 | | |
200 | | #[derive(Debug, MetricsComponent)] |
201 | | pub struct EvictingMap< |
202 | | K: Ord + Hash + Eq + Clone + Debug + Send + Borrow<Q>, |
203 | | Q: Ord + Hash + Eq + Debug, |
204 | | T: LenEntry + Debug + Send, |
205 | | I: InstantWrapper, |
206 | | C: RemoveItemCallback<Q> = NoopRemove, |
207 | | > { |
208 | | #[metric] |
209 | | state: Mutex<State<K, Q, T, C>>, |
210 | | anchor_time: I, |
211 | | #[metric(help = "Maximum size of the store in bytes")] |
212 | | max_bytes: u64, |
213 | | #[metric(help = "Number of bytes to evict when the store is full")] |
214 | | evict_bytes: u64, |
215 | | #[metric(help = "Maximum number of seconds to keep an item in the store")] |
216 | | max_seconds: i32, |
217 | | #[metric(help = "Maximum number of items to keep in the store")] |
218 | | max_count: u64, |
219 | | } |
220 | | |
221 | | impl<K, Q, T, I, C> EvictingMap<K, Q, T, I, C> |
222 | | where |
223 | | K: Ord + Hash + Eq + Clone + Debug + Send + Sync + Borrow<Q>, |
224 | | Q: Ord + Hash + Eq + Debug + Sync, |
225 | | T: LenEntry + Debug + Clone + Send + Sync, |
226 | | I: InstantWrapper, |
227 | | C: RemoveItemCallback<Q>, |
228 | | { |
229 | 483 | pub fn new(config: &EvictionPolicy, anchor_time: I) -> Self { |
230 | 483 | Self { |
231 | 483 | // We use unbounded because if we use the bounded version we can't call the delete |
232 | 483 | // function on the LenEntry properly. |
233 | 483 | state: Mutex::new(State { |
234 | 483 | lru: LruCache::unbounded(), |
235 | 483 | btree: None, |
236 | 483 | sum_store_size: 0, |
237 | 483 | evicted_bytes: Counter::default(), |
238 | 483 | evicted_items: CounterWithTime::default(), |
239 | 483 | replaced_bytes: Counter::default(), |
240 | 483 | replaced_items: CounterWithTime::default(), |
241 | 483 | lifetime_inserted_bytes: Counter::default(), |
242 | 483 | _key_type: PhantomData, |
243 | 483 | remove_callbacks: Vec::new(), |
244 | 483 | }), |
245 | 483 | anchor_time, |
246 | 483 | max_bytes: config.max_bytes as u64, |
247 | 483 | evict_bytes: config.evict_bytes as u64, |
248 | 483 | max_seconds: config.max_seconds as i32, |
249 | 483 | max_count: config.max_count, |
250 | 483 | } |
251 | 483 | } |
252 | | |
253 | 0 | pub async fn enable_filtering(&self) { |
254 | 0 | let mut state = self.state.lock(); |
255 | 0 | if state.btree.is_none() { |
256 | 0 | Self::rebuild_btree_index(&mut state); |
257 | 0 | } |
258 | 0 | } |
259 | | |
260 | 2 | fn rebuild_btree_index(state: &mut State<K, Q, T, C>) { |
261 | 2 | state.btree = Some(state.lru.iter().map(|(k, _)| k).cloned().collect()); |
262 | 2 | } |
263 | | |
264 | | /// Run the `handler` function on each key-value pair that matches the `prefix_range` |
265 | | /// and return the number of items that were processed. |
266 | | /// The `handler` function should return `true` to continue processing the next item |
267 | | /// or `false` to stop processing. |
268 | 11 | pub fn range<F>(&self, prefix_range: impl RangeBounds<Q> + Send, mut handler: F) -> u64 |
269 | 11 | where |
270 | 11 | F: FnMut(&K, &T) -> bool + Send, |
271 | 11 | K: Ord, |
272 | | { |
273 | 11 | let mut state = self.state.lock(); |
274 | 11 | let btree = if let Some(ref btree9 ) = state.btree { |
275 | 9 | btree |
276 | | } else { |
277 | 2 | Self::rebuild_btree_index(&mut state); |
278 | 2 | state.btree.as_ref().unwrap() |
279 | | }; |
280 | 11 | let mut continue_count = 0; |
281 | 22 | for key in btree11 .range11 (prefix_range11 ) { |
282 | 22 | let value = &state.lru.peek(key.borrow()).unwrap().data; |
283 | 22 | let should_continue = handler(key, value); |
284 | 22 | if !should_continue { |
285 | 0 | break; |
286 | 22 | } |
287 | 22 | continue_count += 1; |
288 | | } |
289 | 11 | continue_count |
290 | 11 | } |
291 | | |
292 | | /// Returns the number of key-value pairs that are currently in the the cache. |
293 | | /// Function is not for production code paths. |
294 | 30 | pub fn len_for_test(&self) -> usize { |
295 | 30 | self.state.lock().lru.len() |
296 | 30 | } |
297 | | |
298 | 10.9k | fn should_evict( |
299 | 10.9k | &self, |
300 | 10.9k | lru_len: usize, |
301 | 10.9k | peek_entry: &EvictionItem<T>, |
302 | 10.9k | sum_store_size: u64, |
303 | 10.9k | max_bytes: u64, |
304 | 10.9k | ) -> bool { |
305 | 10.9k | let is_over_size = max_bytes != 0 && sum_store_size >= max_bytes4.92k ; |
306 | | |
307 | 10.9k | let elapsed_seconds = |
308 | 10.9k | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); |
309 | 10.9k | let evict_older_than_seconds = elapsed_seconds.saturating_sub(self.max_seconds); |
310 | 10.9k | let old_item_exists = |
311 | 10.9k | self.max_seconds != 0 && peek_entry.seconds_since_anchor < evict_older_than_seconds128 ; |
312 | | |
313 | 10.9k | let is_over_count = |
314 | 10.9k | self.max_count != 0 && u64::try_from26 (lru_len26 ).unwrap_or(u64::MAX) > self.max_count; |
315 | | |
316 | 10.9k | is_over_size || old_item_exists10.9k || is_over_count10.9k |
317 | 10.9k | } |
318 | | |
319 | | #[must_use] |
320 | 6.03k | fn evict_items(&self, state: &mut State<K, Q, T, C>) -> (Vec<T>, Vec<RemoveFuture>) { |
321 | 6.03k | let Some((_, mut peek_entry)) = state.lru.peek_lru() else { |
322 | 0 | return (Vec::new(), Vec::new()); |
323 | | }; |
324 | | |
325 | 6.03k | let max_bytes = if self.max_bytes != 0 |
326 | 19 | && self.evict_bytes != 0 |
327 | 4 | && self.should_evict( |
328 | 4 | state.lru.len(), |
329 | 4 | peek_entry, |
330 | 4 | state.sum_store_size, |
331 | 4 | self.max_bytes, |
332 | | ) { |
333 | 1 | self.max_bytes.saturating_sub(self.evict_bytes) |
334 | | } else { |
335 | 6.03k | self.max_bytes |
336 | | }; |
337 | | |
338 | 6.03k | let mut items_to_unref = Vec::new(); |
339 | 6.03k | let mut removal_futures = Vec::new(); |
340 | | |
341 | 6.04k | while self.should_evict(state.lru.len(), peek_entry, state.sum_store_size, max_bytes) { |
342 | 16 | let (key, eviction_item) = state |
343 | 16 | .lru |
344 | 16 | .pop_lru() |
345 | 16 | .expect("Tried to peek() then pop() but failed"); |
346 | 16 | debug!(?key, "Evicting",); |
347 | 16 | let (data, futures) = state.remove(key.borrow(), &eviction_item, false); |
348 | 16 | items_to_unref.push(data); |
349 | 16 | removal_futures.extend(futures.into_iter()); |
350 | | |
351 | 16 | peek_entry = if let Some((_, entry12 )) = state.lru.peek_lru() { |
352 | 12 | entry |
353 | | } else { |
354 | 4 | break; |
355 | | }; |
356 | | } |
357 | | |
358 | 6.03k | (items_to_unref, removal_futures) |
359 | 6.03k | } |
360 | | |
361 | | /// Return the size of a `key`, if not found `None` is returned. |
362 | 25 | pub async fn size_for_key(&self, key: &Q) -> Option<u64> { |
363 | 25 | let mut results = [None]; |
364 | 25 | self.sizes_for_keys([key], &mut results[..], false).await; |
365 | 25 | results[0] |
366 | 25 | } |
367 | | |
368 | | /// Return the sizes of a collection of `keys`. Expects `results` collection |
369 | | /// to be provided for storing the resulting key sizes. Each index value in |
370 | | /// `keys` maps directly to the size value for the key in `results`. |
371 | | /// If no key is found in the internal map, `None` is filled in its place. |
372 | | /// If `peek` is set to `true`, the items are not promoted to the front of the |
373 | | /// LRU cache. Note: peek may still evict, but won't promote. |
374 | 404 | pub async fn sizes_for_keys<It, R>(&self, keys: It, results: &mut [Option<u64>], peek: bool) |
375 | 404 | where |
376 | 404 | It: IntoIterator<Item = R> + Send, |
377 | 404 | // Note: It's not enough to have the inserts themselves be Send. The |
378 | 404 | // returned iterator should be Send as well. |
379 | 404 | <It as IntoIterator>::IntoIter: Send, |
380 | 404 | // This may look strange, but what we are doing is saying: |
381 | 404 | // * `K` must be able to borrow `Q` |
382 | 404 | // * `R` (the input stream item type) must also be able to borrow `Q` |
383 | 404 | // Note: That K and R do not need to be the same type, they just both need |
384 | 404 | // to be able to borrow a `Q`. |
385 | 404 | R: Borrow<Q> + Send, |
386 | 404 | { |
387 | 404 | let (removal_futures, data_to_unref) = { |
388 | 404 | let mut state = self.state.lock(); |
389 | | |
390 | 404 | let lru_len = state.lru.len(); |
391 | 404 | let mut data_to_unref = Vec::new(); |
392 | 404 | let mut removal_futures = Vec::new(); |
393 | 447 | for (key, result) in keys404 .into_iter404 ().zip404 (results404 .iter_mut404 ()) { |
394 | 447 | let maybe_entry = if peek { |
395 | 11 | state.lru.peek_mut(key.borrow()) |
396 | | } else { |
397 | 436 | state.lru.get_mut(key.borrow()) |
398 | | }; |
399 | 447 | match maybe_entry { |
400 | 203 | Some(entry) => { |
401 | | // Note: We need to check eviction because the item might be expired |
402 | | // based on the current time. In such case, we remove the item while |
403 | | // we are here. |
404 | 203 | if self.should_evict(lru_len, entry, 0, u64::MAX) { |
405 | 1 | *result = None; |
406 | 1 | if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) { |
407 | 1 | info!(?key, "Item expired, evicting"); |
408 | 1 | let (data, futures) = |
409 | 1 | state.remove(key.borrow(), &eviction_item, false); |
410 | | // Store data for later unref - we can't drop state here as we're still iterating |
411 | 1 | data_to_unref.push(data); |
412 | 1 | removal_futures.extend(futures.into_iter()); |
413 | 0 | } |
414 | | } else { |
415 | 202 | if !peek { |
416 | 199 | entry.seconds_since_anchor = |
417 | 199 | i32::try_from(self.anchor_time.elapsed().as_secs()) |
418 | 199 | .unwrap_or(i32::MAX); |
419 | 199 | }3 |
420 | 202 | *result = Some(entry.data.len()); |
421 | | } |
422 | | } |
423 | 244 | None => *result = None, |
424 | | } |
425 | | } |
426 | 404 | (removal_futures, data_to_unref) |
427 | | }; |
428 | | |
429 | | // Perform the async callbacks outside of the lock |
430 | 404 | let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
431 | 404 | while callbacks.next().await.is_some() {}0 |
432 | 404 | let mut callbacks: FuturesUnordered<_> = |
433 | 404 | data_to_unref.iter().map(LenEntry::unref).collect(); |
434 | 405 | while callbacks.next().await.is_some() {}1 |
435 | 404 | } |
436 | | |
437 | | /// Returns the value for `key` if present and not expired, refreshing |
438 | | /// its LRU/atime position. If the entry is present but TTL- or |
439 | | /// count-expired, it is reaped and `None` is returned. |
440 | | /// |
441 | | /// A read never cascades into other entries — only the queried key is |
442 | | /// ever touched. Global eviction (size/count overflow trim) runs on |
443 | | /// inserts; it is not driven by reads, since `sum_store_size` cannot |
444 | | /// grow without an insert. |
445 | 4.70k | pub async fn get(&self, key: &Q) -> Option<T> { |
446 | | // Lazily reap *only* the requested entry if it is itself expired; |
447 | | // leave the rest for inserts (which already run the global eviction |
448 | | // loop). |
449 | 4.69k | let (data, expired_data, removal_futures) = { |
450 | 4.70k | let mut state = self.state.lock(); |
451 | 4.70k | let lru_len = state.lru.len(); |
452 | 4.70k | let entry4.69k = state.lru.get_mut(key.borrow())?13 ; |
453 | | // Pass `sum_store_size=0` and `max_bytes=u64::MAX` so we only |
454 | | // consult TTL / count predicates — never the global byte budget. |
455 | | // Mirrors the per-key reap path in `sizes_for_keys`. |
456 | 4.69k | if self.should_evict(lru_len, entry, 0, u64::MAX) { |
457 | 3 | let (popped_key, eviction_item) = state |
458 | 3 | .lru |
459 | 3 | .pop_entry(key.borrow()) |
460 | 3 | .expect("entry was just observed via get_mut"); |
461 | 3 | info!(?popped_key, "Item expired, evicting"); |
462 | 3 | let (data, futures) = state.remove(popped_key.borrow(), &eviction_item, false); |
463 | 3 | (None, Some(data), futures) |
464 | | } else { |
465 | 4.68k | entry.seconds_since_anchor = |
466 | 4.68k | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); |
467 | 4.68k | (Some(entry.data.clone()), None, Vec::new()) |
468 | | } |
469 | | }; |
470 | | |
471 | | // Drain remove_callbacks and unref the reaped entry outside the lock. |
472 | 4.69k | let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
473 | 4.69k | while callbacks.next().await.is_some() {}0 |
474 | 4.69k | if let Some(d3 ) = expired_data { |
475 | 3 | d.unref().await; |
476 | 4.68k | } |
477 | | |
478 | 4.69k | data |
479 | 4.70k | } |
480 | | |
481 | | /// Returns the replaced item if any. |
482 | 6.01k | pub async fn insert(&self, key: K, data: T) -> Option<T> |
483 | 6.01k | where |
484 | 6.01k | K: 'static, |
485 | 6.01k | { |
486 | 6.01k | self.insert_with_time( |
487 | 6.01k | key, |
488 | 6.01k | data, |
489 | 6.01k | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX), |
490 | 6.01k | ) |
491 | 6.01k | .await |
492 | 6.01k | } |
493 | | |
494 | | /// Returns the replaced item if any. |
495 | 6.01k | pub async fn insert_with_time(&self, key: K, data: T, seconds_since_anchor: i32) -> Option<T> { |
496 | 6.01k | let (items_to_unref, removal_futures) = { |
497 | 6.01k | let mut state = self.state.lock(); |
498 | 6.01k | self.inner_insert_many(&mut state, [(key, data)], seconds_since_anchor) |
499 | 6.01k | }; |
500 | | |
501 | 6.01k | let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
502 | 6.01k | while futures.next().await.is_some() {}1 |
503 | | |
504 | | // Unref items outside of lock |
505 | 6.01k | let futures: FuturesUnordered<_> = items_to_unref |
506 | 6.01k | .into_iter() |
507 | 6.01k | .map(|item| async move {45 |
508 | 45 | item.unref().await; |
509 | 45 | item |
510 | 90 | }) |
511 | 6.01k | .collect(); |
512 | 6.01k | futures.collect::<Vec<_>>().await.into_iter().next() |
513 | 6.01k | } |
514 | | |
515 | | /// Same as `insert()`, but optimized for multiple inserts. |
516 | | /// Returns the replaced items if any. |
517 | 7 | pub async fn insert_many<It>(&self, inserts: It) -> Vec<T> |
518 | 7 | where |
519 | 7 | It: IntoIterator<Item = (K, T)> + Send, |
520 | 7 | // Note: It's not enough to have the inserts themselves be Send. The |
521 | 7 | // returned iterator should be Send as well. |
522 | 7 | <It as IntoIterator>::IntoIter: Send, |
523 | 7 | K: 'static, |
524 | 7 | { |
525 | 7 | let mut inserts = inserts.into_iter().peekable(); |
526 | | // Shortcut for cases where there are no inserts, so we don't need to lock. |
527 | 7 | if inserts.peek().is_none() { |
528 | 5 | return Vec::new(); |
529 | 2 | } |
530 | | |
531 | 2 | let (items_to_unref, removal_futures) = { |
532 | 2 | let mut state = self.state.lock(); |
533 | 2 | self.inner_insert_many( |
534 | 2 | &mut state, |
535 | 2 | inserts, |
536 | 2 | i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX), |
537 | 2 | ) |
538 | 2 | }; |
539 | | |
540 | 2 | let mut futures: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
541 | 2 | while futures.next().await.is_some() {}0 |
542 | | |
543 | | // Unref items outside of lock |
544 | 2 | items_to_unref |
545 | 2 | .into_iter() |
546 | 2 | .map(|item| async move {0 |
547 | 0 | item.unref().await; |
548 | 0 | item |
549 | 0 | }) |
550 | 2 | .collect::<FuturesUnordered<_>>() |
551 | 2 | .collect::<Vec<_>>() |
552 | 2 | .await |
553 | 7 | } |
554 | | |
555 | 6.01k | fn inner_insert_many<It>( |
556 | 6.01k | &self, |
557 | 6.01k | state: &mut State<K, Q, T, C>, |
558 | 6.01k | inserts: It, |
559 | 6.01k | seconds_since_anchor: i32, |
560 | 6.01k | ) -> (Vec<T>, Vec<RemoveFuture>) |
561 | 6.01k | where |
562 | 6.01k | It: IntoIterator<Item = (K, T)> + Send, |
563 | 6.01k | // Note: It's not enough to have the inserts themselves be Send. The |
564 | 6.01k | // returned iterator should be Send as well. |
565 | 6.01k | <It as IntoIterator>::IntoIter: Send, |
566 | | { |
567 | 6.01k | let mut replaced_items = Vec::new(); |
568 | 6.01k | let mut removal_futures = Vec::new(); |
569 | 6.01k | for (key, data) in inserts { |
570 | 6.01k | let new_item_size = data.len(); |
571 | 6.01k | let eviction_item = EvictionItem { |
572 | 6.01k | seconds_since_anchor, |
573 | 6.01k | data, |
574 | 6.01k | }; |
575 | | |
576 | 6.01k | if let Some((old_item30 , futures30 )) = state.put(&key, eviction_item) { |
577 | 30 | removal_futures.extend(futures.into_iter()); |
578 | 30 | replaced_items.push(old_item); |
579 | 5.98k | } |
580 | 6.01k | state.sum_store_size += new_item_size; |
581 | 6.01k | state.lifetime_inserted_bytes.add(new_item_size); |
582 | | } |
583 | | |
584 | | // Perform eviction after all insertions |
585 | 6.01k | let (items_to_unref, futures) = self.evict_items(state); |
586 | 6.01k | removal_futures.extend(futures); |
587 | | |
588 | | // Note: We cannot drop the state lock here since we're borrowing it, |
589 | | // but the caller will handle unreffing these items after releasing the lock |
590 | 6.01k | replaced_items.extend(items_to_unref); |
591 | | |
592 | 6.01k | (replaced_items, removal_futures) |
593 | 6.01k | } |
594 | | |
595 | 15 | pub async fn remove(&self, key: &Q) -> bool { |
596 | 15 | let (items_to_unref, removed_item, removal_futures) = { |
597 | 15 | let mut state = self.state.lock(); |
598 | | |
599 | | // First perform eviction |
600 | 15 | let (evicted_items, mut removal_futures) = self.evict_items(&mut *state); |
601 | | |
602 | | // Then try to remove the requested item |
603 | 15 | let removed = if let Some(entry14 ) = state.lru.pop(key.borrow()) { |
604 | 14 | let (removed_item, more_removal_futures) = state.remove(key, &entry, false); |
605 | 14 | removal_futures.extend(more_removal_futures.into_iter()); |
606 | 14 | Some(removed_item) |
607 | | } else { |
608 | 1 | None |
609 | | }; |
610 | | |
611 | 15 | (evicted_items, removed, removal_futures) |
612 | | }; |
613 | | |
614 | 15 | let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
615 | 16 | while callbacks.next().await.is_some() {}1 |
616 | | |
617 | | // Unref evicted items outside of lock |
618 | 15 | let mut callbacks: FuturesUnordered<_> = |
619 | 15 | items_to_unref.iter().map(LenEntry::unref).collect(); |
620 | 16 | while callbacks.next().await.is_some() {}1 |
621 | | |
622 | | // Unref removed item if any |
623 | 15 | if let Some(item14 ) = removed_item { |
624 | 14 | item.unref().await; |
625 | 14 | return true; |
626 | 1 | } |
627 | | |
628 | 1 | false |
629 | 15 | } |
630 | | |
631 | | /// Same as `remove()`, but allows for a conditional to be applied to the |
632 | | /// entry before removal in an atomic fashion. |
633 | 1 | pub async fn remove_if<F>(&self, key: &Q, cond: F) -> bool |
634 | 1 | where |
635 | 1 | F: FnOnce(&T) -> bool + Send, |
636 | 1 | { |
637 | 1 | let (evicted_items, removal_futures, removed_item) = { |
638 | 1 | let mut state = self.state.lock(); |
639 | 1 | if let Some(entry) = state.lru.get(key.borrow()) { |
640 | 1 | if !cond(&entry.data) { |
641 | 0 | return false; |
642 | 1 | } |
643 | | // First perform eviction |
644 | 1 | let (evicted_items, mut removal_futures) = self.evict_items(&mut state); |
645 | | |
646 | | // Then try to remove the requested item |
647 | 1 | let removed_item = if let Some(entry) = state.lru.pop(key.borrow()) { |
648 | 1 | let (item, more_removal_futures) = state.remove(key, &entry, false); |
649 | 1 | removal_futures.extend(more_removal_futures.into_iter()); |
650 | 1 | Some(item) |
651 | | } else { |
652 | 0 | None |
653 | | }; |
654 | | |
655 | 1 | (evicted_items, removal_futures, removed_item) |
656 | | } else { |
657 | 0 | (vec![], vec![].into_iter().collect(), None) |
658 | | } |
659 | | }; |
660 | | |
661 | | // Perform the async callbacks outside of the lock |
662 | 1 | let mut removal_futures: FuturesUnordered<_> = removal_futures.into_iter().collect(); |
663 | 1 | while removal_futures.next().await.is_some() {}0 |
664 | | |
665 | | // Unref evicted items |
666 | 1 | let mut callbacks: FuturesUnordered<_> = |
667 | 1 | evicted_items.iter().map(LenEntry::unref).collect(); |
668 | 1 | while callbacks.next().await.is_some() {}0 |
669 | | |
670 | | // Unref removed item if any |
671 | 1 | if let Some(item) = removed_item { |
672 | 1 | item.unref().await; |
673 | 1 | true |
674 | | } else { |
675 | 0 | false |
676 | | } |
677 | 1 | } |
678 | | |
679 | 5 | pub fn add_remove_callback(&self, callback: C) { |
680 | 5 | self.state.lock().add_remove_callback(callback); |
681 | 5 | } |
682 | | } |