Coverage Report

Created: 2024-12-03 16:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/completeness_checking_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
use std::{iter, mem};
18
19
use async_trait::async_trait;
20
use futures::stream::{FuturesUnordered, StreamExt};
21
use futures::{select, FutureExt, TryFutureExt};
22
use nativelink_error::{make_err, Code, Error, ResultExt};
23
use nativelink_metric::MetricsComponent;
24
use nativelink_proto::build::bazel::remote::execution::v2::{
25
    ActionResult as ProtoActionResult, OutputDirectory as ProtoOutputDirectory, Tree as ProtoTree,
26
};
27
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
28
use nativelink_util::common::DigestInfo;
29
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
30
use nativelink_util::metrics_utils::CounterWithTime;
31
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
32
use parking_lot::Mutex;
33
use tokio::sync::Notify;
34
use tracing::{event, Level};
35
36
use crate::ac_utils::{get_and_decode_digest, get_size_and_decode_digest};
37
38
/// Given a proto action result, return all relevant digests and
39
/// output directories that need to be checked.
40
8
fn get_digests_and_output_dirs(
41
8
    action_result: ProtoActionResult,
42
8
) -> Result<(Vec<StoreKey<'static>>, Vec<ProtoOutputDirectory>), Error> {
43
8
    // TODO(allada) When `try_collect()` is stable we can use it instead.
44
8
    let mut digest_iter = action_result
45
8
        .output_files
46
8
        .into_iter()
47
8
        .filter_map(|file| file.digest.map(DigestInfo::try_from))
48
8
        .chain(action_result.stdout_digest.map(DigestInfo::try_from))
49
8
        .chain(action_result.stderr_digest.map(DigestInfo::try_from));
50
8
    let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
51
8
    digest_iter
52
24
        .try_for_each(|maybe_digest| {
53
24
            digest_infos.push(maybe_digest
?0
.into());
54
24
            Result::<_, Error>::Ok(())
55
24
        })
56
8
        .err_tip(|| 
"Some digests could not be converted to DigestInfos"0
)
?0
;
57
8
    Ok((digest_infos, action_result.output_directories))
58
8
}
59
60
/// Given a list of output directories recursively get all digests
61
/// that need to be checked and pass them into `handle_digest_infos_fn`
62
/// as they are found.
63
8
async fn check_output_directories<'a>(
64
8
    cas_store: &Store,
65
8
    output_directories: Vec<ProtoOutputDirectory>,
