Coverage Report

Created: 2025-05-30 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/digest_hasher.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::OnceLock;
16
17
use blake3::Hasher as Blake3Hasher;
18
use bytes::BytesMut;
19
use futures::Future;
20
use nativelink_config::stores::ConfigDigestHashFunction;
21
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
22
use nativelink_metric::{
23
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
};
25
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction;
26
use opentelemetry::context::Context;
27
use serde::{Deserialize, Serialize};
28
use sha2::{Digest, Sha256};
29
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
30
31
use crate::common::DigestInfo;
32
use crate::{fs, spawn_blocking};
33
34
/// Process-wide default hash function, used when a client does not request one.
/// It is fixed at most once: either explicitly by `set_default_digest_hasher_func`,
/// or lazily to SHA-256 the first time `default_digest_hasher_func` runs.
static DEFAULT_DIGEST_HASHER_FUNC: OnceLock<DigestHasherFunc> = OnceLock::new();
35
36
/// Utility function to make a context with a specific hasher function set.
37
39
pub fn make_ctx_for_hash_func<H>(hasher: H) -> Result<Context, Error>
38
39
where
39
39
    H: TryInto<DigestHasherFunc>,
40
39
    H::Error: Into<Error>,
41
{
42
39
    let digest_hasher_func = hasher
43
39
        .try_into()
44
39
        .err_tip(|| "Could not convert into DigestHasherFunc")
?0
;
45
46
39
    let new_ctx = Context::current_with_value(digest_hasher_func);
47
48
39
    Ok(new_ctx)
49
39
}
50
51
/// Get the default hasher.
52
39
pub fn default_digest_hasher_func() -> DigestHasherFunc {
53
39
    *DEFAULT_DIGEST_HASHER_FUNC.get_or_init(|| DigestHasherFunc::Sha256)
54
39
}
55
56
/// Sets the default hasher to use if no hasher was requested by the client.
57
0
pub fn set_default_digest_hasher_func(hasher: DigestHasherFunc) -> Result<(), Error> {
58
0
    DEFAULT_DIGEST_HASHER_FUNC
59
0
        .set(hasher)
60
0
        .map_err(|_| make_err!(Code::Internal, "default_digest_hasher_func already set"))
61
0
}
62
63
/// Supported digest hash functions.
64
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
65
pub enum DigestHasherFunc {
66
    Sha256,
67
    Blake3,
68
}
69
70
impl MetricsComponent for DigestHasherFunc {
71
0
    fn publish(
72
0
        &self,
73
0
        kind: MetricKind,
74
0
        field_metadata: MetricFieldData,
75
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
76
0
        format!("{self:?}").publish(kind, field_metadata)
77
0
    }
78
}
79
80
impl DigestHasherFunc {
81
5.12k
    pub fn hasher(&self) -> DigestHasherImpl {
82
5.12k
        self.into()
83
5.12k
    }
84
85
    #[must_use]
86
38
    pub const fn proto_digest_func(&self) -> ProtoDigestFunction {
87
38
        match self {
88
36
            Self::Sha256 => ProtoDigestFunction::Sha256,
89
2
            Self::Blake3 => ProtoDigestFunction::Blake3,
90
        }
91
38
    }
92
}
93
94
impl From<ConfigDigestHashFunction> for DigestHasherFunc {
95
0
    fn from(value: ConfigDigestHashFunction) -> Self {
96
0
        match value {
97
0
            ConfigDigestHashFunction::Sha256 => Self::Sha256,
98
0
            ConfigDigestHashFunction::Blake3 => Self::Blake3,
99
        }
100
0
    }
101
}
102
103
impl TryFrom<ProtoDigestFunction> for DigestHasherFunc {
104
    type Error = Error;
105
106
0
    fn try_from(value: ProtoDigestFunction) -> Result<Self, Self::Error> {
107
0
        match value {
108
0
            ProtoDigestFunction::Sha256 => Ok(Self::Sha256),
109
0
            ProtoDigestFunction::Blake3 => Ok(Self::Blake3),
110
0
            v => Err(make_input_err!(
111
0
                "Unknown or unsupported digest function for proto conversion {v:?}"
112
0
            )),
113
        }
114
0
    }
115
}
116
117
impl TryFrom<&str> for DigestHasherFunc {
118
    type Error = Error;
119
120
0
    fn try_from(value: &str) -> Result<Self, Self::Error> {
121
0
        match value.to_uppercase().as_str() {
122
0
            "SHA256" => Ok(Self::Sha256),
123
0
            "BLAKE3" => Ok(Self::Blake3),
124
0
            v => Err(make_input_err!(
125
0
                "Unknown or unsupported digest function for string conversion: {v:?}"
126
0
            )),
127
        }
128
0
    }
129
}
130
131
impl core::fmt::Display for DigestHasherFunc {
132
7
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
133
7
        match self {
134
5
            Self::Sha256 => write!(f, "SHA256"),
135
2
            Self::Blake3 => write!(f, "BLAKE3"),
136
        }
137
7
    }
