/build/source/nativelink-store/src/completeness_checking_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::pin::Pin; |
16 | | use std::sync::Arc; |
17 | | use std::{iter, mem}; |
18 | | |
19 | | use async_trait::async_trait; |
20 | | use futures::stream::{FuturesUnordered, StreamExt}; |
21 | | use futures::{select, FutureExt, TryFutureExt}; |
22 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
23 | | use nativelink_metric::MetricsComponent; |
24 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
25 | | ActionResult as ProtoActionResult, OutputDirectory as ProtoOutputDirectory, Tree as ProtoTree, |
26 | | }; |
27 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
28 | | use nativelink_util::common::DigestInfo; |
29 | | use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; |
30 | | use nativelink_util::metrics_utils::CounterWithTime; |
31 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; |
32 | | use parking_lot::Mutex; |
33 | | use tokio::sync::Notify; |
34 | | use tracing::{event, Level}; |
35 | | |
36 | | use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest}; |
37 | | |
38 | | /// Given a proto action result, collect the digests of its output files, stdout and stderr as |
39 | | /// store keys and return them with its output directories. Fails if a digest cannot be converted. |
40 | 8 | fn get_digests_and_output_dirs( |
41 | 8 | action_result: ProtoActionResult, |
42 | 8 | ) -> Result<(Vec<StoreKey<'static>>, Vec<ProtoOutputDirectory>), Error> { |
43 | 8 | // TODO(allada) When `try_collect()` is stable we can use it instead. |
44 | 8 | let mut digest_iter = action_result |
45 | 8 | .output_files |
46 | 8 | .into_iter() |
47 | 8 | .filter_map(|file| file.digest.map(DigestInfo::try_from)) |
48 | 8 | .chain(action_result.stdout_digest.map(DigestInfo::try_from)) |
49 | 8 | .chain(action_result.stderr_digest.map(DigestInfo::try_from)); |
50 | 8 | let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0)); |
51 | 8 | digest_iter |
52 | 24 | .try_for_each(|maybe_digest| { |
53 | 24 | digest_infos.push(maybe_digest?0 .into()); |
54 | 24 | Result::<_, Error>::Ok(()) |
55 | 24 | }) |
56 | 8 | .err_tip(|| "Some digests could not be converted to DigestInfos"0 )?0 ; |
57 | 8 | Ok((digest_infos, action_result.output_directories)) |
58 | 8 | } |
59 | | |
60 | | /// Given a list of output directories, fetch and decode each directory's `Tree` from `cas_store` |
61 | | /// concurrently, gather the file digests of the tree's root and children, and pass them into |
62 | | /// `handle_digest_infos_fn` (one call per tree) as they are found. The first error is returned. |
63 | 8 | async fn check_output_directories<'a>( |
64 | 8 | cas_store: &Store, |
65 | 8 | output_directories: Vec<ProtoOutputDirectory>, |
66 | 8 | handle_digest_infos_fn: &impl Fn(Vec<StoreKey<'a>>), |
67 | 8 | ) -> Result<(), Error> { |
68 | 8 | let mut futures = FuturesUnordered::new(); |
69 | 8 |
70 | 8 | let tree_digests = output_directories |
71 | 8 | .into_iter() |
72 | 8 | .filter_map(|output_dir| output_dir.tree_digest.map(DigestInfo::try_from)); |
73 | 16 | for maybe_tree_digest8 in tree_digests { |
74 | 8 | let tree_digest = maybe_tree_digest |
75 | 8 | .err_tip(|| "Could not decode tree digest CompletenessCheckingStore::has"0 )?0 ; |
76 | 8 | futures.push(async move { |
77 | 8 | let tree = get_and_decode_digest::<ProtoTree>(cas_store, tree_digest.into()).await?0 ; |
78 | | // TODO(allada) When `try_collect()` is stable we can use it instead. |
79 | | // https://github.com/rust-lang/rust/issues/94047 |
80 | 16 | let mut digest_iter = tree.children.into_iter().chain(tree.root).flat_map(8 |dir| { |
81 | 16 | dir.files |
82 | 16 | .into_iter() |
83 | 16 | .filter_map(|f| f.digest.map(DigestInfo::try_from)) |
84 | 16 | }); |
85 | 8 |
86 | 8 | let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0)); |
87 | 8 | digest_iter |
88 | 16 | .try_for_each(|maybe_digest| { |
89 | 16 | digest_infos.push(maybe_digest?0 .into()); |
90 | 16 | Result::<_, Error>::Ok(()) |
91 | 16 | }) |
92 | 8 | .err_tip(|| "Expected digest to exist and be convertable"0 )?0 ; |
93 | 8 | handle_digest_infos_fn(digest_infos); |
94 | 8 | Ok(()) |
95 | 8 | }); |
96 | 8 | } |
97 | |
98 | 16 | while let Some(result8 ) = futures.next().await { Branch (98:15): [True: 8, False: 8]
Branch (98:15): [Folded - Ignored]
|
99 | 8 | match result { |
100 | 8 | Ok(()) => {} |
101 | 0 | Err(e) => return Err(e), |
102 | | } |
103 | | } |
104 | 8 | Ok(()) |
105 | 8 | } |
106 | | |
/// Store that serves action results from `ac_store` and treats an action result as present
/// only when the digests it references are also found in `cas_store`.
/// The two counters record how many action-result checks finished successfully or with an error.
107 | | #[derive(MetricsComponent)] |
108 | | pub struct CompletenessCheckingStore { |
109 | | cas_store: Store, |
110 | | ac_store: Store, |
111 | |
112 | | #[metric(help = "Incomplete entries hit in CompletenessCheckingStore")] |
113 | | incomplete_entries_counter: CounterWithTime, |
114 | | #[metric(help = "Complete entries hit in CompletenessCheckingStore")] |
115 | | complete_entries_counter: CounterWithTime, |
116 | | } |
117 | | |
118 | | impl CompletenessCheckingStore { |
/// Builds the store from the action-cache store and the CAS store; both counters start at their defaults.
119 | 8 | pub fn new(ac_store: Store, cas_store: Store) -> Arc<Self> { |
120 | 8 | Arc::new(CompletenessCheckingStore { |
121 | 8 | cas_store, |
122 | 8 | ac_store, |
123 | 8 | incomplete_entries_counter: CounterWithTime::default(), |
124 | 8 | complete_entries_counter: CounterWithTime::default(), |
125 | 8 | }) |
126 | 8 | } |
127 | |
128 | | /// Check that all files and directories referenced by the given action results |
129 | | /// exist in the CAS. Does this by decoding the action results in one set of futures and |
130 | | /// checking digest existence in a second future, with both polled concurrently. On return |
131 | | /// `results[i]` holds the decoded entry's size, or `None` if it failed to load or has missing outputs. |
132 | 8 | async fn inner_has_with_results( |
133 | 8 | &self, |
134 | 8 | action_result_digests: &[StoreKey<'_>], |
135 | 8 | results: &mut [Option<u64>], |
136 | 8 | ) -> Result<(), Error> { |
137 | | // Holds shared state between the different futures. |
138 | | // This is how we get around lifetime issues. |
139 | | struct State<'a> { |
140 | | results: &'a mut [Option<u64>], |
141 | | digests_to_check: Vec<StoreKey<'a>>, |
142 | | digests_to_check_idxs: Vec<usize>, |
143 | | notify: Arc<Notify>, |
144 | | done: bool, |
145 | | } |
146 | | // Note: In theory Mutex is not needed, but lifetimes are |
147 | | // very tricky to get right here. Since we are using parking_lot |
148 | | // and we are guaranteed to never have lock collisions, it should |
149 | | // be nearly as fast as a few atomic operations. |
150 | 8 | let state_mux = &Mutex::new(State { |
151 | 8 | results, |
152 | 8 | digests_to_check: Vec::new(), |
153 | 8 | digests_to_check_idxs: Vec::new(), |
154 | 8 | // Note: Any time `digests_to_check` or `digests_to_check_idxs` is |
155 | 8 | // modified we must notify the subscriber here. |
156 | 8 | notify: Arc::new(Notify::new()), |
157 | 8 | done: false, |
158 | 8 | }); |
159 | 8 |
160 | 8 | let mut futures = action_result_digests |
161 | 8 | .iter() |
162 | 8 | .enumerate() |
163 | 8 | .map(|(i, digest)| { |
164 | 8 | async move { |
165 | | // Note: We don't err_tip here because we often get NotFound here, which is ok. |
166 | 8 | let (action_result, size) = get_size_and_decode_digest::<ProtoActionResult>( |
167 | 8 | &self.ac_store, |
168 | 8 | digest.borrow(), |
169 | 8 | ) |
170 | 8 | .await?0 ; |
171 | |
172 | 8 | let (mut digest_infos, output_directories) = |
173 | 8 | get_digests_and_output_dirs(action_result)?0 ; |
174 | |
175 | | { |
176 | 8 | let mut state = state_mux.lock(); |
177 | 8 |
178 | 8 | // We immediately set the size of the digest here. Later we will unset it if |
179 | 8 | // we find that the digest has missing outputs. |
180 | 8 | state.results[i] = Some(size); |
181 | 8 | let rep_len = digest_infos.len(); |
182 | 8 | if state.digests_to_check.is_empty() { Branch (182:28): [True: 8, False: 0]
Branch (182:28): [Folded - Ignored]
|
183 | 8 | // Hot path: Most actions only have files and only one digest |
184 | 8 | // requested to be checked. So we can avoid copying the keys into |
185 | 8 | // the shared vector by just swapping the two vectors when |
186 | 8 | // `digests_to_check` is empty. |
187 | 8 | mem::swap(&mut state.digests_to_check, &mut digest_infos); |
188 | 8 | } else { |
189 | 0 | state.digests_to_check.extend(digest_infos); |
190 | 0 | } |
191 | 8 | state |
192 | 8 | .digests_to_check_idxs |
193 | 8 | .extend(iter::repeat(i).take(rep_len)); |
194 | 8 | state.notify.notify_one(); |
195 | 8 | } |
196 | 8 |
197 | 8 | // Hot path: It is very common for no output directories to be defined. |
198 | 8 | // So we can avoid any needless work by early returning. |
199 | 8 | if output_directories.is_empty() { Branch (199:24): [True: 0, False: 8]
Branch (199:24): [Folded - Ignored]
|
200 | 0 | return Ok(()); |
201 | 8 | } |
202 | 8 |
203 | 8 | check_output_directories( |
204 | 8 | &self.cas_store, |
205 | 8 | output_directories, |
206 | 8 | &move |digest_infos| { |
207 | 8 | let mut state = state_mux.lock(); |
208 | 8 | let rep_len = digest_infos.len(); |
209 | 8 | state.digests_to_check.extend(digest_infos); |
210 | 8 | state |
211 | 8 | .digests_to_check_idxs |
212 | 8 | .extend(iter::repeat(i).take(rep_len)); |
213 | 8 | state.notify.notify_one(); |
214 | 8 | }, |
215 | 8 | ) |
216 | 8 | .await?0 ; |
217 | |
218 | 8 | Result::<(), Error>::Ok(()) |
219 | 8 | } |
220 | | // Add a tip to the error to help with debugging and the index of the |
221 | | // digest that failed so we know which one to unset. |
222 | 8 | .map_err(move |mut e| { |
223 | 0 | if e.code != Code::NotFound { Branch (223:24): [True: 0, False: 0]
Branch (223:24): [Folded - Ignored]
|
224 | 0 | e = e.append( |
225 | 0 | "Error checking existence of digest in CompletenessCheckingStore::has", |
226 | 0 | ); |
227 | 0 | } |
228 | 0 | (e, i) |
229 | 8 | }) |
230 | 8 | }) |
231 | 8 | .collect::<FuturesUnordered<_>>(); |
232 | 8 |
233 | 8 | // This future will wait for the notify to be notified and then |
234 | 8 | // check the CAS store for the digest's existence. |
235 | 8 | // For optimization reasons we only allow one outstanding call to |
236 | 8 | // the underlying `has_with_results()` at a time. This is because |
237 | 8 | // we want to give the ability for stores to batch requests together |
238 | 8 | // whenever possible. |
239 | 8 | // The most common case is only one notify will ever happen. |
240 | 8 | let check_existence_fut = async { |
241 | 8 | let mut has_results = vec![]; |
242 | 8 | let notify = state_mux.lock().notify.clone(); |
243 | | loop { |
244 | 23 | notify.notified().await; |
245 | 15 | let (digests, indexes) = { |
246 | 23 | let mut state = state_mux.lock(); |
247 | 23 | if state.done { Branch (247:24): [True: 10, False: 13]
Branch (247:24): [Folded - Ignored]
|
248 | 10 | if state.digests_to_check.is_empty() { Branch (248:28): [True: 8, False: 2]
Branch (248:28): [Folded - Ignored]
|
249 | 8 | break; |
250 | 2 | } |
251 | 2 | // Edge case: It is possible for our `digests_to_check` to have had |
252 | 2 | // data added, `notify_one` called, then immediately `done` set to |
253 | 2 | // true. We protect ourselves by checking if we have digests if done |
254 | 2 | // is set, and if we do, let ourselves know to run again, but continue |
255 | 2 | // processing the data. |
256 | 2 | notify.notify_one(); |
257 | 13 | } |
258 | 15 | ( |
259 | 15 | mem::take(&mut state.digests_to_check), |
260 | 15 | mem::take(&mut state.digests_to_check_idxs), |
261 | 15 | ) |
262 | 15 | }; |
263 | 15 | assert_eq!( |
264 | 15 | digests.len(), |
265 | 15 | indexes.len(), |
266 | 0 | "Expected sizes to match in CompletenessCheckingStore::has" |
267 | | ); |
268 | |
269 | | // Recycle our results vector to avoid needless allocations. |
270 | 15 | has_results.clear(); |
271 | 15 | has_results.resize(digests.len(), None); |
272 | 15 | self.cas_store |
273 | 15 | .has_with_results(&digests, &mut has_results[..]) |
274 | 15 | .await |
275 | 15 | .err_tip(|| { |
276 | 0 | "Error calling has_with_results() inside CompletenessCheckingStore::has" |
277 | 15 | })?0 ; |
278 | 15 | let missed_indexes = has_results |
279 | 15 | .iter() |
280 | 15 | .zip(indexes) |
281 | 40 | .filter_map(|(r, index)| r.map_or_else(|| Some(index)6 , |_| None34 )); |
282 | 15 | { |
283 | 15 | let mut state = state_mux.lock(); |
284 | 21 | for index6 in missed_indexes { |
285 | 6 | state.results[index] = None; |
286 | 6 | } |
287 | | } |
288 | | } |
289 | 8 | Result::<(), Error>::Ok(()) |
290 | 8 | } |
291 | 8 | .fuse(); |
292 | 8 | tokio::pin!(check_existence_fut); |
293 | |
294 | | loop { |
295 | | // Poll both futures at the same time. |
296 | 16 | select! { |
297 | 0 | r = check_existence_fut => { |
298 | 0 | return Err(make_err!( |
299 | 0 | Code::Internal, |
300 | 0 | "CompletenessCheckingStore's check_existence_fut ended unexpectedly {r:?}" |
301 | 0 | )); |
302 | | } |
303 | 16 | maybe_result = futures.next() => { |
304 | 8 | match maybe_result { |
// NOTE(review): this counter is bumped as soon as the decode future finishes; the CAS existence
// check may still clear `results[i]` afterwards without touching either counter - confirm intent.
305 | 8 | Some(Ok(())) => self.complete_entries_counter.inc(), |
306 | 0 | Some(Err((err, i))) => { |
307 | 0 | self.incomplete_entries_counter.inc(); |
308 | 0 | state_mux.lock().results[i] = None; |
309 | 0 | // Note: Don't return the errors. We just flag the result as |
310 | 0 | // missing but show a warning if it's not a NotFound. |
311 | 0 | if err.code != Code::NotFound { Branch (311:32): [True: 0, False: 0]
Branch (311:32): [Folded - Ignored]
|
312 | 0 | event!( |
313 | 0 | Level::WARN, |
314 | | ?err, |
315 | 0 | "Error checking existence of digest" |
316 | | ); |
317 | 0 | } |
318 | | } |
319 | | None => { |
320 | | // We are done, so flag it done and ensure we notify the |
321 | | // subscriber future. |
322 | 8 | { |
323 | 8 | let mut state = state_mux.lock(); |
324 | 8 | state.done = true; |
325 | 8 | state.notify.notify_one(); |
326 | 8 | } |
327 | 8 | check_existence_fut |
328 | 8 | .await |
329 | 8 | .err_tip(|| "CompletenessCheckingStore's check_existence_fut ended unexpectedly on last await"0 )?0 ; |
330 | 8 | return Ok(()); |
331 | | } |
332 | | } |
333 | | } |
334 | | } |
335 | | } |
336 | | // Unreachable. |
337 | 8 | } |
338 | | } |
339 | | |
340 | | #[async_trait] |
341 | | impl StoreDriver for CompletenessCheckingStore { |
/// Existence check: delegates to `inner_has_with_results`, so an entry whose referenced digests
/// are missing from the CAS comes back as `None`.
342 | | async fn has_with_results( |
343 | | self: Pin<&Self>, |
344 | | keys: &[StoreKey<'_>], |
345 | | results: &mut [Option<u64>], |
346 | 6 | ) -> Result<(), Error> { |
347 | 6 | self.inner_has_with_results(keys, results).await |
348 | 12 | } |
349 | |
/// Writes are forwarded unchanged to `ac_store`; no completeness check is run here.
350 | | async fn update( |
351 | | self: Pin<&Self>, |
352 | | key: StoreKey<'_>, |
353 | | reader: DropCloserReadHalf, |
354 | | size_info: UploadSizeInfo, |
355 | 8 | ) -> Result<(), Error> { |
356 | 8 | self.ac_store.update(key, reader, size_info).await |
357 | 16 | } |
358 | |
/// Runs the completeness check for `key` first and only then reads the entry from `ac_store`.
/// Returns `Code::NotFound` when the check leaves the result as `None`; that also happens when the
/// action result itself could not be loaded, not only when referenced outputs are missing.
359 | | async fn get_part( |
360 | | self: Pin<&Self>, |
361 | | key: StoreKey<'_>, |
362 | | writer: &mut DropCloserWriteHalf, |
363 | | offset: u64, |
364 | | length: Option<u64>, |
365 | 2 | ) -> Result<(), Error> { |
366 | 2 | let results = &mut [None]; |
367 | 2 | self.inner_has_with_results(&[key.borrow()], results) |
368 | 2 | .await |
369 | 2 | .err_tip(|| "when calling CompletenessCheckingStore::get_part"0 )?0 ; |
370 | 2 | if results[0].is_none() { Branch (370:12): [True: 1, False: 1]
Branch (370:12): [Folded - Ignored]
|
371 | 1 | return Err(make_err!( |
372 | 1 | Code::NotFound, |
373 | 1 | "Digest found, but not all parts were found in CompletenessCheckingStore::get_part" |
374 | 1 | )); |
375 | 1 | } |
376 | 1 | self.ac_store.get_part(key, writer, offset, length).await |
377 | 4 | } |
378 | |
/// Returns this store itself rather than one of the wrapped stores; the digest argument is ignored.
379 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
380 | 0 | self |
381 | 0 | } |
382 | |
383 | 0 | fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) { |
384 | 0 | self |
385 | 0 | } |
386 | |
387 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> { |
388 | 0 | self |
389 | 0 | } |
390 | | } |
391 | | |
392 | | default_health_status_indicator!(CompletenessCheckingStore); |