Coverage Report

Created: 2025-05-30 16:37

/build/source/nativelink-store/src/verify_store.rs
 Line|Count|Source
    1|     |// Copyright 2024 The NativeLink Authors. All rights reserved.
    2|     |//
    3|     |// Licensed under the Apache License, Version 2.0 (the "License");
    4|     |// you may not use this file except in compliance with the License.
    5|     |// You may obtain a copy of the License at
    6|     |//
    7|     |//    http://www.apache.org/licenses/LICENSE-2.0
    8|     |//
    9|     |// Unless required by applicable law or agreed to in writing, software
   10|     |// distributed under the License is distributed on an "AS IS" BASIS,
   11|     |// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12|     |// See the License for the specific language governing permissions and
   13|     |// limitations under the License.
   14|     |
   15|     |use core::pin::Pin;
   16|     |use std::sync::Arc;
   17|     |
   18|     |use async_trait::async_trait;
   19|     |use nativelink_config::stores::VerifySpec;
   20|     |use nativelink_error::{Error, ResultExt, make_input_err};
   21|     |use nativelink_metric::MetricsComponent;
   22|     |use nativelink_util::buf_channel::{
   23|     |    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
   24|     |};
   25|     |use nativelink_util::common::PackedHash;
   26|     |use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
   27|     |use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
   28|     |use nativelink_util::metrics_utils::CounterWithTime;
   29|     |use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
   30|     |use opentelemetry::context::Context;
   31|     |
   32|     |#[derive(Debug, MetricsComponent)]
   33|     |pub struct VerifyStore {
   34|     |    #[metric(group = "inner_store")]
   35|     |    inner_store: Store,
   36|     |    #[metric(help = "If the verification store is verifying the size of the data")]
   37|     |    verify_size: bool,
   38|     |    #[metric(help = "If the verification store is verifying the hash of the data")]
   39|     |    verify_hash: bool,
   40|     |
   41|     |    // Metrics.
   42|     |    #[metric(help = "Number of failures the verification store had due to size mismatches")]
   43|     |    size_verification_failures: CounterWithTime,
   44|     |    #[metric(help = "Number of failures the verification store had due to hash mismatches")]
   45|     |    hash_verification_failures: CounterWithTime,
   46|     |}
   47|     |
   48|     |impl VerifyStore {
   49|   10|    pub fn new(spec: &VerifySpec, inner_store: Store) -> Arc<Self> {
   50|   10|        Arc::new(Self {
   51|   10|            inner_store,
   52|   10|            verify_size: spec.verify_size,
   53|   10|            verify_hash: spec.verify_hash,
   54|   10|            size_verification_failures: CounterWithTime::default(),
   55|   10|            hash_verification_failures: CounterWithTime::default(),
   56|   10|        })
   57|   10|    }
   58|     |
   59|   10|    async fn inner_check_update<D: DigestHasher>(
   60|   10|        &self,
   61|   10|        mut tx: DropCloserWriteHalf,
   62|   10|        mut rx: DropCloserReadHalf,
   63|   10|        maybe_expected_digest_size: Option<u64>,
   64|   10|        original_hash: &PackedHash,
   65|   10|        mut maybe_hasher: Option<&mut D>,
   66|   10|    ) -> Result<(), Error> {
   67|   10|        let mut sum_size: u64 = 0;
   68|     |        loop {
   69|   21|            let chunk = rx
   70|   21|                .recv()
   71|   21|                .await
   72|   21|                .err_tip(|| "Failed to read chunk in check_update in verify store")?;
   73|   21|            sum_size += chunk.len() as u64;
   74|     |
   75|     |            // Ensure if a user sends us too much data we fail quickly.
   76|   21|            if let Some(expected_size) = maybe_expected_digest_size {
     |     |              Branch (76:20): [True: 11, False: 10]
     |     |              Branch (76:20): [Folded - Ignored]
   77|   11|                match sum_size.cmp(&expected_size) {
   78|     |                    core::cmp::Ordering::Greater => {
   79|    1|                        self.size_verification_failures.inc();
   80|    1|                        return Err(make_input_err!(
   81|    1|                            "Expected size {} but already received {} on insert",
   82|    1|                            expected_size,
   83|    1|                            sum_size
   84|    1|                        ));
   85|     |                    }
   86|     |                    core::cmp::Ordering::Equal => {
   87|     |                        // Ensure our next chunk is the EOF chunk.
   88|     |                        // If this was an error it'll be caught on the .recv()
   89|     |                        // on next cycle.
   90|    6|                        if let Ok(eof_chunk) = rx.peek().await {
     |     |                          Branch (90:32): [True: 6, False: 0]
     |     |                          Branch (90:32): [Folded - Ignored]
   91|    6|                            if !eof_chunk.is_empty() {
     |     |                              Branch (91:32): [True: 0, False: 6]
     |     |                              Branch (91:32): [Folded - Ignored]
   92|    0|                                self.size_verification_failures.inc();
   93|    0|                                return Err(make_input_err!(
   94|    0|                                    "Expected EOF chunk when exact size was hit on insert in verify store - {}",
   95|    0|                                    expected_size,
   96|    0|                                ));
   97|    6|                            }
   98|    0|                        }
   99|     |                    }
  100|    4|                    core::cmp::Ordering::Less => {}
  101|     |                }
  102|   10|            }
  103|     |
  104|     |            // If is EOF.
  105|   20|            if chunk.is_empty() {
     |     |              Branch (105:16): [True: 9, False: 11]
     |     |              Branch (105:16): [Folded - Ignored]
  106|    9|                if let Some(expected_size) = maybe_expected_digest_size {
     |     |                  Branch (106:24): [True: 4, False: 5]
     |     |                  Branch (106:24): [Folded - Ignored]
  107|    4|                    if sum_size != expected_size {
     |     |                      Branch (107:24): [True: 1, False: 3]
     |     |                      Branch (107:24): [Folded - Ignored]
  108|    1|                        self.size_verification_failures.inc();
  109|    1|                        return Err(make_input_err!(
  110|    1|                            "Expected size {} but got size {} on insert",
  111|    1|                            expected_size,
  112|    1|                            sum_size
  113|    1|                        ));
  114|    3|                    }
  115|    5|                }
  116|    8|                if let Some(hasher) = maybe_hasher.as_mut() {
     |     |                  Branch (116:24): [True: 5, False: 3]
     |     |                  Branch (116:24): [Folded - Ignored]
  117|    5|                    let digest = hasher.finalize_digest();
  118|    5|                    let hash_result = digest.packed_hash();
  119|    5|                    if original_hash != hash_result {
     |     |                      Branch (119:24): [True: 2, False: 3]
     |     |                      Branch (119:24): [Folded - Ignored]
  120|    2|                        self.hash_verification_failures.inc();
  121|    2|                        return Err(make_input_err!(
  122|    2|                            "Hashes do not match, got: {original_hash} but digest hash was {hash_result}",
  123|    2|                        ));
  124|    3|                    }
  125|    3|                }
  126|    6|                tx.send_eof().err_tip(|| "In verify_store::check_update")?;
  127|    6|                break;
  128|   11|            }
  129|     |
  130|     |            // This will allows us to hash while sending data to another thread.
  131|   11|            let write_future = tx.send(chunk.clone());
  132|     |
  133|   11|            if let Some(hasher) = maybe_hasher.as_mut() {
     |     |              Branch (133:20): [True: 5, False: 6]
     |     |              Branch (133:20): [Folded - Ignored]
  134|    5|                hasher.update(chunk.as_ref());
  135|    6|            }
  136|     |
  137|   11|            write_future
  138|   11|                .await
  139|   11|                .err_tip(|| "Failed to write chunk to inner store in verify store")?;
  140|     |        }
  141|    6|        Ok(())
  142|   10|    }
  143|     |}
  144|     |
  145|     |#[async_trait]
  146|     |impl StoreDriver for VerifyStore {
  147|     |    async fn has_with_results(
  148|     |        self: Pin<&Self>,
  149|     |        digests: &[StoreKey<'_>],
  150|     |        results: &mut [Option<u64>],
  151|    0|    ) -> Result<(), Error> {
  152|    0|        self.inner_store.has_with_results(digests, results).await
  153|    0|    }
  154|     |
  155|     |    async fn update(
  156|     |        self: Pin<&Self>,
  157|     |        key: StoreKey<'_>,
  158|     |        reader: DropCloserReadHalf,
  159|     |        size_info: UploadSizeInfo,
  160|   20|    ) -> Result<(), Error> {
  161|   10|        let StoreKey::Digest(digest) = key else {
     |     |          Branch (161:13): [True: 10, False: 0]
     |     |          Branch (161:13): [Folded - Ignored]
  162|    0|            return Err(make_input_err!(
  163|    0|                "Only digests are supported in VerifyStore. Got {key:?}"
  164|    0|            ));
  165|     |        };
  166|   10|        let digest_size = digest.size_bytes();
  167|   10|        if let UploadSizeInfo::ExactSize(expected_size) = size_info {
     |     |          Branch (167:16): [True: 10, False: 0]
     |     |          Branch (167:16): [Folded - Ignored]
  168|   10|            if self.verify_size && expected_size != digest_size {
     |     |              Branch (168:16): [True: 5, False: 5]
     |     |              Branch (168:36): [True: 0, False: 5]
     |     |              Branch (168:16): [Folded - Ignored]
     |     |              Branch (168:36): [Folded - Ignored]
  169|    0|                self.size_verification_failures.inc();
  170|    0|                return Err(make_input_err!(
  171|    0|                    "Expected size to match. Got {} but digest says {} on update",
  172|    0|                    expected_size,
  173|    0|                    digest_size
  174|    0|                ));
  175|   10|            }
  176|    0|        }
  177|     |
  178|   10|        let mut hasher = if self.verify_hash {
     |     |          Branch (178:29): [True: 5, False: 5]
     |     |          Branch (178:29): [Folded - Ignored]
  179|     |            Some(
  180|    5|                Context::current()
  181|    5|                    .get::<DigestHasherFunc>()
  182|    5|                    .map_or_else(default_digest_hasher_func, |v| *v)
  183|    5|                    .hasher(),
  184|     |            )
  185|     |        } else {
  186|    5|            None
  187|     |        };
  188|     |
  189|   10|        let maybe_digest_size = if self.verify_size {
     |     |          Branch (189:36): [True: 5, False: 5]
     |     |          Branch (189:36): [Folded - Ignored]
  190|    5|            Some(digest_size)
  191|     |        } else {
  192|    5|            None
  193|     |        };
  194|   10|        let (tx, rx) = make_buf_channel_pair();
  195|     |
  196|   10|        let update_fut = self.inner_store.update(digest, rx, size_info);
  197|   10|        let check_fut = self.inner_check_update(
  198|   10|            tx,
  199|   10|            reader,
  200|   10|            maybe_digest_size,
  201|   10|            digest.packed_hash(),
  202|   10|            hasher.as_mut(),
  203|   10|        );
  204|     |
  205|   10|        let (update_res, check_res) = tokio::join!(update_fut, check_fut);
  206|     |
  207|   10|        update_res.merge(check_res)
  208|   20|    }
  209|     |
  210|     |    async fn get_part(
  211|     |        self: Pin<&Self>,
  212|     |        key: StoreKey<'_>,
  213|     |        writer: &mut DropCloserWriteHalf,
  214|     |        offset: u64,
  215|     |        length: Option<u64>,
  216|    0|    ) -> Result<(), Error> {
  217|    0|        self.inner_store.get_part(key, writer, offset, length).await
  218|    0|    }
  219|     |
  220|    0|    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
  221|    0|        self
  222|    0|    }
  223|     |
  224|    0|    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
  225|    0|        self
  226|    0|    }
  227|     |
  228|    0|    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
  229|    0|        self
  230|    0|    }
  231|     |}
  232|     |
  233|     |default_health_status_indicator!(VerifyStore);
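
For context on the covered update path (lines 194-207 above): `update` creates a buffer-channel pair, then drives the inner store's `update` and `inner_check_update` concurrently with `tokio::join!`, so each chunk is hashed and size-checked while it streams into the wrapped store. The sketch below is a minimal, self-contained illustration of that same "verify while forwarding" shape. It is not NativeLink code: `tokio::sync::mpsc` channels and `sha2::Sha256` stand in for the `buf_channel` pair and the `DigestHasher` trait, and the crate choices (tokio with the "sync", "rt", and "macros" features, plus sha2) are assumptions made for the example. The real store additionally enforces the expected size and checks for a proper EOF chunk (lines 76-115), which this sketch omits for brevity.

// Standalone sketch (not NativeLink code): hash and forward chunks while a
// concurrent "inner store" task consumes them, then compare digests at EOF.
// Assumed deps: tokio = { version = "1", features = ["sync", "rt", "macros"] },
// sha2 = "0.10".
use sha2::{Digest, Sha256};
use tokio::sync::mpsc;

/// Reads chunks from `rx`, feeds them to a hasher, and forwards them to `tx`.
/// Fails if the final digest does not match `expected`.
async fn check_and_forward(
    mut rx: mpsc::Receiver<Vec<u8>>,
    tx: mpsc::Sender<Vec<u8>>,
    expected: [u8; 32],
) -> Result<(), String> {
    let mut hasher = Sha256::new();
    while let Some(chunk) = rx.recv().await {
        hasher.update(&chunk);
        tx.send(chunk).await.map_err(|e| e.to_string())?;
    }
    let got: [u8; 32] = hasher.finalize().into();
    if got != expected {
        return Err("hash mismatch".to_string());
    }
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let payload = b"hello world".to_vec();
    let expected: [u8; 32] = Sha256::digest(&payload).into();

    let (client_tx, client_rx) = mpsc::channel::<Vec<u8>>(4);
    let (inner_tx, mut inner_rx) = mpsc::channel::<Vec<u8>>(4);

    // "Client" side: produce the data, then drop the sender (EOF analogue).
    let producer = async move {
        client_tx.send(payload).await.unwrap();
    };

    // "Inner store" side: consume whatever the verifier forwards.
    let inner_store = async move {
        let mut total = 0usize;
        while let Some(chunk) = inner_rx.recv().await {
            total += chunk.len();
        }
        total
    };

    // Like VerifyStore::update: run verification and the inner write together.
    let (_, check_res, stored) = tokio::join!(
        producer,
        check_and_forward(client_rx, inner_tx, expected),
        inner_store,
    );
    assert!(check_res.is_ok());
    assert_eq!(stored, 11);
}

Joining the two halves rather than buffering the whole payload keeps verification streaming: the verifier only holds one chunk at a time, and backpressure from the consumer naturally throttles the producer, which mirrors why the report's `update` uses `tokio::join!` over sequential awaits.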