Coverage Report

Created: 2025-10-30 00:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/completeness_checking_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::{iter, mem};
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use futures::stream::{FuturesUnordered, StreamExt};
21
use futures::{FutureExt, TryFutureExt, select};
22
use nativelink_error::{Code, Error, ResultExt, make_err};
23
use nativelink_metric::MetricsComponent;
24
use nativelink_proto::build::bazel::remote::execution::v2::{
25
    ActionResult as ProtoActionResult, OutputDirectory as ProtoOutputDirectory, Tree as ProtoTree,
26
};
27
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
28
use nativelink_util::common::DigestInfo;
29
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
30
use nativelink_util::metrics_utils::CounterWithTime;
31
use nativelink_util::store_trait::{
32
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
33
};
34
use parking_lot::Mutex;
35
use tokio::sync::Notify;
36
use tracing::warn;
37
38
use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest};
39
40
/// Given a proto action result, return all relevant digests and
41
/// output directories that need to be checked.
42
8
fn get_digests_and_output_dirs(
43
8
    action_result: ProtoActionResult,
44
8
) -> Result<(Vec<StoreKey<'static>>, Vec<ProtoOutputDirectory>), Error> {
45
    // TODO(palfrey) When `try_collect()` is stable we can use it instead.
46
8
    let mut digest_iter = action_result
47
8
        .output_files
48
8
        .into_iter()
49
8
        .filter_map(|file| file.digest.map(DigestInfo::try_from))
50
8
        .chain(action_result.stdout_digest.map(DigestInfo::try_from))
51
8
        .chain(action_result.stderr_digest.map(DigestInfo::try_from));
52
8
    let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
53
8
    digest_iter
54
24
        .
try_for_each8
(|maybe_digest| {
55
24
            digest_infos.push(maybe_digest
?0
.into());
56
24
            Result::<_, Error>::Ok(())
57
24
        })
58
8
        .err_tip(|| "Some digests could not be converted to DigestInfos")
?0
;
59
8
    Ok((digest_infos, action_result.output_directories))
60
8
}
61
62
/// Given a list of output directories recursively get all digests
63
/// that need to be checked and pass them into `handle_digest_infos_fn`
64
/// as they are found.
65
#[expect(clippy::future_not_send)] // TODO(jhpratt) remove this
66
8
async fn check_output_directories<'a>(
67
8
    cas_store: &Store,
68
8
    output_directories: Vec<ProtoOutputDirectory>,
