/build/source/nativelink-store/src/completeness_checking_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::pin::Pin; |
16 | | use core::{iter, mem}; |
17 | | use std::sync::Arc; |
18 | | |
19 | | use async_trait::async_trait; |
20 | | use futures::stream::{FuturesUnordered, StreamExt}; |
21 | | use futures::{FutureExt, TryFutureExt, select}; |
22 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
23 | | use nativelink_metric::MetricsComponent; |
24 | | use nativelink_proto::build::bazel::remote::execution::v2::{ |
25 | | ActionResult as ProtoActionResult, OutputDirectory as ProtoOutputDirectory, Tree as ProtoTree, |
26 | | }; |
27 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
28 | | use nativelink_util::common::DigestInfo; |
29 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
30 | | use nativelink_util::metrics_utils::CounterWithTime; |
31 | | use nativelink_util::store_trait::{ |
32 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, |
33 | | }; |
34 | | use parking_lot::Mutex; |
35 | | use tokio::sync::Notify; |
36 | | use tracing::warn; |
37 | | |
38 | | use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest}; |
39 | | |
40 | | /// Given a proto action result, return all relevant digests and |
41 | | /// output directories that need to be checked. |
42 | 8 | fn get_digests_and_output_dirs( |
43 | 8 | action_result: ProtoActionResult, |
44 | 8 | ) -> Result<(Vec<StoreKey<'static>>, Vec<ProtoOutputDirectory>), Error> { |
45 | | // TODO(palfrey) When `try_collect()` is stable we can use it instead. |
46 | 8 | let mut digest_iter = action_result |
47 | 8 | .output_files |
48 | 8 | .into_iter() |
49 | 8 | .filter_map(|file| file.digest.map(DigestInfo::try_from)) |
50 | 8 | .chain(action_result.stdout_digest.map(DigestInfo::try_from)) |
51 | 8 | .chain(action_result.stderr_digest.map(DigestInfo::try_from)); |
52 | 8 | let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0)); |
53 | 8 | digest_iter |
54 | 24 | .try_for_each8 (|maybe_digest| { |
55 | 24 | digest_infos.push(maybe_digest?0 .into()); |
56 | 24 | Result::<_, Error>::Ok(()) |
57 | 24 | }) |
58 | 8 | .err_tip(|| "Some digests could not be converted to DigestInfos")?0 ; |
59 | 8 | Ok((digest_infos, action_result.output_directories)) |
60 | 8 | } |
61 | | |
62 | | /// Given a list of output directories recursively get all digests |
63 | | /// that need to be checked and pass them into `handle_digest_infos_fn` |
64 | | /// as they are found. |
65 | | #[expect(clippy::future_not_send)] // TODO(jhpratt) remove this |
66 | 8 | async fn check_output_directories<'a>( |
67 | 8 | cas_store: &Store, |
68 | 8 | output_directories: Vec<ProtoOutputDirectory>, |
69 | 8 | handle_digest_infos_fn: &impl Fn(Vec<StoreKey<'a>>), |
70 | 8 | ) -> Result<(), Error> { |
71 | 8 | let mut futures = FuturesUnordered::new(); |
72 | | |
73 | 8 | let tree_digests = output_directories |
74 | 8 | .into_iter() |
75 | 8 | .filter_map(|output_dir| output_dir.tree_digest.map(DigestInfo::try_from)); |
76 | 16 | for maybe_tree_digest8 in tree_digests { |
77 | 8 | let tree_digest = maybe_tree_digest |
78 | 8 | .err_tip(|| "Could not decode tree digest CompletenessCheckingStore::has")?0 ; |
79 | 8 | futures.push(async move { |
80 | 8 | let tree = get_and_decode_digest::<ProtoTree>(cas_store, tree_digest.into()).await?0 ; |
81 | | // TODO(palfrey) When `try_collect()` is stable we can use it instead. |
82 | | // https://github.com/rust-lang/rust/issues/94047 |
83 | 16 | let mut digest_iter8 = tree.children8 .into_iter8 ().chain8 (tree.root8 ).flat_map8 (|dir| { |
84 | 16 | dir.files |
85 | 16 | .into_iter() |
86 | 16 | .filter_map(|f| f.digest.map(DigestInfo::try_from)) |
87 | 16 | }); |
88 | | |
89 | 8 | let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0)); |
90 | 8 | digest_iter |
91 | 16 | .try_for_each8 (|maybe_digest| { |
92 | 16 | digest_infos.push(maybe_digest?0 .into()); |
93 | 16 | Result::<_, Error>::Ok(()) |
94 | 16 | }) |
95 | 8 | .err_tip(|| "Expected digest to exist and be convertible")?0 ; |
96 | 8 | handle_digest_infos_fn(digest_infos); |
97 | 8 | Ok(()) |
98 | 8 | }); |
99 | | } |
100 | | |
101 | 16 | while let Some(result8 ) = futures.next().await { Branch (101:15): [True: 8, False: 8]
Branch (101:15): [Folded - Ignored]
|
102 | 8 | match result { |
103 | 8 | Ok(()) => {} |
104 | 0 | Err(e) => return Err(e), |
105 | | } |
106 | | } |
107 | 8 | Ok(()) |
108 | 8 | } |
109 | | |
110 | | #[derive(Debug, MetricsComponent)] |
111 | | pub struct CompletenessCheckingStore { |
112 | | cas_store: Store, |
113 | | ac_store: Store, |
114 | | |
115 | | #[metric(help = "Incomplete entries hit in CompletenessCheckingStore")] |
116 | | incomplete_entries_counter: CounterWithTime, |
117 | | #[metric(help = "Complete entries hit in CompletenessCheckingStore")] |
118 | | complete_entries_counter: CounterWithTime, |
119 | | } |
120 | | |
121 | | impl CompletenessCheckingStore { |
122 | 8 | pub fn new(ac_store: Store, cas_store: Store) -> Arc<Self> { |
123 | 8 | Arc::new(Self { |
124 | 8 | cas_store, |
125 | 8 | ac_store, |
126 | 8 | incomplete_entries_counter: CounterWithTime::default(), |
127 | 8 | complete_entries_counter: CounterWithTime::default(), |
128 | 8 | }) |
129 | 8 | } |
130 | | |
131 | | /// Check that all files and directories in action results |
132 | | /// exist in the CAS. Does this by decoding digests and |
133 | | /// checking their existence in two separate sets of futures that |
134 | | /// are polled concurrently. |
135 | 8 | async fn inner_has_with_results( |
136 | 8 | &self, |
137 | 8 | action_result_digests: &[StoreKey<'_>], |
138 | 8 | results: &mut [Option<u64>], |
139 | 8 | ) -> Result<(), Error> { |
140 | | // Holds shared state between the different futures. |
141 | | // This is how get around lifetime issues. |
142 | | struct State<'a> { |
143 | | results: &'a mut [Option<u64>], |
144 | | digests_to_check: Vec<StoreKey<'a>>, |
145 | | digests_to_check_idxs: Vec<usize>, |
146 | | notify: Arc<Notify>, |
147 | | done: bool, |
148 | | } |
149 | | // Note: In theory Mutex is not needed, but lifetimes are |
150 | | // very tricky to get right here. Since we are using parking_lot |
151 | | // and we are guaranteed to never have lock collisions, it should |
152 | | // be nearly as fast as a few atomic operations. |
153 | 8 | let state_mux = &Mutex::new(State { |
154 | 8 | results, |
155 | 8 | digests_to_check: Vec::new(), |
156 | 8 | digests_to_check_idxs: Vec::new(), |
157 | 8 | // Note: Any time `digests_to_check` or `digests_to_check_idxs` is |
158 | 8 | // modified we must notify the subscriber here. |
159 | 8 | notify: Arc::new(Notify::new()), |
160 | 8 | done: false, |
161 | 8 | }); |
162 | | |
163 | 8 | let mut futures = action_result_digests |
164 | 8 | .iter() |
165 | 8 | .enumerate() |
166 | 8 | .map(|(i, digest)| { |
167 | 8 | async move { |
168 | | // Note: We don't err_tip here because often have NotFound here which is ok. |
169 | 8 | let (action_result, size) = get_size_and_decode_digest::<ProtoActionResult>( |
170 | 8 | &self.ac_store, |
171 | 8 | digest.borrow(), |
172 | 8 | ) |
173 | 8 | .await?0 ; |
174 | | |
175 | 8 | let (mut digest_infos, output_directories) = |
176 | 8 | get_digests_and_output_dirs(action_result)?0 ; |
177 | | |
178 | | { |
179 | 8 | let mut state = state_mux.lock(); |
180 | | |
181 | | // We immediately set the size of the digest here. Later we will unset it if |
182 | | // we find that the digest has missing outputs. |
183 | 8 | state.results[i] = Some(size); |
184 | 8 | let rep_len = digest_infos.len(); |
185 | 8 | if state.digests_to_check.is_empty() { Branch (185:28): [True: 8, False: 0]
Branch (185:28): [Folded - Ignored]
|
186 | 8 | // Hot path: Most actions only have files and only one digest |
187 | 8 | // requested to be checked. So we can avoid the heap allocation |
188 | 8 | // by just swapping out our container's stack if our pending_digests |
189 | 8 | // is empty. |
190 | 8 | mem::swap(&mut state.digests_to_check, &mut digest_infos); |
191 | 8 | } else { |
192 | 0 | state.digests_to_check.extend(digest_infos); |
193 | 0 | } |
194 | 8 | state |
195 | 8 | .digests_to_check_idxs |
196 | 8 | .extend(iter::repeat_n(i, rep_len)); |
197 | 8 | state.notify.notify_one(); |
198 | | } |
199 | | |
200 | | // Hot path: It is very common for no output directories to be defined. |
201 | | // So we can avoid any needless work by early returning. |
202 | 8 | if output_directories.is_empty() { Branch (202:24): [True: 0, False: 8]
Branch (202:24): [Folded - Ignored]
|
203 | 0 | return Ok(()); |
204 | 8 | } |
205 | | |
206 | 8 | check_output_directories( |
207 | 8 | &self.cas_store, |
208 | 8 | output_directories, |
209 | 8 | &move |digest_infos| { |
210 | 8 | let mut state = state_mux.lock(); |
211 | 8 | let rep_len = digest_infos.len(); |
212 | 8 | state.digests_to_check.extend(digest_infos); |
213 | 8 | state |
214 | 8 | .digests_to_check_idxs |
215 | 8 | .extend(iter::repeat_n(i, rep_len)); |
216 | 8 | state.notify.notify_one(); |
217 | 8 | }, |
218 | | ) |
219 | 8 | .await?0 ; |
220 | | |
221 | 8 | Result::<(), Error>::Ok(()) |
222 | 8 | } |
223 | | // Add a tip to the error to help with debugging and the index of the |
224 | | // digest that failed so we know which one to unset. |
225 | 8 | .map_err(move |mut e| {0 |
226 | 0 | if e.code != Code::NotFound { Branch (226:24): [True: 0, False: 0]
Branch (226:24): [Folded - Ignored]
|
227 | 0 | e = e.append( |
228 | 0 | format!("Error checking existence of digest ({digest}) in CompletenessCheckingStore::has"), |
229 | 0 | ); |
230 | 0 | } |
231 | 0 | (e, i) |
232 | 0 | }) |
233 | 8 | }) |
234 | 8 | .collect::<FuturesUnordered<_>>(); |
235 | | |
236 | | // This future will wait for the notify to be notified and then |
237 | | // check the CAS store for the digest's existence. |
238 | | // For optimization reasons we only allow one outstanding call to |
239 | | // the underlying `has_with_results()` at a time. This is because |
240 | | // we want to give the ability for stores to batch requests together |
241 | | // whenever possible. |
242 | | // The most common case is only one notify will ever happen. |
243 | 8 | let check_existence_fut = async { |
244 | 8 | let mut has_results = vec![]; |
245 | 8 | let notify = state_mux.lock().notify.clone(); |
246 | | loop { |
247 | 23 | notify.notified().await; |
248 | 15 | let (digests, indexes) = { |
249 | 23 | let mut state = state_mux.lock(); |
250 | 23 | if state.done { Branch (250:24): [True: 10, False: 13]
Branch (250:24): [Folded - Ignored]
|
251 | 10 | if state.digests_to_check.is_empty() { Branch (251:28): [True: 8, False: 2]
Branch (251:28): [Folded - Ignored]
|
252 | 8 | break; |
253 | 2 | } |
254 | | // Edge case: It is possible for our `digest_to_check` to have had |
255 | | // data added, `notify_one` called, then immediately `done` set to |
256 | | // true. We protect ourselves by checking if we have digests if done |
257 | | // is set, and if we do, let ourselves know to run again, but continue |
258 | | // processing the data. |
259 | 2 | notify.notify_one(); |
260 | 13 | } |
261 | 15 | ( |
262 | 15 | mem::take(&mut state.digests_to_check), |
263 | 15 | mem::take(&mut state.digests_to_check_idxs), |
264 | 15 | ) |
265 | | }; |
266 | 15 | assert_eq!( |
267 | 15 | digests.len(), |
268 | 15 | indexes.len(), |
269 | 0 | "Expected sizes to match in CompletenessCheckingStore::has" |
270 | | ); |
271 | | |
272 | | // Recycle our results vector to avoid needless allocations. |
273 | 15 | has_results.clear(); |
274 | 15 | has_results.resize(digests.len(), None); |
275 | 15 | self.cas_store |
276 | 15 | .has_with_results(&digests, &mut has_results[..]) |
277 | 15 | .await |
278 | 15 | .err_tip( |
279 | | || "Error calling has_with_results() inside CompletenessCheckingStore::has", |
280 | 0 | )?; |
281 | 15 | let missed_indexes = has_results |
282 | 15 | .iter() |
283 | 15 | .zip(indexes) |
284 | 40 | .filter_map15 (|(r, index)| r.map_or_else(|| Some(index6 ), |_| None)); |
285 | | { |
286 | 15 | let mut state = state_mux.lock(); |
287 | 21 | for index6 in missed_indexes { |
288 | 6 | state.results[index] = None; |
289 | 6 | } |
290 | | } |
291 | | } |
292 | 8 | Result::<(), Error>::Ok(()) |
293 | 8 | } |
294 | 8 | .fuse(); |
295 | 8 | tokio::pin!(check_existence_fut); |
296 | | |
297 | | loop { |
298 | | // Poll both futures at the same time. |
299 | 16 | select! { |
300 | 0 | r = check_existence_fut => { |
301 | 0 | return Err(make_err!( |
302 | 0 | Code::Internal, |
303 | 0 | "CompletenessCheckingStore's check_existence_fut ended unexpectedly {r:?}" |
304 | 0 | )); |
305 | | } |
306 | 16 | maybe_result = futures.next() => { |
307 | 8 | match maybe_result { |
308 | 8 | Some(Ok(())) => self.complete_entries_counter.inc(), |
309 | 0 | Some(Err((err, i))) => { |
310 | 0 | self.incomplete_entries_counter.inc(); |
311 | 0 | state_mux.lock().results[i] = None; |
312 | | // Note: Don't return the errors. We just flag the result as |
313 | | // missing but show a warning if it's not a NotFound. |
314 | 0 | if err.code != Code::NotFound { Branch (314:32): [True: 0, False: 0]
Branch (314:32): [Folded - Ignored]
|
315 | 0 | warn!( |
316 | | ?err, |
317 | 0 | "Error checking existence of digest" |
318 | | ); |
319 | 0 | } |
320 | | } |
321 | | None => { |
322 | | // We are done, so flag it done and ensure we notify the |
323 | | // subscriber future. |
324 | 8 | { |
325 | 8 | let mut state = state_mux.lock(); |
326 | 8 | state.done = true; |
327 | 8 | state.notify.notify_one(); |
328 | 8 | } |
329 | 8 | check_existence_fut |
330 | 8 | .await |
331 | 8 | .err_tip(|| "CompletenessCheckingStore's check_existence_fut ended unexpectedly on last await")?0 ; |
332 | 8 | return Ok(()); |
333 | | } |
334 | | } |
335 | | } |
336 | | } |
337 | | } |
338 | | // Unreachable. |
339 | 8 | } |
340 | | } |
341 | | |
342 | | #[async_trait] |
343 | | impl StoreDriver for CompletenessCheckingStore { |
344 | | async fn has_with_results( |
345 | | self: Pin<&Self>, |
346 | | keys: &[StoreKey<'_>], |
347 | | results: &mut [Option<u64>], |
348 | 6 | ) -> Result<(), Error> { |
349 | | self.inner_has_with_results(keys, results).await |
350 | 6 | } |
351 | | |
352 | | async fn update( |
353 | | self: Pin<&Self>, |
354 | | key: StoreKey<'_>, |
355 | | reader: DropCloserReadHalf, |
356 | | size_info: UploadSizeInfo, |
357 | 8 | ) -> Result<(), Error> { |
358 | | self.ac_store.update(key, reader, size_info).await |
359 | 8 | } |
360 | | |
361 | | async fn get_part( |
362 | | self: Pin<&Self>, |
363 | | key: StoreKey<'_>, |
364 | | writer: &mut DropCloserWriteHalf, |
365 | | offset: u64, |
366 | | length: Option<u64>, |
367 | 2 | ) -> Result<(), Error> { |
368 | | let results = &mut [None]; |
369 | | self.inner_has_with_results(&[key.borrow()], results) |
370 | | .await |
371 | | .err_tip(|| "when calling CompletenessCheckingStore::get_part")?; |
372 | | if results[0].is_none() { |
373 | | return Err(make_err!( |
374 | | Code::NotFound, |
375 | | "Digest found, but not all parts were found in CompletenessCheckingStore::get_part" |
376 | | )); |
377 | | } |
378 | | self.ac_store.get_part(key, writer, offset, length).await |
379 | 2 | } |
380 | | |
381 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
382 | 0 | self |
383 | 0 | } |
384 | | |
385 | 0 | fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { |
386 | 0 | self |
387 | 0 | } |
388 | | |
389 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
390 | 0 | self |
391 | 0 | } |
392 | | |
393 | 0 | fn register_remove_callback( |
394 | 0 | self: Arc<Self>, |
395 | 0 | callback: Arc<dyn RemoveItemCallback>, |
396 | 0 | ) -> Result<(), Error> { |
397 | 0 | self.ac_store.register_remove_callback(callback.clone())?; |
398 | 0 | self.cas_store.register_remove_callback(callback)?; |
399 | 0 | Ok(()) |
400 | 0 | } |
401 | | } |
402 | | |
403 | | default_health_status_indicator!(CompletenessCheckingStore); |