138
}
139
140
impl TryFrom<i32> for DigestHasherFunc {
141
    type Error = Error;
142
143
39
    fn try_from(value: i32) -> Result<Self, Self::Error> {
144
        // Zero means not-set.
145
39
        if value == 0 {
  Branch (145:12): [True: 17, False: 22]
  Branch (145:12): [Folded - Ignored]
146
17
            return Ok(default_digest_hasher_func());
147
22
        }
148
22
        match ProtoDigestFunction::try_from(value) {
149
19
            Ok(ProtoDigestFunction::Sha256) => Ok(Self::Sha256),
150
3
            Ok(ProtoDigestFunction::Blake3) => Ok(Self::Blake3),
151
0
            value => Err(make_input_err!(
152
                "Unknown or unsupported digest function for int conversion: {:?}",
153
0
                value.map(|v| v.as_str_name())
154
            )),
155
        }
156
39
    }
157
}
158
159
impl From<&DigestHasherFunc> for DigestHasherImpl {
160
5.12k
    fn from(value: &DigestHasherFunc) -> Self {
161
5.12k
        let hash_func_impl = match value {
162
92
            DigestHasherFunc::Sha256 => DigestHasherFuncImpl::Sha256(Sha256::new()),
163
5.03k
            DigestHasherFunc::Blake3 => DigestHasherFuncImpl::Blake3(Box::default()),
164
        };
165
5.12k
        Self {
166
5.12k
            hashed_size: 0,
167
5.12k
            hash_func_impl,
168
5.12k
        }
169
5.12k
    }
170
}
171
172
/// Wrapper to compute a hash of arbitrary data.
173
pub trait DigestHasher {
174
    /// Update the hasher with some additional data.
175
    fn update(&mut self, input: &[u8]);
176
177
    /// Finalize the hash function and collect the results into a digest.
178
    fn finalize_digest(&mut self) -> DigestInfo;
179
180
    /// Specialized version of the hashing function that is optimized for
181
    /// handling files. These optimizations take into account things like,
182
    /// the file size and the hasher algorithm to decide how to best process
183
    /// the file and feed it into the hasher.
184
    fn digest_for_file(
185
        self,
186
        file_path: impl AsRef<std::path::Path>,
187
        file: fs::FileSlot,
188
        size_hint: Option<u64>,
189
    ) -> impl Future<Output = Result<(DigestInfo, fs::FileSlot), Error>>;
190
191
    /// Utility function to compute a hash from a generic reader.
192
7
    fn compute_from_reader<R: AsyncRead + Unpin + Send>(
193
7
        &mut self,
194
7
        mut reader: R,
195
7
    ) -> impl Future<Output = Result<DigestInfo, Error>> {
196
7
        async move {
197
7
            let mut chunk = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
198
            loop {
199
12
                reader
200
12
                    .read_buf(&mut chunk)
201
12
                    .await
202
12
                    .err_tip(|| "Could not read chunk during compute_from_reader")
?0
;
203
12
                if chunk.is_empty() {
  Branch (203:20): [Folded - Ignored]
  Branch (203:20): [Folded - Ignored]
  Branch (203:20): [True: 0, False: 0]
  Branch (203:20): [True: 7, False: 5]
204
7
                    break; // EOF.
205
5
                }
206
5
                DigestHasher::update(self, &chunk);
207
5
                chunk.clear();
208
            }
209
7
            Ok(DigestHasher::finalize_digest(self))
210
7
        }
211
7
    }
212
}
213
214
#[expect(
215
    variant_size_differences,
216
    reason = "some variants are already boxed; this is acceptable"
