Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/digest_hasher.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::sync::{Arc, OnceLock};
16
17
use blake3::Hasher as Blake3Hasher;
18
use bytes::BytesMut;
19
use futures::Future;
20
use nativelink_config::stores::ConfigDigestHashFunction;
21
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
22
use nativelink_metric::{
23
    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
24
};
25
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction;
26
use serde::{Deserialize, Serialize};
27
use sha2::{Digest, Sha256};
28
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
29
30
use crate::common::DigestInfo;
31
use crate::origin_context::{ActiveOriginContext, OriginContext};
32
use crate::{fs, make_symbol, spawn_blocking};
33
34
// The symbol can be used to retrieve the active hasher function
35
// from an `OriginContext`.
36
make_symbol!(ACTIVE_HASHER_FUNC, DigestHasherFunc);
37
38
static DEFAULT_DIGEST_HASHER_FUNC: OnceLock<DigestHasherFunc> = OnceLock::new();
39
40
/// Utility function to make a context with a specific hasher function set.
41
36
pub fn make_ctx_for_hash_func<H>(hasher: H) -> Result<Arc<OriginContext>, Error>
42
36
where
43
36
    H: TryInto<DigestHasherFunc>,
44
36
    H::Error: Into<Error>,
45
36
{
46
36
    let digest_hasher_func = hasher
47
36
        .try_into()
48
36
        .err_tip(|| 
"Could not convert into DigestHasherFunc"0
)
?0
;
49
50
36
    let mut new_ctx = ActiveOriginContext::fork().err_tip(|| 
"In BytestreamServer::inner_write"0
)
?0
;
51
36
    new_ctx.set_value(&ACTIVE_HASHER_FUNC, Arc::new(digest_hasher_func));
52
36
    Ok(Arc::new(new_ctx))
53
36
}
54
55
/// Get the default hasher.
56
35
pub fn default_digest_hasher_func() -> DigestHasherFunc {
57
35
    *DEFAULT_DIGEST_HASHER_FUNC.get_or_init(|| 
DigestHasherFunc::Sha2563
)
58
35
}
59
60
/// Sets the default hasher to use if no hasher was requested by the client.
61
0
pub fn set_default_digest_hasher_func(hasher: DigestHasherFunc) -> Result<(), Error> {
62
0
    DEFAULT_DIGEST_HASHER_FUNC
63
0
        .set(hasher)
64
0
        .map_err(|_| make_err!(Code::Internal, "default_digest_hasher_func already set"))
65
0
}
66
67
/// Supported digest hash functions.
68
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
69
pub enum DigestHasherFunc {
70
    Sha256,
71
    Blake3,
72
}
73
74
impl MetricsComponent for DigestHasherFunc {
75
0
    fn publish(
76
0
        &self,
77
0
        kind: MetricKind,
78
0
        field_metadata: MetricFieldData,
79
0
    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
80
0
        format!("{self:?}").publish(kind, field_metadata)
81
0
    }
82
}
83
84
impl DigestHasherFunc {
85
5.12k
    pub fn hasher(&self) -> DigestHasherImpl {
86
5.12k
        self.into()
87
5.12k
    }
88
89
    #[must_use]
90
37
    pub const fn proto_digest_func(&self) -> ProtoDigestFunction {
91
37
        match self {
92
35
            Self::Sha256 => ProtoDigestFunction::Sha256,
93
2
            Self::Blake3 => ProtoDigestFunction::Blake3,
94
        }
95
37
    }
96
}
97
98
impl From<ConfigDigestHashFunction> for DigestHasherFunc {
99
0
    fn from(value: ConfigDigestHashFunction) -> Self {
100
0
        match value {
101
0
            ConfigDigestHashFunction::sha256 => Self::Sha256,
102
0
            ConfigDigestHashFunction::blake3 => Self::Blake3,
103
        }
104
0
    }
105
}
106
107
impl TryFrom<ProtoDigestFunction> for DigestHasherFunc {
108
    type Error = Error;
109
110
0
    fn try_from(value: ProtoDigestFunction) -> Result<Self, Self::Error> {
111
0
        match value {
112
0
            ProtoDigestFunction::Sha256 => Ok(Self::Sha256),
113
0
            ProtoDigestFunction::Blake3 => Ok(Self::Blake3),
114
0
            v => Err(make_input_err!(
115
0
                "Unknown or unsupported digest function for proto conversion {v:?}"
116
0
            )),
117
        }
118
0
    }
119
}
120
121
impl TryFrom<&str> for DigestHasherFunc {
122
    type Error = Error;
123
124
0
    fn try_from(value: &str) -> Result<Self, Self::Error> {
125
0
        match value.to_uppercase().as_str() {
126
0
            "SHA256" => Ok(Self::Sha256),
127
0
            "BLAKE3" => Ok(Self::Blake3),
128
0
            v => Err(make_input_err!(
129
0
                "Unknown or unsupported digest function for string conversion: {v:?}"
130
0
            )),
131
        }
132
0
    }
133
}
134
135
impl std::fmt::Display for DigestHasherFunc {
136
3
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137
3
        match self {
138
3
            DigestHasherFunc::Sha256 => write!(f, "SHA256"),
139
0
            DigestHasherFunc::Blake3 => write!(f, "BLAKE3"),
140
        }
141
3
    }
