Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/digest_hasher.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::{Arc, OnceLock};
16
17
use blake3::Hasher as Blake3Hasher;
18
use bytes::BytesMut;
19
use futures::Future;
20
use nativelink_config::stores::ConfigDigestHashFunction;
21
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
22
use nativelink_metric::{
23
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
};
25
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction;
26
use serde::{Deserialize, Serialize};
27
use sha2::{Digest, Sha256};
28
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
29
30
use crate::common::DigestInfo;
31
use crate::origin_context::{ActiveOriginContext, OriginContext};
32
use crate::{fs, spawn_blocking, unsafe_make_symbol};
33
34
// The symbol can be used to retrieve the active hasher function
35
// from an `OriginContext`.
36
// Safety: There is no other symbol named `ACTIVE_HASHER_FUNC`.
37
unsafe_make_symbol!(ACTIVE_HASHER_FUNC, DigestHasherFunc);
38
39
static DEFAULT_DIGEST_HASHER_FUNC: OnceLock<DigestHasherFunc> = OnceLock::new();
40
41
/// Utility function to make a context with a specific hasher function set.
42
38
pub fn make_ctx_for_hash_func<H>(hasher: H) -> Result<Arc<OriginContext>, Error>
43
38
where
44
38
    H: TryInto<DigestHasherFunc>,
45
38
    H::Error: Into<Error>,
46
38
{
47
38
    let digest_hasher_func = hasher
48
38
        .try_into()
49
38
        .err_tip(|| 
"Could not convert into DigestHasherFunc"0
)
?0
;
50
51
38
    let mut new_ctx = ActiveOriginContext::fork().err_tip(|| 
"In BytestreamServer::inner_write"0
)
?0
;
52
38
    new_ctx.set_value(&ACTIVE_HASHER_FUNC, Arc::new(digest_hasher_func));
53
38
    Ok(Arc::new(new_ctx))
54
38
}
55
56
/// Get the default hasher.
57
38
pub fn default_digest_hasher_func() -> DigestHasherFunc {
58
38
    *DEFAULT_DIGEST_HASHER_FUNC.get_or_init(|| 
DigestHasherFunc::Sha2565
)
59
38
}
60
61
/// Sets the default hasher to use if no hasher was requested by the client.
62
0
pub fn set_default_digest_hasher_func(hasher: DigestHasherFunc) -> Result<(), Error> {
63
0
    DEFAULT_DIGEST_HASHER_FUNC
64
0
        .set(hasher)
65
0
        .map_err(|_| make_err!(Code::Internal, "default_digest_hasher_func already set"))
66
0
}
67
68
/// Supported digest hash functions.
69
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
70
pub enum DigestHasherFunc {
71
    Sha256,
72
    Blake3,
73
}
74
75
impl MetricsComponent for DigestHasherFunc {
76
0
    fn publish(
77
0
        &self,
78
0
        kind: MetricKind,
79
0
        field_metadata: MetricFieldData,
80
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
81
0
        format!("{self:?}").publish(kind, field_metadata)
82
0
    }
83
}
84
85
impl DigestHasherFunc {
86
5.12k
    pub fn hasher(&self) -> DigestHasherImpl {
87
5.12k
        self.into()
88
5.12k
    }
89
90
    #[must_use]
91
38
    pub const fn proto_digest_func(&self) -> ProtoDigestFunction {
92
38
        match self {
93
36
            Self::Sha256 => ProtoDigestFunction::Sha256,
94
2
            Self::Blake3 => ProtoDigestFunction::Blake3,
95
        }
96
38
    }
97
}
98
99
impl From<ConfigDigestHashFunction> for DigestHasherFunc {
100
0
    fn from(value: ConfigDigestHashFunction) -> Self {
101
0
        match value {
102
0
            ConfigDigestHashFunction::Sha256 => Self::Sha256,
103
0
            ConfigDigestHashFunction::Blake3 => Self::Blake3,
104
        }
105
0
    }
106
}
107
108
impl TryFrom<ProtoDigestFunction> for DigestHasherFunc {
109
    type Error = Error;
110
111
0
    fn try_from(value: ProtoDigestFunction) -> Result<Self, Self::Error> {
112
0
        match value {
113
0
            ProtoDigestFunction::Sha256 => Ok(Self::Sha256),
114
0
            ProtoDigestFunction::Blake3 => Ok(Self::Blake3),
115
0
            v => Err(make_input_err!(
116
0
                "Unknown or unsupported digest function for proto conversion {v:?}"
117
0
            )),
118
        }
119
0
    }
120
}
121
122
impl TryFrom<&str> for DigestHasherFunc {
123
    type Error = Error;
124
125
0
    fn try_from(value: &str) -> Result<Self, Self::Error> {
126
0
        match value.to_uppercase().as_str() {
127
0
            "SHA256" => Ok(Self::Sha256),
128
0
            "BLAKE3" => Ok(Self::Blake3),
129
0
            v => Err(make_input_err!(
130
0
                "Unknown or unsupported digest function for string conversion: {v:?}"
131
0
            )),
132
        }
133
0
    }
134
}
135
136
impl std::fmt::Display for DigestHasherFunc {
137
3
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138
3
        match self {
139
3
            DigestHasherFunc::Sha256 => write!(f, "SHA256"),
140
0
            DigestHasherFunc::Blake3 => write!(f, "BLAKE3"),
141
        }
142
3
    }