217
)]
218
#[derive(Debug)]
219
pub enum DigestHasherFuncImpl {
220
    Sha256(Sha256),
221
    Blake3(Box<Blake3Hasher>), // Box because Blake3Hasher is 1.3kb in size.
222
}
223
224
/// The individual implementation of the hash function.
225
#[derive(Debug)]
226
pub struct DigestHasherImpl {
227
    hashed_size: u64,
228
    hash_func_impl: DigestHasherFuncImpl,
229
}
230
231
impl DigestHasherImpl {
232
    #[inline]
233
7
    async fn hash_file(
234
7
        &mut self,
235
7
        mut file: fs::FileSlot,
236
7
    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
237
7
        let digest = self
238
7
            .compute_from_reader(&mut file)
239
7
            .await
240
7
            .err_tip(|| "In digest_for_file")
?0
;
241
7
        Ok((digest, file))
242
7
    }
243
}
244
245
impl DigestHasher for DigestHasherImpl {
246
    #[inline]
247
5.12k
    fn update(&mut self, input: &[u8]) {
248
5.12k
        self.hashed_size += input.len() as u64;
249
5.12k
        match &mut self.hash_func_impl {
250
90
            DigestHasherFuncImpl::Sha256(h) => sha2::digest::Update::update(h, input),
251
5.03k
            DigestHasherFuncImpl::Blake3(h) => {
252
5.03k
                Blake3Hasher::update(h, input);
253
5.03k
            }
254
        }
255
5.12k
    }
256
257
    #[inline]
258
5.12k
    fn finalize_digest(&mut self) -> DigestInfo {
259
5.12k
        let hash = match &mut self.hash_func_impl {
260
92
            DigestHasherFuncImpl::Sha256(h) => h.finalize_reset().into(),
261
5.03k
            DigestHasherFuncImpl::Blake3(h) => h.finalize().into(),
262
        };
263
5.12k
        DigestInfo::new(hash, self.hashed_size)
264
5.12k
    }
265
266
7
    async fn digest_for_file(
267
7
        mut self,
268
7
        file_path: impl AsRef<std::path::Path>,
269
7
        mut file: fs::FileSlot,
270
7
        size_hint: Option<u64>,
271
7
    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
272
7
        let file_position = file
273
7
            .stream_position()
274
7
            .await
275
7
            .err_tip(|| "Couldn't get stream position in digest_for_file")
?0
;
276
7
        if file_position != 0 {
  Branch (276:12): [Folded - Ignored]
  Branch (276:12): [Folded - Ignored]
  Branch (276:12): [True: 0, False: 3]
  Branch (276:12): [True: 0, False: 4]
277
0
            return self.hash_file(file).await;
278
7
        }
279
        // If we are a small file, it's faster to just do it the "slow" way.
280
        // Great read: https://github.com/david-slatinek/c-read-vs.-mmap
281
7
        if let Some(size_hint) = size_hint {
  Branch (281:16): [Folded - Ignored]
  Branch (281:16): [Folded - Ignored]
  Branch (281:16): [True: 3, False: 0]
  Branch (281:16): [True: 4, False: 0]
282
7
            if size_hint <= fs::DEFAULT_READ_BUFF_SIZE as u64 {
  Branch (282:16): [Folded - Ignored]
  Branch (282:16): [Folded - Ignored]
  Branch (282:16): [True: 3, False: 0]
  Branch (282:16): [True: 4, False: 0]
283
7
                return self.hash_file(file).await;
284
0
            }
285
0
        }
286
0
        let file_path = file_path.as_ref().to_path_buf();
287
0
        match self.hash_func_impl {
288
0
            DigestHasherFuncImpl::Sha256(_) => self.hash_file(file).await,
289
0
            DigestHasherFuncImpl::Blake3(mut hasher) => {
290
0
                spawn_blocking!("digest_for_file", move || {
291
0
                    hasher.update_mmap(file_path).map_err(|e| {
292
0
                        make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}")
293
0
                    })?;
294
0
                    Result::<_, Error>::Ok((
295
0
                        DigestInfo::new(hasher.finalize().into(), hasher.count()),
296
0
                        file,
297
0
                    ))
298
0
                })
299
0
                .await
300
0
                .err_tip(|| "Could not spawn blocking task in digest_for_file")?
301
            }
302
        }
303
7
    }
304
}