/build/source/nativelink-store/src/existence_cache_store.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::borrow::Cow; |
16 | | use std::pin::Pin; |
17 | | use std::sync::Arc; |
18 | | use std::time::SystemTime; |
19 | | |
20 | | use async_trait::async_trait; |
21 | | use nativelink_config::stores::{EvictionPolicy, ExistenceCacheSpec}; |
22 | | use nativelink_error::{error_if, Error, ResultExt}; |
23 | | use nativelink_metric::MetricsComponent; |
24 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
25 | | use nativelink_util::common::DigestInfo; |
26 | | use nativelink_util::evicting_map::{EvictingMap, LenEntry}; |
27 | | use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator}; |
28 | | use nativelink_util::instant_wrapper::InstantWrapper; |
29 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; |
30 | | |
/// Value stored in the existence cache for a digest.
///
/// The wrapped `u64` is the size recorded for the entry; it is the value
/// returned by this type's `LenEntry::len` implementation.
#[derive(Clone, Debug)]
struct ExistanceItem(u64);
33 | | |
34 | | impl LenEntry for ExistanceItem { |
35 | | #[inline] |
36 | 15 | fn len(&self) -> u64 { |
37 | 15 | self.0 |
38 | 15 | } |
39 | | |
40 | | #[inline] |
41 | 0 | fn is_empty(&self) -> bool { |
42 | 0 | false |
43 | 0 | } |
44 | | } |
45 | | |
/// Store wrapper that remembers, in an evicting map, which digests are known
/// to exist in the wrapped store.
///
/// `I` is the time source handed to the underlying `EvictingMap`.
#[derive(MetricsComponent)]
pub struct ExistenceCacheStore<I: InstantWrapper> {
    // The store every lookup, upload and download is ultimately forwarded to.
    #[metric(group = "inner_store")]
    inner_store: Store,
    // Digests recorded as existing, each mapped to the size recorded for it.
    existence_cache: EvictingMap<DigestInfo, ExistanceItem, I>,
}
52 | | |
53 | | impl ExistenceCacheStore<SystemTime> { |
54 | 3 | pub fn new(spec: &ExistenceCacheSpec, inner_store: Store) -> Arc<Self> { |
55 | 3 | Self::new_with_time(spec, inner_store, SystemTime::now()) |
56 | 3 | } |
57 | | } |
58 | | |
59 | | impl<I: InstantWrapper> ExistenceCacheStore<I> { |
60 | 4 | pub fn new_with_time( |
61 | 4 | spec: &ExistenceCacheSpec, |
62 | 4 | inner_store: Store, |
63 | 4 | anchor_time: I, |
64 | 4 | ) -> Arc<Self> { |
65 | 4 | let empty_policy = EvictionPolicy::default(); |
66 | 4 | let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy); |
67 | 4 | Arc::new(Self { |
68 | 4 | inner_store, |
69 | 4 | existence_cache: EvictingMap::new(eviction_policy, anchor_time), |
70 | 4 | }) |
71 | 4 | } |
72 | | |
73 | 4 | pub async fn exists_in_cache(&self, digest: &DigestInfo) -> bool { |
74 | 4 | let mut results = [None]; |
75 | 4 | self.existence_cache |
76 | 4 | .sizes_for_keys([digest], &mut results[..], true /* peek */) |
77 | 0 | .await; |
78 | 4 | results[0].is_some() |
79 | 4 | } |
80 | | |
81 | 1 | pub async fn remove_from_cache(&self, digest: &DigestInfo) { |
82 | 1 | self.existence_cache.remove(digest).await0 ; |
83 | 1 | } |
84 | | |
85 | 8 | async fn inner_has_with_results( |
86 | 8 | self: Pin<&Self>, |
87 | 8 | keys: &[DigestInfo], |
88 | 8 | results: &mut [Option<u64>], |
89 | 8 | ) -> Result<(), Error> { |
90 | 8 | self.existence_cache |
91 | 8 | .sizes_for_keys(keys, results, true /* peek */) |
92 | 0 | .await; |
93 | | |
94 | 8 | let not_cached_keys: Vec<_> = keys |
95 | 8 | .iter() |
96 | 8 | .zip(results.iter()) |
97 | 8 | .filter_map(|(digest, result)| result.map_or_else(|| Some(digest.into())5 , |_| None3 )) |
98 | 8 | .collect(); |
99 | 8 | |
100 | 8 | // Hot path optimization when all keys are cached. |
101 | 8 | if not_cached_keys.is_empty() { Branch (101:12): [True: 0, False: 0]
Branch (101:12): [True: 0, False: 3]
Branch (101:12): [True: 3, False: 2]
Branch (101:12): [Folded - Ignored]
|
102 | 3 | return Ok(()); |
103 | 5 | } |
104 | 5 | |
105 | 5 | // Now query only the items not found in the cache. |
106 | 5 | let mut inner_results = vec![None; not_cached_keys.len()]; |
107 | 5 | self.inner_store |
108 | 5 | .has_with_results(¬_cached_keys, &mut inner_results) |
109 | 0 | .await |
110 | 5 | .err_tip(|| "In ExistenceCacheStore::inner_has_with_results"0 )?0 ; |
111 | | |
112 | | // Insert found from previous query into our cache. |
113 | | { |
114 | | // Note: Sadly due to some weird lifetime issues we need to collect here, but |
115 | | // in theory we don't actually need to collect. |
116 | 5 | let inserts = not_cached_keys |
117 | 5 | .iter() |
118 | 5 | .zip(inner_results.iter()) |
119 | 5 | .filter_map(|(key, result)| { |
120 | 5 | result.map(|size| (key.borrow().into_digest(), ExistanceItem(size))2 ) |
121 | 5 | }) |
122 | 5 | .collect::<Vec<_>>(); |
123 | 5 | let _ = self.existence_cache.insert_many(inserts).await0 ; |
124 | | } |
125 | | |
126 | | // Merge the results from the cache and the query. |
127 | | { |
128 | 5 | let mut inner_results_iter = inner_results.into_iter(); |
129 | | // We know at this point that any None in results was queried and will have |
130 | | // a result in inner_results_iter, so use this knowledge to fill in the results. |
131 | 5 | for result in results.iter_mut() { |
132 | 5 | if result.is_none() { Branch (132:20): [True: 0, False: 0]
Branch (132:20): [True: 3, False: 0]
Branch (132:20): [True: 2, False: 0]
Branch (132:20): [Folded - Ignored]
|
133 | 5 | *result = inner_results_iter |
134 | 5 | .next() |
135 | 5 | .expect("has_with_results returned less results than expected"); |
136 | 5 | }0 |
137 | | } |
138 | | // Ensure that there was no logic error by ensuring our iterator is not empty. |
139 | 0 | error_if!( |
140 | 5 | inner_results_iter.next().is_some(), Branch (140:17): [True: 0, False: 0]
Branch (140:17): [True: 0, False: 3]
Branch (140:17): [True: 0, False: 2]
Branch (140:17): [Folded - Ignored]
|
141 | | "has_with_results returned more results than expected" |
142 | | ); |
143 | | } |
144 | | |
145 | 5 | Ok(()) |
146 | 8 | } |
147 | | } |
148 | | |
149 | | #[async_trait] |
150 | | impl<I: InstantWrapper> StoreDriver for ExistenceCacheStore<I> { |
151 | | async fn has_with_results( |
152 | | self: Pin<&Self>, |
153 | | digests: &[StoreKey<'_>], |
154 | | results: &mut [Option<u64>], |
155 | 6 | ) -> Result<(), Error> { |
156 | | // TODO(allada) This is a bit of a hack to get around the lifetime issues with the |
157 | | // existence_cache. We need to convert the digests to owned values to be able to |
158 | | // insert them into the cache. In theory it should be able to elide this conversion |
159 | | // but it seems to be a bit tricky to get right. |
160 | 6 | let digests: Vec<_> = digests |
161 | 6 | .iter() |
162 | 6 | .map(|key| key.borrow().into_digest()) |
163 | 6 | .collect(); |
164 | 6 | self.inner_has_with_results(&digests, results).await0 |
165 | 12 | } |
166 | | |
167 | | async fn update( |
168 | | self: Pin<&Self>, |
169 | | key: StoreKey<'_>, |
170 | | mut reader: DropCloserReadHalf, |
171 | | size_info: UploadSizeInfo, |
172 | 2 | ) -> Result<(), Error> { |
173 | 2 | let digest = key.into_digest(); |
174 | 2 | let mut exists = [None]; |
175 | 2 | self.inner_has_with_results(&[digest], &mut exists) |
176 | 0 | .await |
177 | 2 | .err_tip(|| "In ExistenceCacheStore::update"0 )?0 ; |
178 | 2 | if exists[0].is_some() { Branch (178:12): [True: 0, False: 0]
Branch (178:12): [True: 0, False: 2]
Branch (178:12): [True: 0, False: 0]
Branch (178:12): [Folded - Ignored]
|
179 | | // We need to drain the reader to avoid the writer complaining that we dropped |
180 | | // the connection prematurely. |
181 | 0 | reader |
182 | 0 | .drain() |
183 | 0 | .await |
184 | 0 | .err_tip(|| "In ExistenceCacheStore::update")?; |
185 | 0 | return Ok(()); |
186 | 2 | } |
187 | 2 | let result = self.inner_store.update(digest, reader, size_info).await0 ; |
188 | 2 | if result.is_ok() { Branch (188:12): [True: 0, False: 0]
Branch (188:12): [True: 2, False: 0]
Branch (188:12): [True: 0, False: 0]
Branch (188:12): [Folded - Ignored]
|
189 | 2 | if let UploadSizeInfo::ExactSize(size) = size_info { Branch (189:20): [True: 0, False: 0]
Branch (189:20): [True: 2, False: 0]
Branch (189:20): [True: 0, False: 0]
Branch (189:20): [Folded - Ignored]
|
190 | 2 | let _ = self |
191 | 2 | .existence_cache |
192 | 2 | .insert(digest, ExistanceItem(size)) |
193 | 0 | .await; |
194 | 0 | } |
195 | 0 | } |
196 | 2 | result |
197 | 4 | } |
198 | | |
199 | | async fn get_part( |
200 | | self: Pin<&Self>, |
201 | | key: StoreKey<'_>, |
202 | | writer: &mut DropCloserWriteHalf, |
203 | | offset: u64, |
204 | | length: Option<u64>, |
205 | 1 | ) -> Result<(), Error> { |
206 | 1 | let digest = key.into_digest(); |
207 | 1 | let result = self |
208 | 1 | .inner_store |
209 | 1 | .get_part(digest, writer, offset, length) |
210 | 0 | .await; |
211 | 1 | if result.is_ok() { Branch (211:12): [True: 0, False: 0]
Branch (211:12): [True: 1, False: 0]
Branch (211:12): [True: 0, False: 0]
Branch (211:12): [Folded - Ignored]
|
212 | 1 | let _ = self |
213 | 1 | .existence_cache |
214 | 1 | .insert(digest, ExistanceItem(digest.size_bytes())) |
215 | 0 | .await; |
216 | 0 | } |
217 | 1 | result |
218 | 2 | } |
219 | | |
220 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
221 | 0 | self |
222 | 0 | } |
223 | | |
224 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { |
225 | 0 | self |
226 | 0 | } |
227 | | |
228 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> { |
229 | 0 | self |
230 | 0 | } |
231 | | } |
232 | | |
233 | | #[async_trait] |
234 | | impl<I: InstantWrapper> HealthStatusIndicator for ExistenceCacheStore<I> { |
235 | 0 | fn get_name(&self) -> &'static str { |
236 | 0 | "ExistenceCacheStore" |
237 | 0 | } |
238 | | |
239 | 0 | async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { |
240 | 0 | StoreDriver::check_health(Pin::new(self), namespace).await |
241 | 0 | } |
242 | | } |