143
}
144
145
impl TryFrom<i32> for DigestHasherFunc {
146
    type Error = Error;
147
148
39
    fn try_from(value: i32) -> Result<Self, Self::Error> {
149
39
        // Zero means not-set.
150
39
        if value == 0 {
  Branch (150:12): [True: 17, False: 22]
  Branch (150:12): [Folded - Ignored]
151
17
            return Ok(default_digest_hasher_func());
152
22
        }
153
22
        match ProtoDigestFunction::try_from(value) {
154
19
            Ok(ProtoDigestFunction::Sha256) => Ok(Self::Sha256),
155
3
            Ok(ProtoDigestFunction::Blake3) => Ok(Self::Blake3),
156
0
            value => Err(make_input_err!(
157
0
                "Unknown or unsupported digest function for int conversion: {:?}",
158
0
                value.map(|v| v.as_str_name())
159
            )),
160
        }
161
39
    }
162
}
163
164
impl From<&DigestHasherFunc> for DigestHasherImpl {
165
5.12k
    fn from(value: &DigestHasherFunc) -> Self {
166
5.12k
        let hash_func_impl = match value {
167
92
            DigestHasherFunc::Sha256 => DigestHasherFuncImpl::Sha256(Sha256::new()),
168
5.03k
            DigestHasherFunc::Blake3 => DigestHasherFuncImpl::Blake3(Box::default()),
169
        };
170
5.12k
        Self {
171
5.12k
            hashed_size: 0,
172
5.12k
            hash_func_impl,
173
5.12k
        }
174
5.12k
    }
175
}
176
177
/// Wrapper to compute a hash of arbitrary data.
178
pub trait DigestHasher {
179
    /// Update the hasher with some additional data.
180
    fn update(&mut self, input: &[u8]);
181
182
    /// Finalize the hash function and collect the results into a digest.
183
    fn finalize_digest(&mut self) -> DigestInfo;
184
185
    /// Specialized version of the hashing function that is optimized for
186
    /// handling files. These optimizations take into account things like,
187
    /// the file size and the hasher algorithm to decide how to best process
188
    /// the file and feed it into the hasher.
189
    fn digest_for_file(
190
        self,
191
        file_path: impl AsRef<std::path::Path>,
192
        file: fs::FileSlot,
193
        size_hint: Option<u64>,
194
    ) -> impl Future<Output = Result<(DigestInfo, fs::FileSlot), Error>>;
195
196
    /// Utility function to compute a hash from a generic reader.
197
7
    fn compute_from_reader<R: AsyncRead + Unpin + Send>(
198
7
        &mut self,
199
7
        mut reader: R,
200
7
    ) -> impl Future<Output = Result<DigestInfo, Error>> {
201
7
        async move {
202
7
            let mut chunk = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
203
            loop {
204
12
                reader
205
12
                    .read_buf(&mut chunk)
206
12
                    .await
207
12
                    .err_tip(|| 
"Could not read chunk during compute_from_reader"0
)
?0
;
208
12
                if chunk.is_empty() {
  Branch (208:20): [Folded - Ignored]
  Branch (208:20): [Folded - Ignored]
  Branch (208:20): [True: 0, False: 0]
  Branch (208:20): [True: 7, False: 5]
209
7
                    break; // EOF.
210
5
                }
211
5
                DigestHasher::update(self, &chunk);
212
5
                chunk.clear();
213
            }
214
7
            Ok(DigestHasher::finalize_digest(self))
215
7
        }
216
7
    }
217
}
218
219
#[expect(
220
    variant_size_differences,
