Coverage Report

Created: 2026-03-30 18:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/verify_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use std::sync::Arc;
17
18
use async_trait::async_trait;
19
use nativelink_config::stores::VerifySpec;
20
use nativelink_error::{Error, ResultExt, make_input_err};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_util::buf_channel::{
23
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
24
};
25
use nativelink_util::common::PackedHash;
26
use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
27
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
28
use nativelink_util::metrics_utils::CounterWithTime;
29
use nativelink_util::store_trait::{
30
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
31
};
32
use opentelemetry::context::Context;
33
34
#[derive(Debug, MetricsComponent)]
35
pub struct VerifyStore {
36
    #[metric(group = "inner_store")]
37
    inner_store: Store,
38
    #[metric(help = "If the verification store is verifying the size of the data")]
39
    verify_size: bool,
40
    #[metric(help = "If the verification store is verifying the hash of the data")]
41
    verify_hash: bool,
42
43
    // Metrics.
44
    #[metric(help = "Number of failures the verification store had due to size mismatches")]
45
    size_verification_failures: CounterWithTime,
46
    #[metric(help = "Number of failures the verification store had due to hash mismatches")]
47
    hash_verification_failures: CounterWithTime,
48
}
49
50
impl VerifyStore {
51
10
    pub fn new(spec: &VerifySpec, inner_store: Store) -> Arc<Self> {
52
10
        Arc::new(Self {
53
10
            inner_store,
54
10
            verify_size: spec.verify_size,
55
10
            verify_hash: spec.verify_hash,
56
10
            size_verification_failures: CounterWithTime::default(),
57
10
            hash_verification_failures: CounterWithTime::default(),
58
10
        })
59
10
    }
60
61
10
    async fn inner_check_update<D: DigestHasher>(
62
10
        &self,
63
10
        mut tx: DropCloserWriteHalf,
64
10
        mut rx: DropCloserReadHalf,
65
10
        maybe_expected_digest_size: Option<u64>,
66
10
        original_hash: &PackedHash,
67
10
        mut maybe_hasher: Option<&mut D>,
68
10
    ) -> Result<(), Error> {
69
10
        let mut sum_size: u64 = 0;
70
        loop {
71
21
            let chunk = rx
72
21
                .recv()
73
21
                .await
74
21
                .err_tip(|| "Failed to read chunk in check_update in verify store")
?0
;
75
21
            sum_size += chunk.len() as u64;
76
77
            // Ensure if a user sends us too much data we fail quickly.
78
21
            if let Some(
expected_size11
) = maybe_expected_digest_size {
79
11
                match sum_size.cmp(&expected_size) {
80
                    core::cmp::Ordering::Greater => {
81
1
                        self.size_verification_failures.inc();
82
1
                        return Err(make_input_err!(
83
1
                            "Expected size {} but already received {} on insert",
84
1
                            expected_size,
85
1
                            sum_size
86
1
                        ));
87
                    }
88
                    core::cmp::Ordering::Equal => {
89
                        // Ensure our next chunk is the EOF chunk.
90
                        // If this was an error it'll be caught on the .recv()
91
                        // on next cycle.
92
6
                        if let Ok(eof_chunk) = rx.peek().await
93
6
                            && !eof_chunk.is_empty()
94
                        {
95
0
                            self.size_verification_failures.inc();
96
0
                            return Err(make_input_err!(
97
0
                                "Expected EOF chunk when exact size was hit on insert in verify store - {}",
98
0
                                expected_size,
99
0
                            ));
100
6
                        }
101
                    }
102
4
                    core::cmp::Ordering::Less => {}
103
                }
104
10
            }
105
106
            // If this is the EOF chunk.
107
20
            if chunk.is_empty() {
108
9
                if let Some(
expected_size4
) = maybe_expected_digest_size
109
4
                    && sum_size != expected_size
110
                {
111
1
                    self.size_verification_failures.inc();
112
1
                    return Err(make_input_err!(
113
1
                        "Expected size {} but got size {} on insert",
114
1
                        expected_size,
115
1
                        sum_size
116
1
                    ));
117
8
                }
118
8
                if let Some(
hasher5
) = maybe_hasher.as_mut() {
119
5
                    let digest = hasher.finalize_digest();
120
5
                    let hash_result = digest.packed_hash();
121
5
                    if original_hash != hash_result {
122
2
                        self.hash_verification_failures.inc();
123
2
                        return Err(make_input_err!(
124
2
                            "Hashes do not match, got: {original_hash} but digest hash was {hash_result}",
125
2
                        ));
126
3
                    }
127
3
                }
128
6
                tx.send_eof().err_tip(|| "In verify_store::check_update")
?0
;
129
6
                break;
130
11
            }
131
132
            // This allows us to hash while sending data to another thread.
133
11
            let write_future = tx.send(chunk.clone());
134
135
11
            if let Some(
hasher5
) = maybe_hasher.as_mut() {
136
5
                hasher.update(chunk.as_ref());
137
6
            }
138
139
11
            write_future
140
11
                .await
141
11
                .err_tip(|| "Failed to write chunk to inner store in verify store")
?0
;
142
        }
143
6
        Ok(())
144
10
    }
145
}
146
147
#[async_trait]
148
impl StoreDriver for VerifyStore {
149
    async fn has_with_results(
150
        self: Pin<&Self>,
151
        digests: &[StoreKey<'_>],
152
        results: &mut [Option<u64>],
153
0
    ) -> Result<(), Error> {
154
        self.inner_store.has_with_results(digests, results).await
155
0
    }
156
157
    async fn update(
158
        self: Pin<&Self>,
159
        key: StoreKey<'_>,
160
        reader: DropCloserReadHalf,
161
        size_info: UploadSizeInfo,
162
10
    ) -> Result<(), Error> {
163
        let StoreKey::Digest(digest) = key else {
164
            return Err(make_input_err!(
165
                "Only digests are supported in VerifyStore. Got {key:?}"
166
            ));
167
        };
168
        let digest_size = digest.size_bytes();
169
        if let UploadSizeInfo::ExactSize(expected_size) = size_info
170
            && self.verify_size
171
            && expected_size != digest_size
172
        {
173
            self.size_verification_failures.inc();
174
            return Err(make_input_err!(
175
                "Expected size to match. Got {} but digest says {} on update",
176
                expected_size,
177
                digest_size
178
            ));
179
        }
180
181
        let mut hasher = if self.verify_hash {
182
            Some(
183
                Context::current()
184
                    .get::<DigestHasherFunc>()
185
                    .map_or_else(default_digest_hasher_func, |v| *v)
186
                    .hasher(),
187
            )
188
        } else {
189
            None
190
        };
191
192
        let maybe_digest_size = if self.verify_size {
193
            Some(digest_size)
194
        } else {
195
            None
196
        };
197
        let (tx, rx) = make_buf_channel_pair();
198
199
        let update_fut = self.inner_store.update(digest, rx, size_info);
200
        let check_fut = self.inner_check_update(
201
            tx,
202
            reader,
203
            maybe_digest_size,
204
            digest.packed_hash(),
205
            hasher.as_mut(),
206
        );
207
208
        let (update_res, check_res) = tokio::join!(update_fut, check_fut);
209
210
        update_res.merge(check_res)
211
10
    }
212
213
    async fn get_part(
214
        self: Pin<&Self>,
215
        key: StoreKey<'_>,
216
        writer: &mut DropCloserWriteHalf,
217
        offset: u64,
218
        length: Option<u64>,
219
0
    ) -> Result<(), Error> {
220
        self.inner_store.get_part(key, writer, offset, length).await
221
0
    }
222
223
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
224
0
        self
225
0
    }
226
227
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
228
0
        self
229
0
    }
230
231
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
232
0
        self
233
0
    }
234
235
0
    fn register_remove_callback(
236
0
        self: Arc<Self>,
237
0
        callback: Arc<dyn RemoveItemCallback>,
238
0
    ) -> Result<(), Error> {
239
0
        self.inner_store.register_remove_callback(callback)
240
0
    }
241
}
242
243
default_health_status_indicator!(VerifyStore);