69
8
    handle_digest_infos_fn: &impl Fn(Vec<StoreKey<'a>>),
70
8
) -> Result<(), Error> {
71
8
    let mut futures = FuturesUnordered::new();
72
73
8
    let tree_digests = output_directories
74
8
        .into_iter()
75
8
        .filter_map(|output_dir| output_dir.tree_digest.map(DigestInfo::try_from));
76
16
    for 
maybe_tree_digest8
in tree_digests {
77
8
        let tree_digest = maybe_tree_digest
78
8
            .err_tip(|| "Could not decode tree digest CompletenessCheckingStore::has")
?0
;
79
8
        futures.push(async move {
80
8
            let tree = get_and_decode_digest::<ProtoTree>(cas_store, tree_digest.into()).await
?0
;
81
            // TODO(palfrey) When `try_collect()` is stable we can use it instead.
82
            // https://github.com/rust-lang/rust/issues/94047
83
16
            let 
mut digest_iter8
=
tree.children8
.
into_iter8
().
chain8
(
tree.root8
).
flat_map8
(|dir| {
84
16
                dir.files
85
16
                    .into_iter()
86
16
                    .filter_map(|f| f.digest.map(DigestInfo::try_from))
87
16
            });
88
89
8
            let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
90
8
            digest_iter
91
16
                .
try_for_each8
(|maybe_digest| {
92
16
                    digest_infos.push(maybe_digest
?0
.into());
93
16
                    Result::<_, Error>::Ok(())
94
16
                })
95
8
                .err_tip(|| "Expected digest to exist and be convertible")
?0
;
96
8
            handle_digest_infos_fn(digest_infos);
97
8
            Ok(())
98
8
        });
99
    }
100
101
16
    while let Some(
result8
) = futures.next().await {
  Branch (101:15): [True: 8, False: 8]
  Branch (101:15): [Folded - Ignored]
102
8
        match result {
103
8
            Ok(()) => {}
104
0
            Err(e) => return Err(e),
105
        }
106
    }
107
8
    Ok(())
108
8
}
109
110
#[derive(Debug, MetricsComponent)]
111
pub struct CompletenessCheckingStore {
112
    cas_store: Store,
113
    ac_store: Store,
114
115
    #[metric(help = "Incomplete entries hit in CompletenessCheckingStore")]
116
    incomplete_entries_counter: CounterWithTime,
117
    #[metric(help = "Complete entries hit in CompletenessCheckingStore")]
118
    complete_entries_counter: CounterWithTime,
119
}
120
121
impl CompletenessCheckingStore {
122
8
    pub fn new(ac_store: Store, cas_store: Store) -> Arc<Self> {
123
8
        Arc::new(Self {
124
8
            cas_store,
125
8
            ac_store,
126
8
            incomplete_entries_counter: CounterWithTime::default(),
127
8
            complete_entries_counter: CounterWithTime::default(),
128
8
        })
129
8
    }
130
131
    /// Check that all files and directories in action results
132
    /// exist in the CAS. Does this by decoding digests and
133
    /// checking their existence in two separate sets of futures that
134
    /// are polled concurrently.
135
8
    async fn inner_has_with_results(
136
8
        &self,
137
8
        action_result_digests: &[StoreKey<'_>],
138
8
        results: &mut [Option<u64>],
139
8
    ) -> Result<(), Error> {
140
        // Holds shared state between the different futures.
141
        // This is how get around lifetime issues.
142
        struct State<'a> {
143
            results: &'a mut [Option<u64>],
144
            digests_to_check: Vec<StoreKey<'a>>,
145
            digests_to_check_idxs: Vec<usize>,
146
            notify: Arc<Notify>,
147
            done: bool,
148
        }
149
        // Note: In theory Mutex is not needed, but lifetimes are
150
        // very tricky to get right here. Since we are using parking_lot
151
        // and we are guaranteed to never have lock collisions, it should
152
        // be nearly as fast as a few atomic operations.
153
8
        let state_mux = &Mutex::new(State {
154
8
            results,
155
8
            digests_to_check: Vec::new(),
156
8
            digests_to_check_idxs: Vec::new(),
157
8
            // Note: Any time `digests_to_check` or `digests_to_check_idxs` is
158
8
            // modified we must notify the subscriber here.
159
8
            notify: Arc::new(Notify::new()),
160
8
            done: false,
161
8
        });
162
163
8
        let mut futures = action_result_digests
164
8
            .iter()
165
8
            .enumerate()
166
8
            .map(|(i, digest)| {
167
8
                async move {
168
                    // Note: We don't err_tip here because often have NotFound here which is ok.
169
8
                    let (action_result, size) = get_size_and_decode_digest::<ProtoActionResult>(
170
8
                        &self.ac_store,
171
8
                        digest.borrow(),
172
8
                    )
173
8
                    .await
?0
;
174
175
8
                    let (mut digest_infos, output_directories) =
176
8
                        get_digests_and_output_dirs(action_result)
?0
;
177
178
                    {
179
8
                        let mut state = state_mux.lock();
180
181
                        // We immediately set the size of the digest here. Later we will unset it if
182
                        // we find that the digest has missing outputs.
183
8
                        state.results[i] = Some(size);
184
8
                        let rep_len = digest_infos.len();
185
8
                        if state.digests_to_check.is_empty() {
  Branch (185:28): [True: 8, False: 0]
  Branch (185:28): [Folded - Ignored]
186
8
                            // Hot path: Most actions only have files and only one digest
187
8
                            // requested to be checked. So we can avoid the heap allocation
188
8
                            // by just swapping out our container's stack if our pending_digests
189
8
                            // is empty.
190
8
                            mem::swap(&mut state.digests_to_check, &mut digest_infos);
191
8
                        } else {
192
0
                            state.digests_to_check.extend(digest_infos);
193
0
                        }
194
8
                        state
195
8
                            .digests_to_check_idxs
196
8
                            .extend(iter::repeat_n(i, rep_len));
197
8
                        state.notify.notify_one();
198
                    }
199
200
                    // Hot path: It is very common for no output directories to be defined.
201
                    // So we can avoid any needless work by early returning.
202
8
                    if output_directories.is_empty() {
  Branch (202:24): [True: 0, False: 8]
  Branch (202:24): [Folded - Ignored]
203
0
                        return Ok(());
204
8
                    }
205
206
8
                    check_output_directories(
207
8
                        &self.cas_store,
208
8
                        output_directories,
209
8
                        &move |digest_infos| {
210
8
                            let mut state = state_mux.lock();
211
8
                            let rep_len = digest_infos.len();
212
8
                            state.digests_to_check.extend(digest_infos);
213
8
                            state
214
8
                                .digests_to_check_idxs
215
8
                                .extend(iter::repeat_n(i, rep_len));
216
8
                            state.notify.notify_one();
217
8
                        },
218
                    )
219
8
                    .await
?0
;
220
221
8
                    Result::<(), Error>::Ok(())
222
8
                }
223
                // Add a tip to the error to help with debugging and the index of the
224
                // digest that failed so we know which one to unset.
225
8
                .map_err(move |mut e| 
{0
226
0
                    if e.code != Code::NotFound {
  Branch (226:24): [True: 0, False: 0]
  Branch (226:24): [Folded - Ignored]
227
0
                        e = e.append(
228
0
                            format!("Error checking existence of digest ({digest}) in CompletenessCheckingStore::has"),
229
0
                        );
230
0
                    }
231
0
                    (e, i)
232
0
                })
233
8
            })
234
8
            .collect::<FuturesUnordered<_>>();
235
236
        // This future will wait for the notify to be notified and then
237
        // check the CAS store for the digest's existence.
238
        // For optimization reasons we only allow one outstanding call to
239
        // the underlying `has_with_results()` at a time. This is because
240
        // we want to give the ability for stores to batch requests together
241
        // whenever possible.
242
        // The most common case is only one notify will ever happen.
243
8
        let check_existence_fut = async {
244
8
            let mut has_results = vec![];
245
8
            let notify = state_mux.lock().notify.clone();
246
            loop {
247
23
                notify.notified().await;
248
15
                let (digests, indexes) = {
249
23
                    let mut state = state_mux.lock();
250
23
                    if state.done {
  Branch (250:24): [True: 10, False: 13]
  Branch (250:24): [Folded - Ignored]
251
10
                        if state.digests_to_check.is_empty() {
  Branch (251:28): [True: 8, False: 2]
  Branch (251:28): [Folded - Ignored]
252
8
                            break;
253
2
                        }
254
                        // Edge case: It is possible for our `digest_to_check` to have had
255
                        // data added, `notify_one` called, then immediately `done` set to
256
                        // true. We protect ourselves by checking if we have digests if done
257
                        // is set, and if we do, let ourselves know to run again, but continue
258
                        // processing the data.
259
2
                        notify.notify_one();
260
13
                    }
261
15
                    (
262
15
                        mem::take(&mut state.digests_to_check),
263
15
                        mem::take(&mut state.digests_to_check_idxs),
264
15
                    )
265
                };
266
15
                assert_eq!(
267
15
                    digests.len(),
268
15
                    indexes.len(),
269
0
                    "Expected sizes to match in CompletenessCheckingStore::has"
270
                );
271
272
                // Recycle our results vector to avoid needless allocations.
273
15
                has_results.clear();
274
15
                has_results.resize(digests.len(), None);
275
15
                self.cas_store
276
15
                    .has_with_results(&digests, &mut has_results[..])
277
15
                    .await
278
15
                    .err_tip(
279
                        || "Error calling has_with_results() inside CompletenessCheckingStore::has",
280
0
                    )?;
281
15
                let missed_indexes = has_results
282
15
                    .iter()
283
15
                    .zip(indexes)
284
40
                    .
filter_map15
(|(r, index)| r.map_or_else(|| Some(
index6
), |_| None));
285
                {
286
15
                    let mut state = state_mux.lock();
287
21
                    for 
index6
in missed_indexes {
288
6
                        state.results[index] = None;
289
6
                    }
290
                }
291
            }
292
8
            Result::<(), Error>::Ok(())
293
8
        }
294
8
        .fuse();
295
8
        tokio::pin!(check_existence_fut);
296
297
        loop {
298
            // Poll both futures at the same time.
299
16
            select! {
300
0
                r = check_existence_fut => {
301
0
                    return Err(make_err!(
302
0
                        Code::Internal,
303
0
                        "CompletenessCheckingStore's check_existence_fut ended unexpectedly {r:?}"
304
0
                    ));
305
                }
306
16
                maybe_result = futures.next() => {
307
8
                    match maybe_result {
308
8
                        Some(Ok(())) => self.complete_entries_counter.inc(),
309
0
                        Some(Err((err, i))) => {
310
0
                            self.incomplete_entries_counter.inc();
311
0
                            state_mux.lock().results[i] = None;
312
                            // Note: Don't return the errors. We just flag the result as
313
                            // missing but show a warning if it's not a NotFound.
314
0
                            if err.code != Code::NotFound {
  Branch (314:32): [True: 0, False: 0]
  Branch (314:32): [Folded - Ignored]
315
0
                                warn!(
316
                                    ?err,
317
0
                                    "Error checking existence of digest"
318
                                );
319
0
                            }
320
                        }
321
                        None => {
322
                            // We are done, so flag it done and ensure we notify the
323
                            // subscriber future.
324
8
                            {
325
8
                                let mut state = state_mux.lock();
326
8
                                state.done = true;
327
8
                                state.notify.notify_one();
328
8
                            }
329
8
                            check_existence_fut
330
8
                                .await
331
8
                                .err_tip(|| "CompletenessCheckingStore's check_existence_fut ended unexpectedly on last await")
?0
;
332
8
                            return Ok(());
333
                        }
334
                    }
335
                }
336
            }
337
        }
338
        // Unreachable.
339
8
    }
340
}
341
342
#[async_trait]
343
impl StoreDriver for CompletenessCheckingStore {
344
    async fn has_with_results(
345
        self: Pin<&Self>,
346
        keys: &[StoreKey<'_>],
347
        results: &mut [Option<u64>],
348
6
    ) -> Result<(), Error> {
349
        self.inner_has_with_results(keys, results).await
350
6
    }
351
352
    async fn update(
353
        self: Pin<&Self>,
354
        key: StoreKey<'_>,
355
        reader: DropCloserReadHalf,
356
        size_info: UploadSizeInfo,
357
8
    ) -> Result<(), Error> {
358
        self.ac_store.update(key, reader, size_info).await
359
8
    }
360
361
    async fn get_part(
362
        self: Pin<&Self>,
363
        key: StoreKey<'_>,
364
        writer: &mut DropCloserWriteHalf,
365
        offset: u64,
366
        length: Option<u64>,
367
2
    ) -> Result<(), Error> {
368
        let results = &mut [None];
369
        self.inner_has_with_results(&[key.borrow()], results)
370
            .await
371
            .err_tip(|| "when calling CompletenessCheckingStore::get_part")?;
372
        if results[0].is_none() {
373
            return Err(make_err!(
374
                Code::NotFound,
375
                "Digest found, but not all parts were found in CompletenessCheckingStore::get_part"
376
            ));
377
        }
378
        self.ac_store.get_part(key, writer, offset, length).await
379
2
    }
380
381
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
382
0
        self
383
0
    }
384
385
0
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
386
0
        self
387
0
    }
388
389
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
390
0
        self
391
0
    }
392
393
0
    fn register_remove_callback(
394
0
        self: Arc<Self>,
395
0
        callback: Arc<dyn RemoveItemCallback>,
396
0
    ) -> Result<(), Error> {
397
0
        self.ac_store.register_remove_callback(callback.clone())?;
398
0
        self.cas_store.register_remove_callback(callback)?;
399
0
        Ok(())
400
0
    }
401
}
402
403
// NOTE(review): presumably expands to the default `HealthStatusIndicator`
// implementation for this store — confirm against `nativelink_util::health_utils`.
default_health_status_indicator!(CompletenessCheckingStore);