142
}
143
144
impl TryFrom<i32> for DigestHasherFunc {
145
    type Error = Error;
146
147
37
    fn try_from(value: i32) -> Result<Self, Self::Error> {
148
37
        // Zero means not-set.
149
37
        if value == 0 {
  Branch (149:12): [True: 15, False: 22]
  Branch (149:12): [Folded - Ignored]
150
15
            return Ok(default_digest_hasher_func());
151
22
        }
152
22
        match ProtoDigestFunction::try_from(value) {
153
19
            Ok(ProtoDigestFunction::Sha256) => Ok(Self::Sha256),
154
3
            Ok(ProtoDigestFunction::Blake3) => Ok(Self::Blake3),
155
0
            value => Err(make_input_err!(
156
0
                "Unknown or unsupported digest function for int conversion: {:?}",
157
0
                value.map(|v| v.as_str_name())
158
0
            )),
159
        }
160
37
    }
161
}
162
163
impl From<&DigestHasherFunc> for DigestHasherImpl {
164
5.12k
    fn from(value: &DigestHasherFunc) -> Self {
165
5.12k
        let hash_func_impl = match value {
166
92
            DigestHasherFunc::Sha256 => DigestHasherFuncImpl::Sha256(Sha256::new()),
167
5.03k
            DigestHasherFunc::Blake3 => DigestHasherFuncImpl::Blake3(Box::new(Blake3Hasher::new())),
168
        };
169
5.12k
        Self {
170
5.12k
            hashed_size: 0,
171
5.12k
            hash_func_impl,
172
5.12k
        }
173
5.12k
    }
174
}
175
176
/// Wrapper to compute a hash of arbitrary data.
177
pub trait DigestHasher {
178
    /// Update the hasher with some additional data.
179
    fn update(&mut self, input: &[u8]);
180
181
    /// Finalize the hash function and collect the results into a digest.
182
    fn finalize_digest(&mut self) -> DigestInfo;
183
184
    /// Specialized version of the hashing function that is optimized for
185
    /// handling files. These optimizations take into account things like,
186
    /// the file size and the hasher algorithm to decide how to best process
187
    /// the file and feed it into the hasher.
188
    fn digest_for_file(
189
        self,
190
        file_path: impl AsRef<std::path::Path>,
191
        file: fs::FileSlot,
192
        size_hint: Option<u64>,
193
    ) -> impl Future<Output = Result<(DigestInfo, fs::FileSlot), Error>>;
194
195
    /// Utility function to compute a hash from a generic reader.
196
7
    fn compute_from_reader<R: AsyncRead + Unpin + Send>(
197
7
        &mut self,
198
7
        mut reader: R,
199
7
    ) -> impl Future<Output = Result<DigestInfo, Error>> {
200
7
        async move {
201
7
            let mut chunk = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
202
            loop {
203
12
                reader
204
12
                    .read_buf(&mut chunk)
205
12
                    .await
206
12
                    .err_tip(|| 
"Could not read chunk during compute_from_reader"0
)
?0
;
207
12
                if chunk.is_empty() {
  Branch (207:20): [Folded - Ignored]
  Branch (207:20): [Folded - Ignored]
  Branch (207:20): [True: 0, False: 0]
  Branch (207:20): [True: 7, False: 5]
208
7
                    break; // EOF.
209
5
                }
210
5
                DigestHasher::update(self, &chunk);
211
5
                chunk.clear();
212
            }
213
7
            Ok(DigestHasher::finalize_digest(self))
214
7
        }
215
7
    }
216
}
217
218
pub enum DigestHasherFuncImpl {
219
    Sha256(Sha256),
220
    Blake3(Box<Blake3Hasher>), // Box because Blake3Hasher is 1.3kb in size.
221
}
222
223
/// The individual implementation of the hash function.
224
pub struct DigestHasherImpl {
225
    hashed_size: u64,
226
    hash_func_impl: DigestHasherFuncImpl,
227
}
228
229
impl DigestHasherImpl {
230
    #[inline]
231
0
    async fn hash_file(
232
0
        &mut self,
233
0
        mut file: fs::FileSlot,
234
7
    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
235
7
        let digest = self
236
7
            .compute_from_reader(&mut file)
237
7
            .await
238
7
            .err_tip(|| 
"In digest_for_file"0
)
?0
;
239
7
        Ok((digest, file))
240
7
    }
241
}
242
243
impl DigestHasher for DigestHasherImpl {
244
    #[inline]
245
5.12k
    fn update(&mut self, input: &[u8]) {
246
5.12k
        self.hashed_size += input.len() as u64;
247
5.12k
        match &mut self.hash_func_impl {
248
90
            DigestHasherFuncImpl::Sha256(h) => sha2::digest::Update::update(h, input),
249
5.03k
            DigestHasherFuncImpl::Blake3(h) => {
250
5.03k
                Blake3Hasher::update(h, input);
251
5.03k
            }
252
        }
253
5.12k
    }
254
255
    #[inline]
256
5.12k
    fn finalize_digest(&mut self) -> DigestInfo {
257
5.12k
        let hash = match &mut self.hash_func_impl {
258
92
            DigestHasherFuncImpl::Sha256(h) => h.finalize_reset().into(),
259
5.03k
            DigestHasherFuncImpl::Blake3(h) => h.finalize().into(),
260
        };
261
5.12k
        DigestInfo::new(hash, self.hashed_size)
262
5.12k
    }
263
264
7
    async fn digest_for_file(
265
7
        mut self,
266
7
        file_path: impl AsRef<std::path::Path>,
267
7
        mut file: fs::FileSlot,
268
7
        size_hint: Option<u64>,
269
7
    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
270
7
        let file_position = file
271
7
            .stream_position()
272
7
            .await
273
7
            .err_tip(|| 
"Couldn't get stream position in digest_for_file"0
)
?0
;
274
7
        if file_position != 0 {
  Branch (274:12): [Folded - Ignored]
  Branch (274:12): [Folded - Ignored]
  Branch (274:12): [True: 0, False: 4]
  Branch (274:12): [True: 0, False: 3]
275
0
            return self.hash_file(file).await;
276
7
        }
277
        // If we are a small file, it's faster to just do it the "slow" way.
278
        // Great read: https://github.com/david-slatinek/c-read-vs.-mmap
279
7
        if let Some(size_hint) = size_hint {
  Branch (279:16): [Folded - Ignored]
  Branch (279:16): [Folded - Ignored]
  Branch (279:16): [True: 4, False: 0]
  Branch (279:16): [True: 3, False: 0]
280
7
            if size_hint <= fs::DEFAULT_READ_BUFF_SIZE as u64 {
  Branch (280:16): [Folded - Ignored]
  Branch (280:16): [Folded - Ignored]
  Branch (280:16): [True: 4, False: 0]
  Branch (280:16): [True: 3, False: 0]
281
7
                return self.hash_file(file).await;
282
0
            }
283
0
        }
284
0
        let file_path = file_path.as_ref().to_path_buf();
285
0
        match self.hash_func_impl {
286
0
            DigestHasherFuncImpl::Sha256(_) => self.hash_file(file).await,
287
0
            DigestHasherFuncImpl::Blake3(mut hasher) => {
288
0
                spawn_blocking!("digest_for_file", move || {
289
0
                    hasher.update_mmap(file_path).map_err(|e| {
290
0
                        make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}")
291
0
                    })?;
292
0
                    Result::<_, Error>::Ok((
293
0
                        DigestInfo::new(hasher.finalize().into(), hasher.count()),
294
0
                        file,
295
0
                    ))
296
0
                })
297
0
                .await
298
0
                .err_tip(|| "Could not spawn blocking task in digest_for_file")?
299
            }
300
        }
301
7
    }
302
}