Coverage Report

Created: 2024-12-19 04:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/verify_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::stores::VerifySpec;
20
use nativelink_error::{make_input_err, Error, ResultExt};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_util::buf_channel::{
23
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
24
};
25
use nativelink_util::common::PackedHash;
26
use nativelink_util::digest_hasher::{
27
    default_digest_hasher_func, DigestHasher, ACTIVE_HASHER_FUNC,
28
};
29
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
30
use nativelink_util::metrics_utils::CounterWithTime;
31
use nativelink_util::origin_context::ActiveOriginContext;
32
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
33
34
#[derive(MetricsComponent)]
35
pub struct VerifyStore {
36
    #[metric(group = "inner_store")]
37
    inner_store: Store,
38
    #[metric(help = "If the verification store is verifying the size of the data")]
39
    verify_size: bool,
40
    #[metric(help = "If the verification store is verifying the hash of the data")]
41
    verify_hash: bool,
42
43
    // Metrics.
44
    #[metric(help = "Number of failures the verification store had due to size mismatches")]
45
    size_verification_failures: CounterWithTime,
46
    #[metric(help = "Number of failures the verification store had due to hash mismatches")]
47
    hash_verification_failures: CounterWithTime,
48
}
49
50
impl VerifyStore {
51
10
    pub fn new(spec: &VerifySpec, inner_store: Store) -> Arc<Self> {
52
10
        Arc::new(VerifyStore {
53
10
            inner_store,
54
10
            verify_size: spec.verify_size,
55
10
            verify_hash: spec.verify_hash,
56
10
            size_verification_failures: CounterWithTime::default(),
57
10
            hash_verification_failures: CounterWithTime::default(),
58
10
        })
59
10
    }
60
61
10
    async fn inner_check_update<D: DigestHasher>(
62
10
        &self,
63
10
        mut tx: DropCloserWriteHalf,
64
10
        mut rx: DropCloserReadHalf,
65
10
        maybe_expected_digest_size: Option<u64>,
66
10
        original_hash: &PackedHash,
67
10
        mut maybe_hasher: Option<&mut D>,
68
10
    ) -> Result<(), Error> {
69
10
        let mut sum_size: u64 = 0;
70
        loop {
71
21
            let chunk = rx
72
21
                .recv()
73
21
                .await
74
21
                .err_tip(|| 
"Failed to read chunk in check_update in verify store"0
)
?0
;
75
21
            sum_size += chunk.len() as u64;
76
77
            // Ensure if a user sends us too much data we fail quickly.
78
21
            if let Some(
expected_size11
) = maybe_expected_digest_size {
  Branch (78:20): [True: 11, False: 10]
  Branch (78:20): [Folded - Ignored]
79
11
                match sum_size.cmp(&expected_size) {
80
                    std::cmp::Ordering::Greater => {
81
1
                        self.size_verification_failures.inc();
82
1
                        return Err(make_input_err!(
83
1
                            "Expected size {} but already received {} on insert",
84
1
                            expected_size,
85
1
                            sum_size
86
1
                        ));
87
                    }
88
                    std::cmp::Ordering::Equal => {
89
                        // Ensure our next chunk is the EOF chunk.
90
                        // If this was an error it'll be caught on the .recv()
91
                        // on next cycle.
92
6
                        if let Ok(eof_chunk) = rx.peek().await {
  Branch (92:32): [True: 6, False: 0]
  Branch (92:32): [Folded - Ignored]
93
6
                            if !eof_chunk.is_empty() {
  Branch (93:32): [True: 0, False: 6]
  Branch (93:32): [Folded - Ignored]
94
0
                                self.size_verification_failures.inc();
95
0
                                return Err(make_input_err!(
96
0
                                    "Expected EOF chunk when exact size was hit on insert in verify store - {}",
97
0
                                    expected_size,
98
0
                                ));
99
6
                            }
100
0
                        }
101
                    }
102
4
                    std::cmp::Ordering::Less => {}
103
                }
104
10
            }
105
106
            // If is EOF.
107
20
            if chunk.is_empty() {
  Branch (107:16): [True: 9, False: 11]
  Branch (107:16): [Folded - Ignored]
108
9
                if let Some(
expected_size4
) = maybe_expected_digest_size {
  Branch (108:24): [True: 4, False: 5]
  Branch (108:24): [Folded - Ignored]
109
4
                    if sum_size != expected_size {
  Branch (109:24): [True: 1, False: 3]
  Branch (109:24): [Folded - Ignored]
110
1
                        self.size_verification_failures.inc();
111
1
                        return Err(make_input_err!(
112
1
                            "Expected size {} but got size {} on insert",
113
1
                            expected_size,
114
1
                            sum_size
115
1
                        ));
116
3
                    }
117
5
                }
118
8
                if let Some(
hasher5
) = maybe_hasher.as_mut() {
  Branch (118:24): [True: 5, False: 3]
  Branch (118:24): [Folded - Ignored]
119
5
                    let digest = hasher.finalize_digest();
120
5
                    let hash_result = digest.packed_hash();
121
5
                    if original_hash != hash_result {
  Branch (121:24): [True: 2, False: 3]
  Branch (121:24): [Folded - Ignored]
122
2
                        self.hash_verification_failures.inc();
123
2
                        return Err(make_input_err!(
124
2
                            "Hashes do not match, got: {original_hash} but digest hash was {hash_result}",
125
2
                        ));
126
3
                    }
127
3
                }
128
6
                tx.send_eof().err_tip(|| 
"In verify_store::check_update"0
)
?0
;
129
6
                break;
130
11
            }
131
11
132
11
            // This will allows us to hash while sending data to another thread.
133
11
            let write_future = tx.send(chunk.clone());
134
135
11
            if let Some(
hasher5
) = maybe_hasher.as_mut() {
  Branch (135:20): [True: 5, False: 6]
  Branch (135:20): [Folded - Ignored]
136
5
                hasher.update(chunk.as_ref());
137
6
            }
138
139
11
            write_future
140
11
                .await
141
11
                .err_tip(|| 
"Failed to write chunk to inner store in verify store"0
)
?0
;
142
        }
143
6
        Ok(())
144
10
    }
145
}
146
147
#[async_trait]
148
impl StoreDriver for VerifyStore {
149
    async fn has_with_results(
150
        self: Pin<&Self>,
151
        digests: &[StoreKey<'_>],
152
        results: &mut [Option<u64>],
153
0
    ) -> Result<(), Error> {
154
0
        self.inner_store.has_with_results(digests, results).await
155
0
    }
156
157
    async fn update(
158
        self: Pin<&Self>,
159
        key: StoreKey<'_>,
160
        reader: DropCloserReadHalf,
161
        size_info: UploadSizeInfo,
162
10
    ) -> Result<(), Error> {
163
10
        let StoreKey::Digest(digest) = key else {
  Branch (163:13): [True: 10, False: 0]
  Branch (163:13): [Folded - Ignored]
164
0
            return Err(make_input_err!(
165
0
                "Only digests are supported in VerifyStore. Got {key:?}"
166
0
            ));
167
        };
168
10
        let digest_size = digest.size_bytes();
169
10
        if let UploadSizeInfo::ExactSize(expected_size) = size_info {
  Branch (169:16): [True: 10, False: 0]
  Branch (169:16): [Folded - Ignored]
170
10
            if self.verify_size && 
expected_size != digest_size5
{
  Branch (170:16): [True: 5, False: 5]
  Branch (170:36): [True: 0, False: 5]
  Branch (170:16): [Folded - Ignored]
  Branch (170:36): [Folded - Ignored]
171
0
                self.size_verification_failures.inc();
172
0
                return Err(make_input_err!(
173
0
                    "Expected size to match. Got {} but digest says {} on update",
174
0
                    expected_size,
175
0
                    digest_size
176
0
                ));
177
10
            }
178
0
        }
179
180
10
        let mut hasher = if self.verify_hash {
  Branch (180:29): [True: 5, False: 5]
  Branch (180:29): [Folded - Ignored]
181
            Some(
182
5
                ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
183
5
                    .err_tip(|| 
"In verify_store::update"0
)
?0
184
5
                    .map_or_else(default_digest_hasher_func, |v| 
*v2
)
185
5
                    .hasher(),
186
5
            )
187
        } else {
188
5
            None
189
        };
190
191
10
        let maybe_digest_size = if self.verify_size {
  Branch (191:36): [True: 5, False: 5]
  Branch (191:36): [Folded - Ignored]
192
5
            Some(digest_size)
193
        } else {
194
5
            None
195
        };
196
10
        let (tx, rx) = make_buf_channel_pair();
197
10
198
10
        let update_fut = self.inner_store.update(digest, rx, size_info);
199
10
        let check_fut = self.inner_check_update(
200
10
            tx,
201
10
            reader,
202
10
            maybe_digest_size,
203
10
            digest.packed_hash(),
204
10
            hasher.as_mut(),
205
10
        );
206
207
10
        let (update_res, check_res) = tokio::join!(update_fut, check_fut);
208
209
10
        update_res.merge(check_res)
210
20
    }
211
212
    async fn get_part(
213
        self: Pin<&Self>,
214
        key: StoreKey<'_>,
215
        writer: &mut DropCloserWriteHalf,
216
        offset: u64,
217
        length: Option<u64>,
218
0
    ) -> Result<(), Error> {
219
0
        self.inner_store.get_part(key, writer, offset, length).await
220
0
    }
221
222
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
223
0
        self
224
0
    }
225
226
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
227
0
        self
228
0
    }
229
230
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
231
0
        self
232
0
    }
233
}
234
235
default_health_status_indicator!(VerifyStore);