66
8
    handle_digest_infos_fn: &impl Fn(Vec<StoreKey<'a>>),
67
8
) -> Result<(), Error> {
68
8
    let mut futures = FuturesUnordered::new();
69
8
70
8
    let tree_digests = output_directories
71
8
        .into_iter()
72
8
        .filter_map(|output_dir| output_dir.tree_digest.map(DigestInfo::try_from));
73
16
    for 
maybe_tree_digest8
in tree_digests {
74
8
        let tree_digest = maybe_tree_digest
75
8
            .err_tip(|| 
"Could not decode tree digest CompletenessCheckingStore::has"0
)
?0
;
76
8
        futures.push(async move {
77
8
            let tree = get_and_decode_digest::<ProtoTree>(cas_store, tree_digest.into()).await
?0
;
78
            // TODO(allada) When `try_collect()` is stable we can use it instead.
79
            // https://github.com/rust-lang/rust/issues/94047
80
16
            let 
mut digest_iter = tree.children.into_iter().chain(tree.root).flat_map(8
|dir| {
81
16
                dir.files
82
16
                    .into_iter()
83
16
                    .filter_map(|f| f.digest.map(DigestInfo::try_from))
84
16
            });
85
8
86
8
            let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
87
8
            digest_iter
88
16
                .try_for_each(|maybe_digest| {
89
16
                    digest_infos.push(maybe_digest
?0
.into());
90
16
                    Result::<_, Error>::Ok(())
91
16
                })
92
8
                .err_tip(|| 
"Expected digest to exist and be convertable"0
)
?0
;
93
8
            handle_digest_infos_fn(digest_infos);
94
8
            Ok(())
95
8
        });
96
8
    }
97
98
16
    while let Some(
result8
) = futures.next().await {
  Branch (98:15): [True: 8, False: 8]
  Branch (98:15): [Folded - Ignored]
99
8
        match result {
100
8
            Ok(()) => {}
101
0
            Err(e) => return Err(e),
102
        }
103
    }
104
8
    Ok(())
105
8
}
106
107
#[derive(MetricsComponent)]
108
pub struct CompletenessCheckingStore {
109
    cas_store: Store,
110
    ac_store: Store,
111
112
    #[metric(help = "Incomplete entries hit in CompletenessCheckingStore")]
113
    incomplete_entries_counter: CounterWithTime,
114
    #[metric(help = "Complete entries hit in CompletenessCheckingStore")]
115
    complete_entries_counter: CounterWithTime,
116
}
117
118
impl CompletenessCheckingStore {
119
8
    pub fn new(ac_store: Store, cas_store: Store) -> Arc<Self> {
120
8
        Arc::new(CompletenessCheckingStore {
121
8
            cas_store,
122
8
            ac_store,
123
8
            incomplete_entries_counter: CounterWithTime::default(),
124
8
            complete_entries_counter: CounterWithTime::default(),
125
8
        })
126
8
    }
127
128
    /// Check that all files and directories in action results
129
    /// exist in the CAS. Does this by decoding digests and
130
    /// checking their existence in two separate sets of futures that
131
    /// are polled concurrently.
132
8
    async fn inner_has_with_results(
133
8
        &self,
134
8
        action_result_digests: &[StoreKey<'_>],
135
8
        results: &mut [Option<u64>],
136
8
    ) -> Result<(), Error> {
137
        // Holds shared state between the different futures.
138
        // This is how get around lifetime issues.
139
        struct State<'a> {
140
            results: &'a mut [Option<u64>],
141
            digests_to_check: Vec<StoreKey<'a>>,
142
            digests_to_check_idxs: Vec<usize>,
143
            notify: Arc<Notify>,
144
            done: bool,
145
        }
146
        // Note: In theory Mutex is not needed, but lifetimes are
147
        // very tricky to get right here. Since we are using parking_lot
148
        // and we are guaranteed to never have lock collisions, it should
149
        // be nearly as fast as a few atomic operations.
150
8
        let state_mux = &Mutex::new(State {
151
8
            results,
152
8
            digests_to_check: Vec::new(),
153
8
            digests_to_check_idxs: Vec::new(),
154
8
            // Note: Any time `digests_to_check` or `digests_to_check_idxs` is
155
8
            // modified we must notify the subscriber here.
156
8
            notify: Arc::new(Notify::new()),
157
8
            done: false,
158
8
        });
159
8
160
8
        let mut futures = action_result_digests
161
8
            .iter()
162
8
            .enumerate()
163
8
            .map(|(i, digest)| {
164
8
                async move {
165
                    // Note: We don't err_tip here because often have NotFound here which is ok.
166
8
                    let (action_result, size) = get_size_and_decode_digest::<ProtoActionResult>(
167
8
                        &self.ac_store,
168
8
                        digest.borrow(),
169
8
                    )
170
8
                    .await
?0
;
171
172
8
                    let (mut digest_infos, output_directories) =
173
8
                        get_digests_and_output_dirs(action_result)
?0
;
174
175
                    {
176
8
                        let mut state = state_mux.lock();
177
8
178
8
                        // We immediately set the size of the digest here. Later we will unset it if
179
8
                        // we find that the digest has missing outputs.
180
8
                        state.results[i] = Some(size);
181
8
                        let rep_len = digest_infos.len();
182
8
                        if state.digests_to_check.is_empty() {
  Branch (182:28): [True: 8, False: 0]
  Branch (182:28): [Folded - Ignored]
183
8
                            // Hot path: Most actions only have files and only one digest
184
8
                            // requested to be checked. So we can avoid the heap allocation
185
8
                            // by just swapping out our container's stack if our pending_digests
186
8
                            // is empty.
187
8
                            mem::swap(&mut state.digests_to_check, &mut digest_infos);
188
8
                        } else {
189
0
                            state.digests_to_check.extend(digest_infos);
190
0
                        }
191
8
                        state
192
8
                            .digests_to_check_idxs
193
8
                            .extend(iter::repeat(i).take(rep_len));
194
8
                        state.notify.notify_one();
195
8
                    }
196
8
197
8
                    // Hot path: It is very common for no output directories to be defined.
198
8
                    // So we can avoid any needless work by early returning.
199
8
                    if output_directories.is_empty() {
  Branch (199:24): [True: 0, False: 8]
  Branch (199:24): [Folded - Ignored]
200
0
                        return Ok(());
201
8
                    }
202
8
203
8
                    check_output_directories(
204
8
                        &self.cas_store,
205
8
                        output_directories,
206
8
                        &move |digest_infos| {
207
8
                            let mut state = state_mux.lock();
208
8
                            let rep_len = digest_infos.len();
209
8
                            state.digests_to_check.extend(digest_infos);
210
8
                            state
211
8
                                .digests_to_check_idxs
212
8
                                .extend(iter::repeat(i).take(rep_len));
213
8
                            state.notify.notify_one();
214
8
                        },
215
8
                    )
216
8
                    .await
?0
;
217
218
8
                    Result::<(), Error>::Ok(())
219
8
                }
220
                // Add a tip to the error to help with debugging and the index of the
221
                // digest that failed so we know which one to unset.
222
8
                .map_err(move |mut e| {
223
0
                    if e.code != Code::NotFound {
  Branch (223:24): [True: 0, False: 0]
  Branch (223:24): [Folded - Ignored]
224
0
                        e = e.append(
225
0
                            "Error checking existence of digest in CompletenessCheckingStore::has",
226
0
                        );
227
0
                    }
228
0
                    (e, i)
229
8
                })
230
8
            })
231
8
            .collect::<FuturesUnordered<_>>();
232
8
233
8
        // This future will wait for the notify to be notified and then
234
8
        // check the CAS store for the digest's existence.
235
8
        // For optimization reasons we only allow one outstanding call to
236
8
        // the underlying `has_with_results()` at a time. This is because
237
8
        // we want to give the ability for stores to batch requests together
238
8
        // whenever possible.
239
8
        // The most common case is only one notify will ever happen.
240
8
        let check_existence_fut = async {
241
8
            let mut has_results = vec![];
242
8
            let notify = state_mux.lock().notify.clone();
243
            loop {
244
23
                notify.notified().await;
245
15
                let (digests, indexes) = {
246
23
                    let mut state = state_mux.lock();
247
23
                    if state.done {
  Branch (247:24): [True: 10, False: 13]
  Branch (247:24): [Folded - Ignored]
248
10
                        if state.digests_to_check.is_empty() {
  Branch (248:28): [True: 8, False: 2]
  Branch (248:28): [Folded - Ignored]
249
8
                            break;
250
2
                        }
251
2
                        // Edge case: It is possible for our `digest_to_check` to have had
252
2
                        // data added, `notify_one` called, then immediately `done` set to
253
2
                        // true. We protect ourselves by checking if we have digests if done
254
2
                        // is set, and if we do, let ourselves know to run again, but continue
255
2
                        // processing the data.
256
2
                        notify.notify_one();
257
13
                    }
258
15
                    (
259
15
                        mem::take(&mut state.digests_to_check),
260
15
                        mem::take(&mut state.digests_to_check_idxs),
261
15
                    )
262
15
                };
263
15
                assert_eq!(
264
15
                    digests.len(),
265
15
                    indexes.len(),
266
0
                    "Expected sizes to match in CompletenessCheckingStore::has"
267
                );
268
269
                // Recycle our results vector to avoid needless allocations.
270
15
                has_results.clear();
271
15
                has_results.resize(digests.len(), None);
272
15
                self.cas_store
273
15
                    .has_with_results(&digests, &mut has_results[..])
274
15
                    .await
275
15
                    .err_tip(|| {
276
0
                        "Error calling has_with_results() inside CompletenessCheckingStore::has"
277
15
                    })
?0
;
278
15
                let missed_indexes = has_results
279
15
                    .iter()
280
15
                    .zip(indexes)
281
40
                    .filter_map(|(r, index)| r.map_or_else(|| 
Some(index)6
, |_|
None34
));
282
15
                {
283
15
                    let mut state = state_mux.lock();
284
21
                    for 
index6
in missed_indexes {
285
6
                        state.results[index] = None;
286
6
                    }
287
                }
288
            }
289
8
            Result::<(), Error>::Ok(())
290
8
        }
291
8
        .fuse();
292
8
        tokio::pin!(check_existence_fut);
293
294
        loop {
295
            // Poll both futures at the same time.
296
16
            select! {
297
0
                r = check_existence_fut => {
298
0
                    return Err(make_err!(
299
0
                        Code::Internal,
300
0
                        "CompletenessCheckingStore's check_existence_fut ended unexpectedly {r:?}"
301
0
                    ));
302
                }
303
16
                maybe_result = futures.next() => {
304
8
                    match maybe_result {
305
8
                        Some(Ok(())) => self.complete_entries_counter.inc(),
306
0
                        Some(Err((err, i))) => {
307
0
                            self.incomplete_entries_counter.inc();
308
0
                            state_mux.lock().results[i] = None;
309
0
                            // Note: Don't return the errors. We just flag the result as
310
0
                            // missing but show a warning if it's not a NotFound.
311
0
                            if err.code != Code::NotFound {
  Branch (311:32): [True: 0, False: 0]
  Branch (311:32): [Folded - Ignored]
312
0
                                event!(
313
0
                                    Level::WARN,
314
                                    ?err,
315
0
                                    "Error checking existence of digest"
316
                                );
317
0
                            }
318
                        }
319
                        None => {
320
                            // We are done, so flag it done and ensure we notify the
321
                            // subscriber future.
322
8
                            {
323
8
                                let mut state = state_mux.lock();
324
8
                                state.done = true;
325
8
                                state.notify.notify_one();
326
8
                            }
327
8
                            check_existence_fut
328
8
                                .await
329
8
                                .err_tip(|| 
"CompletenessCheckingStore's check_existence_fut ended unexpectedly on last await"0
)
?0
;
330
8
                            return Ok(());
331
                        }
332
                    }
333
                }
334
            }
335
        }
336
        // Unreachable.
337
8
    }
338
}
339
340
#[async_trait]
341
impl StoreDriver for CompletenessCheckingStore {
342
    async fn has_with_results(
343
        self: Pin<&Self>,
344
        keys: &[StoreKey<'_>],
345
        results: &mut [Option<u64>],
346
6
    ) -> Result<(), Error> {
347
6
        self.inner_has_with_results(keys, results).await
348
12
    }
349
350
    async fn update(
351
        self: Pin<&Self>,
352
        key: StoreKey<'_>,
353
        reader: DropCloserReadHalf,
354
        size_info: UploadSizeInfo,
355
8
    ) -> Result<(), Error> {
356
8
        self.ac_store.update(key, reader, size_info).await
357
16
    }
358
359
    async fn get_part(
360
        self: Pin<&Self>,
361
        key: StoreKey<'_>,
362
        writer: &mut DropCloserWriteHalf,
363
        offset: u64,
364
        length: Option<u64>,
365
2
    ) -> Result<(), Error> {
366
2
        let results = &mut [None];
367
2
        self.inner_has_with_results(&[key.borrow()], results)
368
2
            .await
369
2
            .err_tip(|| 
"when calling CompletenessCheckingStore::get_part"0
)
?0
;
370
2
        if results[0].is_none() {
  Branch (370:12): [True: 1, False: 1]
  Branch (370:12): [Folded - Ignored]
371
1
            return Err(make_err!(
372
1
                Code::NotFound,
373
1
                "Digest found, but not all parts were found in CompletenessCheckingStore::get_part"
374
1
            ));
375
1
        }
376
1
        self.ac_store.get_part(key, writer, offset, length).await
377
4
    }
378
379
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
380
0
        self
381
0
    }
382
383
0
    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
384
0
        self
385
0
    }
386
387
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
388
0
        self
389
0
    }
390
}
391
392
default_health_status_indicator!(CompletenessCheckingStore);