221
    reason = "some variants are already boxed; this is acceptable"
222
)]
223
#[derive(Debug)]
224
pub enum DigestHasherFuncImpl {
225
    Sha256(Sha256),
226
    Blake3(Box<Blake3Hasher>), // Box because Blake3Hasher is 1.3kb in size.
227
}
228
229
/// The individual implementation of the hash function.
230
#[derive(Debug)]
231
pub struct DigestHasherImpl {
232
    hashed_size: u64,
233
    hash_func_impl: DigestHasherFuncImpl,
234
}
235
236
impl DigestHasherImpl {
237
    #[inline]
238
7
    async fn hash_file(
239
7
        &mut self,
240
7
        mut file: fs::FileSlot,
241
7
    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
242
7
        let digest = self
243
7
            .compute_from_reader(&mut file)
244
7
            .await
245
7
            .err_tip(|| 
"In digest_for_file"0
)
?0
;
246
7
        Ok((digest, file))
247
7
    }
248
}
249
250
impl DigestHasher for DigestHasherImpl {
251
    #[inline]
252
5.12k
    fn update(&mut self, input: &[u8]) {
253
5.12k
        self.hashed_size += input.len() as u64;
254
5.12k
        match &mut self.hash_func_impl {
255
90
            DigestHasherFuncImpl::Sha256(h) => sha2::digest::Update::update(h, input),
256
5.03k
            DigestHasherFuncImpl::Blake3(h) => {
257
5.03k
                Blake3Hasher::update(h, input);
258
5.03k
            }
259
        }
260
5.12k
    }
261
262
    #[inline]
263
5.12k
    fn finalize_digest(&mut self) -> DigestInfo {
264
5.12k
        let hash = match &mut self.hash_func_impl {
265
92
            DigestHasherFuncImpl::Sha256(h) => h.finalize_reset().into(),
266
5.03k
            DigestHasherFuncImpl::Blake3(h) => h.finalize().into(),
267
        };
268
5.12k
        DigestInfo::new(hash, self.hashed_size)
269
5.12k
    }
270
271
7
    async fn digest_for_file(
272
7
        mut self,
273
7
        file_path: impl AsRef<std::path::Path>,
274
7
        mut file: fs::FileSlot,
275
7
        size_hint: Option<u64>,
276
7
    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
277
7
        let file_position = file
278
7
            .stream_position()
279
7
            .await
280
7
            .err_tip(|| 
"Couldn't get stream position in digest_for_file"0
)
?0
;
281
7
        if file_position != 0 {
  Branch (281:12): [Folded - Ignored]
  Branch (281:12): [Folded - Ignored]
  Branch (281:12): [True: 0, False: 3]
  Branch (281:12): [True: 0, False: 4]
282
0
            return self.hash_file(file).await;
283
7
        }
284
        // If we are a small file, it's faster to just do it the "slow" way.
285
        // Great read: https://github.com/david-slatinek/c-read-vs.-mmap
286
7
        if let Some(size_hint) = size_hint {
  Branch (286:16): [Folded - Ignored]
  Branch (286:16): [Folded - Ignored]
  Branch (286:16): [True: 3, False: 0]
  Branch (286:16): [True: 4, False: 0]
287
7
            if size_hint <= fs::DEFAULT_READ_BUFF_SIZE as u64 {
  Branch (287:16): [Folded - Ignored]
  Branch (287:16): [Folded - Ignored]
  Branch (287:16): [True: 3, False: 0]
  Branch (287:16): [True: 4, False: 0]
288
7
                return self.hash_file(file).await;
289
0
            }
290
0
        }
291
0
        let file_path = file_path.as_ref().to_path_buf();
292
0
        match self.hash_func_impl {
293
0
            DigestHasherFuncImpl::Sha256(_) => self.hash_file(file).await,
294
0
            DigestHasherFuncImpl::Blake3(mut hasher) => {
295
0
                spawn_blocking!("digest_for_file", move || {
296
0
                    hasher.update_mmap(file_path).map_err(|e| {
297
0
                        make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}")
298
0
                    })?;
299
0
                    Result::<_, Error>::Ok((
300
0
                        DigestInfo::new(hasher.finalize().into(), hasher.count()),
301
0
                        file,
302
0
                    ))
303
0
                })
304
0
                .await
305
0
                .err_tip(|| "Could not spawn blocking task in digest_for_file")?
306
            }
307
        }
308
7
    }
309
}