Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/verify_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::pin::Pin;
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_error::{make_input_err, Error, ResultExt};
20
use nativelink_metric::MetricsComponent;
21
use nativelink_util::buf_channel::{
22
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
23
};
24
use nativelink_util::common::PackedHash;
25
use nativelink_util::digest_hasher::{
26
    default_digest_hasher_func, DigestHasher, ACTIVE_HASHER_FUNC,
27
};
28
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
29
use nativelink_util::metrics_utils::CounterWithTime;
30
use nativelink_util::origin_context::ActiveOriginContext;
31
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
32
33
0
#[derive(MetricsComponent)]
34
pub struct VerifyStore {
35
    #[metric(group = "inner_store")]
36
    inner_store: Store,
37
    #[metric(help = "If the verification store is verifying the size of the data")]
38
    verify_size: bool,
39
    #[metric(help = "If the verification store is verifying the hash of the data")]
40
    verify_hash: bool,
41
42
    // Metrics.
43
    #[metric(help = "Number of failures the verification store had due to size mismatches")]
44
    size_verification_failures: CounterWithTime,
45
    #[metric(help = "Number of failures the verification store had due to hash mismatches")]
46
    hash_verification_failures: CounterWithTime,
47
}
48
49
impl VerifyStore {
50
10
    pub fn new(config: &nativelink_config::stores::VerifyStore, inner_store: Store) -> Arc<Self> {
51
10
        Arc::new(VerifyStore {
52
10
            inner_store,
53
10
            verify_size: config.verify_size,
54
10
            verify_hash: config.verify_hash,
55
10
            size_verification_failures: CounterWithTime::default(),
56
10
            hash_verification_failures: CounterWithTime::default(),
57
10
        })
58
10
    }
59
60
10
    async fn inner_check_update<D: DigestHasher>(
61
10
        &self,
62
10
        mut tx: DropCloserWriteHalf,
63
10
        mut rx: DropCloserReadHalf,
64
10
        maybe_expected_digest_size: Option<u64>,
65
10
        original_hash: &PackedHash,
66
10
        mut maybe_hasher: Option<&mut D>,
67
10
    ) -> Result<(), Error> {
68
10
        let mut sum_size: u64 = 0;
69
        loop {
70
21
            let chunk = rx
71
21
                .recv()
72
0
                .await
73
21
                .err_tip(|| 
"Failed to read chunk in check_update in verify store"0
)
?0
;
74
21
            sum_size += chunk.len() as u64;
75
76
            // Ensure if a user sends us too much data we fail quickly.
77
21
            if let Some(
expected_size11
) = maybe_expected_digest_size {
  Branch (77:20): [True: 11, False: 10]
  Branch (77:20): [Folded - Ignored]
78
11
                match sum_size.cmp(&expected_size) {
79
                    std::cmp::Ordering::Greater => {
80
1
                        self.size_verification_failures.inc();
81
1
                        return Err(make_input_err!(
82
1
                            "Expected size {} but already received {} on insert",
83
1
                            expected_size,
84
1
                            sum_size
85
1
                        ));
86
                    }
87
                    std::cmp::Ordering::Equal => {
88
                        // Ensure our next chunk is the EOF chunk.
89
                        // If this was an error it'll be caught on the .recv()
90
                        // on next cycle.
91
6
                        if let Ok(eof_chunk) = rx.peek().
await0
{
  Branch (91:32): [True: 6, False: 0]
  Branch (91:32): [Folded - Ignored]
92
6
                            if !eof_chunk.is_empty() {
  Branch (92:32): [True: 0, False: 6]
  Branch (92:32): [Folded - Ignored]
93
0
                                self.size_verification_failures.inc();
94
0
                                return Err(make_input_err!(
95
0
                                    "Expected EOF chunk when exact size was hit on insert in verify store - {}",
96
0
                                    expected_size,
97
0
                                ));
98
6
                            }
99
0
                        }
100
                    }
101
4
                    std::cmp::Ordering::Less => {}
102
                }
103
10
            }
104
105
            // If this chunk is the EOF marker.
106
20
            if chunk.is_empty() {
  Branch (106:16): [True: 9, False: 11]
  Branch (106:16): [Folded - Ignored]
107
9
                if let Some(
expected_size4
) = maybe_expected_digest_size {
  Branch (107:24): [True: 4, False: 5]
  Branch (107:24): [Folded - Ignored]
108
4
                    if sum_size != expected_size {
  Branch (108:24): [True: 1, False: 3]
  Branch (108:24): [Folded - Ignored]
109
1
                        self.size_verification_failures.inc();
110
1
                        return Err(make_input_err!(
111
1
                            "Expected size {} but got size {} on insert",
112
1
                            expected_size,
113
1
                            sum_size
114
1
                        ));
115
3
                    }
116
5
                }
117
8
                if let Some(
hasher5
) = maybe_hasher.as_mut() {
  Branch (117:24): [True: 5, False: 3]
  Branch (117:24): [Folded - Ignored]
118
5
                    let digest = hasher.finalize_digest();
119
5
                    let hash_result = digest.packed_hash();
120
5
                    if original_hash != hash_result {
  Branch (120:24): [True: 2, False: 3]
  Branch (120:24): [Folded - Ignored]
121
2
                        self.hash_verification_failures.inc();
122
2
                        return Err(make_input_err!(
123
2
                            "Hashes do not match, got: {original_hash} but digest hash was {hash_result}",
124
2
                        ));
125
3
                    }
126
3
                }
127
6
                tx.send_eof().err_tip(|| 
"In verify_store::check_update"0
)
?0
;
128
6
                break;
129
11
            }
130
11
131
11
            // This allows us to hash while sending data to another thread.
132
11
            let write_future = tx.send(chunk.clone());
133
134
11
            if let Some(
hasher5
) = maybe_hasher.as_mut() {
  Branch (134:20): [True: 5, False: 6]
  Branch (134:20): [Folded - Ignored]
135
5
                hasher.update(chunk.as_ref());
136
6
            }
137
138
11
            write_future
139
0
                .await
140
11
                .err_tip(|| 
"Failed to write chunk to inner store in verify store"0
)
?0
;
141
        }
142
6
        Ok(())
143
10
    }
144
}
145
146
#[async_trait]
147
impl StoreDriver for VerifyStore {
148
    async fn has_with_results(
149
        self: Pin<&Self>,
150
        digests: &[StoreKey<'_>],
151
        results: &mut [Option<u64>],
152
0
    ) -> Result<(), Error> {
153
0
        self.inner_store.has_with_results(digests, results).await
154
0
    }
155
156
    async fn update(
157
        self: Pin<&Self>,
158
        key: StoreKey<'_>,
159
        reader: DropCloserReadHalf,
160
        size_info: UploadSizeInfo,
161
10
    ) -> Result<(), Error> {
162
10
        let StoreKey::Digest(digest) = key else {
  Branch (162:13): [True: 10, False: 0]
  Branch (162:13): [Folded - Ignored]
163
0
            return Err(make_input_err!(
164
0
                "Only digests are supported in VerifyStore. Got {key:?}"
165
0
            ));
166
        };
167
10
        let digest_size = digest.size_bytes();
168
10
        if let UploadSizeInfo::ExactSize(expected_size) = size_info {
  Branch (168:16): [True: 10, False: 0]
  Branch (168:16): [Folded - Ignored]
169
10
            if self.verify_size && 
expected_size != digest_size5
{
  Branch (169:16): [True: 5, False: 5]
  Branch (169:36): [True: 0, False: 5]
  Branch (169:16): [Folded - Ignored]
  Branch (169:36): [Folded - Ignored]
170
0
                self.size_verification_failures.inc();
171
0
                return Err(make_input_err!(
172
0
                    "Expected size to match. Got {} but digest says {} on update",
173
0
                    expected_size,
174
0
                    digest_size
175
0
                ));
176
10
            }
177
0
        }
178
179
10
        let mut hasher = if self.verify_hash {
  Branch (179:29): [True: 5, False: 5]
  Branch (179:29): [Folded - Ignored]
180
            Some(
181
5
                ActiveOriginContext::get_value(&ACTIVE_HASHER_FUNC)
182
5
                    .err_tip(|| 
"In verify_store::update"0
)
?0
183
5
                    .map_or_else(default_digest_hasher_func, |v| 
*v2
)
184
5
                    .hasher(),
185
5
            )
186
        } else {
187
5
            None
188
        };
189
190
10
        let maybe_digest_size = if self.verify_size {
  Branch (190:36): [True: 5, False: 5]
  Branch (190:36): [Folded - Ignored]
191
5
            Some(digest_size)
192
        } else {
193
5
            None
194
        };
195
10
        let (tx, rx) = make_buf_channel_pair();
196
10
197
10
        let update_fut = self.inner_store.update(digest, rx, size_info);
198
10
        let check_fut = self.inner_check_update(
199
10
            tx,
200
10
            reader,
201
10
            maybe_digest_size,
202
10
            digest.packed_hash(),
203
10
            hasher.as_mut(),
204
10
        );
205
206
10
        let (update_res, check_res) = tokio::join!(update_fut, check_fut);
207
208
10
        update_res.merge(check_res)
209
20
    }
210
211
    async fn get_part(
212
        self: Pin<&Self>,
213
        key: StoreKey<'_>,
214
        writer: &mut DropCloserWriteHalf,
215
        offset: u64,
216
        length: Option<u64>,
217
0
    ) -> Result<(), Error> {
218
0
        self.inner_store.get_part(key, writer, offset, length).await
219
0
    }
220
221
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
222
0
        self
223
0
    }
224
225
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
226
0
        self
227
0
    }
228
229
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
230
0
        self
231
0
    }
232
}
233
234
default_health_status_indicator!(VerifyStore);