/build/source/nativelink-store/src/existence_cache_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use std::borrow::Cow; |
17 | | use std::sync::{Arc, Weak}; |
18 | | use std::time::SystemTime; |
19 | | |
20 | | use async_trait::async_trait; |
21 | | use futures::StreamExt; |
22 | | use futures::stream::FuturesUnordered; |
23 | | use nativelink_config::stores::{EvictionPolicy, ExistenceCacheSpec}; |
24 | | use nativelink_error::{Error, ResultExt, error_if}; |
25 | | use nativelink_metric::MetricsComponent; |
26 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
27 | | use nativelink_util::common::DigestInfo; |
28 | | use nativelink_util::evicting_map::{EvictingMap, LenEntry}; |
29 | | use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator}; |
30 | | use nativelink_util::instant_wrapper::InstantWrapper; |
31 | | use nativelink_util::store_trait::{ |
32 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, |
33 | | }; |
34 | | use parking_lot::Mutex; |
35 | | use tracing::{debug, info, trace}; |
36 | | |
/// Value stored in the existence cache for a digest known to be present.
///
/// The wrapped `u64` is the size recorded for the object; it is what the
/// `LenEntry::len` implementation for this type reports.
#[derive(Debug, Clone)]
struct ExistenceItem(u64);
39 | | |
40 | | impl LenEntry for ExistenceItem { |
41 | | #[inline] |
42 | 15 | fn len(&self) -> u64 { |
43 | 15 | self.0 |
44 | 15 | } |
45 | | |
46 | | #[inline] |
47 | 0 | fn is_empty(&self) -> bool { |
48 | 0 | false |
49 | 0 | } |
50 | | } |
51 | | |
52 | | #[derive(Debug, MetricsComponent)] |
53 | | pub struct ExistenceCacheStore<I: InstantWrapper> { |
54 | | #[metric(group = "inner_store")] |
55 | | inner_store: Store, |
56 | | existence_cache: EvictingMap<DigestInfo, DigestInfo, ExistenceItem, I>, |
57 | | |
58 | | // We need to pause them temporarily when inserting into the inner store |
59 | | // as if it immediately expires them, we should only apply the remove callbacks |
60 | | // afterwards. If this is None, we're not pausing; if it's Some it's the location to |
61 | | // store them in temporarily |
62 | | pause_remove_callbacks: Mutex<Option<Vec<StoreKey<'static>>>>, |
63 | | } |
64 | | |
65 | | impl ExistenceCacheStore<SystemTime> { |
66 | 4 | pub fn new(spec: &ExistenceCacheSpec, inner_store: Store) -> Arc<Self> { |
67 | 4 | Self::new_with_time(spec, inner_store, SystemTime::now()) |
68 | 4 | } |
69 | | } |
70 | | |
71 | | impl<I: InstantWrapper> RemoveItemCallback for ExistenceCacheStore<I> { |
72 | 2 | fn callback<'a>( |
73 | 2 | &'a self, |
74 | 2 | store_key: StoreKey<'a>, |
75 | 2 | ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
76 | 2 | debug!(?store_key, "Removing item from cache due to callback"); |
77 | 2 | let digest = store_key.borrow().into_digest(); |
78 | 2 | Box::pin(async move { |
79 | 2 | let deleted_key = self.existence_cache.remove(&digest).await; |
80 | 2 | if !deleted_key { Branch (80:16): [True: 0, False: 0]
Branch (80:16): [True: 0, False: 1]
Branch (80:16): [True: 0, False: 1]
Branch (80:16): [Folded - Ignored]
|
81 | 0 | info!(?store_key, "Failed to delete key from cache on callback"); |
82 | 2 | } |
83 | 2 | }) |
84 | 2 | } |
85 | | } |
86 | | |
87 | | #[derive(Debug)] |
88 | | struct ExistenceCacheCallback<I: InstantWrapper> { |
89 | | cache: Weak<ExistenceCacheStore<I>>, |
90 | | } |
91 | | |
92 | | impl<I: InstantWrapper> RemoveItemCallback for ExistenceCacheCallback<I> { |
93 | 2 | fn callback<'a>( |
94 | 2 | &'a self, |
95 | 2 | store_key: StoreKey<'a>, |
96 | 2 | ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |
97 | 2 | let cache = self.cache.upgrade(); |
98 | 2 | if let Some(local_cache) = cache { Branch (98:16): [True: 1, False: 0]
Branch (98:16): [True: 1, False: 0]
Branch (98:16): [Folded - Ignored]
|
99 | 2 | if let Some(callbacks1 ) = local_cache.pause_remove_callbacks.lock().as_mut() { Branch (99:20): [True: 1, False: 0]
Branch (99:20): [True: 0, False: 1]
Branch (99:20): [Folded - Ignored]
|
100 | 1 | callbacks.push(store_key.into_owned()); |
101 | 1 | } else { |
102 | 1 | let store_key = store_key.into_owned(); |
103 | 1 | return Box::pin(async move { |
104 | 1 | local_cache.callback(store_key).await; |
105 | 1 | }); |
106 | | } |
107 | | } else { |
108 | 0 | debug!("Cache dropped, so not doing callback"); |
109 | | } |
110 | 1 | Box::pin(async {}) |
111 | 2 | } |
112 | | } |
113 | | |
114 | | impl<I: InstantWrapper> ExistenceCacheStore<I> { |
115 | 5 | pub fn new_with_time( |
116 | 5 | spec: &ExistenceCacheSpec, |
117 | 5 | inner_store: Store, |
118 | 5 | anchor_time: I, |
119 | 5 | ) -> Arc<Self> { |
120 | 5 | let empty_policy = EvictionPolicy::default(); |
121 | 5 | let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy); |
122 | 5 | let existence_cache_store = Arc::new(Self { |
123 | 5 | inner_store, |
124 | 5 | existence_cache: EvictingMap::new(eviction_policy, anchor_time), |
125 | 5 | pause_remove_callbacks: Mutex::new(None), |
126 | 5 | }); |
127 | 5 | let other_ref = Arc::downgrade(&existence_cache_store); |
128 | 5 | existence_cache_store |
129 | 5 | .inner_store |
130 | 5 | .register_remove_callback(Arc::new(ExistenceCacheCallback { cache: other_ref })) |
131 | 5 | .expect("Register remove callback should work"); |
132 | 5 | existence_cache_store |
133 | 5 | } |
134 | | |
135 | 4 | pub async fn exists_in_cache(&self, digest: &DigestInfo) -> bool { |
136 | 4 | let mut results = [None]; |
137 | 4 | self.existence_cache |
138 | 4 | .sizes_for_keys([digest], &mut results[..], true /* peek */) |
139 | 4 | .await; |
140 | 4 | results[0].is_some() |
141 | 4 | } |
142 | | |
143 | 1 | pub async fn remove_from_cache(&self, digest: &DigestInfo) { |
144 | 1 | self.existence_cache.remove(digest).await; |
145 | 1 | } |
146 | | |
147 | 7 | async fn inner_has_with_results( |
148 | 7 | self: Pin<&Self>, |
149 | 7 | keys: &[DigestInfo], |
150 | 7 | results: &mut [Option<u64>], |
151 | 7 | ) -> Result<(), Error> { |
152 | 7 | self.existence_cache |
153 | 7 | .sizes_for_keys(keys, results, true /* peek */) |
154 | 7 | .await; |
155 | | |
156 | 7 | let not_cached_keys: Vec<_> = keys |
157 | 7 | .iter() |
158 | 7 | .zip(results.iter()) |
159 | 7 | .filter_map(|(digest, result)| result.map_or_else(|| Some(digest.into()), |_| None)) |
160 | 7 | .collect(); |
161 | | |
162 | | // Hot path optimization when all keys are cached. |
163 | 7 | if not_cached_keys.is_empty() { Branch (163:12): [True: 0, False: 0]
Branch (163:12): [True: 0, False: 2]
Branch (163:12): [True: 0, False: 5]
Branch (163:12): [Folded - Ignored]
|
164 | 0 | return Ok(()); |
165 | 7 | } |
166 | | |
167 | | // Now query only the items not found in the cache. |
168 | 7 | let mut inner_results = vec![None; not_cached_keys.len()]; |
169 | 7 | self.inner_store |
170 | 7 | .has_with_results(¬_cached_keys, &mut inner_results) |
171 | 7 | .await |
172 | 7 | .err_tip(|| "In ExistenceCacheStore::inner_has_with_results")?0 ; |
173 | | |
174 | | // Insert found from previous query into our cache. |
175 | | { |
176 | | // Note: Sadly due to some weird lifetime issues we need to collect here, but |
177 | | // in theory we don't actually need to collect. |
178 | 7 | let inserts = not_cached_keys |
179 | 7 | .iter() |
180 | 7 | .zip(inner_results.iter()) |
181 | 7 | .filter_map(|(key, result)| { |
182 | 7 | result.map(|size| (key2 .borrow2 ().into_digest2 (), ExistenceItem(size)2 )) |
183 | 7 | }) |
184 | 7 | .collect::<Vec<_>>(); |
185 | 7 | drop(self.existence_cache.insert_many(inserts).await); |
186 | | } |
187 | | |
188 | | // Merge the results from the cache and the query. |
189 | | { |
190 | 7 | let mut inner_results_iter = inner_results.into_iter(); |
191 | | // We know at this point that any None in results was queried and will have |
192 | | // a result in inner_results_iter, so use this knowledge to fill in the results. |
193 | 7 | for result in results.iter_mut() { |
194 | 7 | if result.is_none() { Branch (194:20): [True: 0, False: 0]
Branch (194:20): [True: 2, False: 0]
Branch (194:20): [True: 5, False: 0]
Branch (194:20): [Folded - Ignored]
|
195 | 7 | *result = inner_results_iter |
196 | 7 | .next() |
197 | 7 | .expect("has_with_results returned less results than expected"); |
198 | 7 | }0 |
199 | | } |
200 | | // Ensure that there was no logic error by ensuring our iterator is not empty. |
201 | 0 | error_if!( |
202 | 7 | inner_results_iter.next().is_some(), Branch (202:17): [True: 0, False: 0]
Branch (202:17): [True: 0, False: 2]
Branch (202:17): [True: 0, False: 5]
Branch (202:17): [Folded - Ignored]
|
203 | | "has_with_results returned more results than expected" |
204 | | ); |
205 | | } |
206 | | |
207 | 7 | Ok(()) |
208 | 7 | } |
209 | | } |
210 | | |
211 | | #[async_trait] |
212 | | impl<I: InstantWrapper> StoreDriver for ExistenceCacheStore<I> { |
213 | | async fn has_with_results( |
214 | | self: Pin<&Self>, |
215 | | digests: &[StoreKey<'_>], |
216 | | results: &mut [Option<u64>], |
217 | 4 | ) -> Result<(), Error> { |
218 | | // TODO(palfrey) This is a bit of a hack to get around the lifetime issues with the |
219 | | // existence_cache. We need to convert the digests to owned values to be able to |
220 | | // insert them into the cache. In theory it should be able to elide this conversion |
221 | | // but it seems to be a bit tricky to get right. |
222 | | let digests: Vec<_> = digests |
223 | | .iter() |
224 | 4 | .map(|key| key.borrow().into_digest()) |
225 | | .collect(); |
226 | | self.inner_has_with_results(&digests, results).await |
227 | 4 | } |
228 | | |
229 | | async fn update( |
230 | | self: Pin<&Self>, |
231 | | key: StoreKey<'_>, |
232 | | mut reader: DropCloserReadHalf, |
233 | | size_info: UploadSizeInfo, |
234 | 3 | ) -> Result<(), Error> { |
235 | | let digest = key.into_digest(); |
236 | | let mut exists = [None]; |
237 | | self.inner_has_with_results(&[digest], &mut exists) |
238 | | .await |
239 | | .err_tip(|| "In ExistenceCacheStore::update")?; |
240 | | if exists[0].is_some() { |
241 | | // We need to drain the reader to avoid the writer complaining that we dropped |
242 | | // the connection prematurely. |
243 | | reader |
244 | | .drain() |
245 | | .await |
246 | | .err_tip(|| "In ExistenceCacheStore::update")?; |
247 | | return Ok(()); |
248 | | } |
249 | | { |
250 | | let mut locked_callbacks = self.pause_remove_callbacks.lock(); |
251 | | if locked_callbacks.is_none() { |
252 | | locked_callbacks.replace(vec![]); |
253 | | } |
254 | | } |
255 | | trace!(?digest, "Inserting into inner cache"); |
256 | | let result = self.inner_store.update(digest, reader, size_info).await; |
257 | | if result.is_ok() { |
258 | | trace!(?digest, "Inserting into existence cache"); |
259 | | if let UploadSizeInfo::ExactSize(size) = size_info { |
260 | | let _ = self |
261 | | .existence_cache |
262 | | .insert(digest, ExistenceItem(size)) |
263 | | .await; |
264 | | } |
265 | | } |
266 | | { |
267 | | let maybe_keys = self.pause_remove_callbacks.lock().take(); |
268 | | if let Some(keys) = maybe_keys { |
269 | | let mut callbacks: FuturesUnordered<_> = keys |
270 | | .into_iter() |
271 | 1 | .map(|store_key| self.callback(store_key)) |
272 | | .collect(); |
273 | | while callbacks.next().await.is_some() {} |
274 | | } |
275 | | } |
276 | | result |
277 | 3 | } |
278 | | |
279 | | async fn get_part( |
280 | | self: Pin<&Self>, |
281 | | key: StoreKey<'_>, |
282 | | writer: &mut DropCloserWriteHalf, |
283 | | offset: u64, |
284 | | length: Option<u64>, |
285 | 1 | ) -> Result<(), Error> { |
286 | | let digest = key.into_digest(); |
287 | | let result = self |
288 | | .inner_store |
289 | | .get_part(digest, writer, offset, length) |
290 | | .await; |
291 | | if result.is_ok() { |
292 | | let _ = self |
293 | | .existence_cache |
294 | | .insert(digest, ExistenceItem(digest.size_bytes())) |
295 | | .await; |
296 | | } |
297 | | result |
298 | 1 | } |
299 | | |
300 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
301 | 0 | self |
302 | 0 | } |
303 | | |
304 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) { |
305 | 0 | self |
306 | 0 | } |
307 | | |
308 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
309 | 0 | self |
310 | 0 | } |
311 | | |
312 | 0 | fn register_remove_callback( |
313 | 0 | self: Arc<Self>, |
314 | 0 | callback: Arc<dyn RemoveItemCallback>, |
315 | 0 | ) -> Result<(), Error> { |
316 | 0 | self.inner_store.register_remove_callback(callback) |
317 | 0 | } |
318 | | } |
319 | | |
320 | | #[async_trait] |
321 | | impl<I: InstantWrapper> HealthStatusIndicator for ExistenceCacheStore<I> { |
322 | 0 | fn get_name(&self) -> &'static str { |
323 | 0 | "ExistenceCacheStore" |
324 | 0 | } |
325 | | |
326 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
327 | | StoreDriver::check_health(Pin::new(self), namespace).await |
328 | 0 | } |
329 | | } |