/build/source/nativelink-util/src/digest_hasher.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::{Arc, OnceLock}; |
16 | | |
17 | | use blake3::Hasher as Blake3Hasher; |
18 | | use bytes::BytesMut; |
19 | | use futures::Future; |
20 | | use nativelink_config::stores::ConfigDigestHashFunction; |
21 | | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; |
22 | | use nativelink_metric::{ |
23 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
24 | | }; |
25 | | use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction; |
26 | | use serde::{Deserialize, Serialize}; |
27 | | use sha2::{Digest, Sha256}; |
28 | | use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt}; |
29 | | |
30 | | use crate::common::DigestInfo; |
31 | | use crate::origin_context::{ActiveOriginContext, OriginContext}; |
32 | | use crate::{fs, spawn_blocking, unsafe_make_symbol}; |
33 | | |
34 | | // The symbol can be used to retrieve the active hasher function
35 | | // from an `OriginContext`.
36 | | // Safety: There is no other symbol named `ACTIVE_HASHER_FUNC`. |
37 | | unsafe_make_symbol!(ACTIVE_HASHER_FUNC, DigestHasherFunc); |
38 | | |
39 | | static DEFAULT_DIGEST_HASHER_FUNC: OnceLock<DigestHasherFunc> = OnceLock::new(); |
40 | | |
41 | | /// Utility function to make a context with a specific hasher function set. |
42 | 38 | pub fn make_ctx_for_hash_func<H>(hasher: H) -> Result<Arc<OriginContext>, Error> |
43 | 38 | where |
44 | 38 | H: TryInto<DigestHasherFunc>, |
45 | 38 | H::Error: Into<Error>, |
46 | 38 | { |
47 | 38 | let digest_hasher_func = hasher |
48 | 38 | .try_into() |
49 | 38 | .err_tip(|| "Could not convert into DigestHasherFunc")?;
50 | | |
51 | 38 | let mut new_ctx = ActiveOriginContext::fork().err_tip(|| "In make_ctx_for_hash_func")?;
52 | 38 | new_ctx.set_value(&ACTIVE_HASHER_FUNC, Arc::new(digest_hasher_func)); |
53 | 38 | Ok(Arc::new(new_ctx)) |
54 | 38 | } |
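// A minimal usage sketch (hypothetical helper, names are illustrative): the
// function accepts anything convertible into `DigestHasherFunc`, such as the
// raw `i32` digest-function value a client sends over the proto. A value of 0
// falls back to the configured default via `TryFrom<i32>` below.
fn sketch_ctx_from_proto_value(raw_digest_function: i32) -> Result<Arc<OriginContext>, Error> {
    make_ctx_for_hash_func(raw_digest_function)
        .err_tip(|| "Client requested an unsupported digest function")
}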
55 | | |
56 | | /// Get the default hasher. |
57 | 38 | pub fn default_digest_hasher_func() -> DigestHasherFunc { |
58 | 38 | *DEFAULT_DIGEST_HASHER_FUNC.get_or_init(|| DigestHasherFunc::Sha256)
59 | 38 | } |
60 | | |
61 | | /// Sets the default hasher to use if no hasher was requested by the client. |
62 | 0 | pub fn set_default_digest_hasher_func(hasher: DigestHasherFunc) -> Result<(), Error> { |
63 | 0 | DEFAULT_DIGEST_HASHER_FUNC |
64 | 0 | .set(hasher) |
65 | 0 | .map_err(|_| make_err!(Code::Internal, "default_digest_hasher_func already set")) |
66 | 0 | } |
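// A minimal sketch of the intended call order (hypothetical helper): the
// default is set once at startup, typically from configuration, and any later
// attempt to change it returns an error from the `OnceLock`.
fn sketch_configure_default() -> Result<(), Error> {
    set_default_digest_hasher_func(DigestHasherFunc::Blake3)?;
    // From here on, an unset digest function (proto value 0) resolves to Blake3.
    assert_eq!(default_digest_hasher_func(), DigestHasherFunc::Blake3);
    // A second call fails rather than silently overwriting the value.
    assert!(set_default_digest_hasher_func(DigestHasherFunc::Sha256).is_err());
    Ok(())
}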
67 | | |
68 | | /// Supported digest hash functions. |
69 | | #[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)] |
70 | | pub enum DigestHasherFunc { |
71 | | Sha256, |
72 | | Blake3, |
73 | | } |
74 | | |
75 | | impl MetricsComponent for DigestHasherFunc { |
76 | 0 | fn publish( |
77 | 0 | &self, |
78 | 0 | kind: MetricKind, |
79 | 0 | field_metadata: MetricFieldData, |
80 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
81 | 0 | format!("{self:?}").publish(kind, field_metadata) |
82 | 0 | } |
83 | | } |
84 | | |
85 | | impl DigestHasherFunc { |
86 | 5.12k | pub fn hasher(&self) -> DigestHasherImpl { |
87 | 5.12k | self.into() |
88 | 5.12k | } |
89 | | |
90 | | #[must_use] |
91 | 38 | pub const fn proto_digest_func(&self) -> ProtoDigestFunction { |
92 | 38 | match self { |
93 | 36 | Self::Sha256 => ProtoDigestFunction::Sha256, |
94 | 2 | Self::Blake3 => ProtoDigestFunction::Blake3, |
95 | | } |
96 | 38 | } |
97 | | } |
98 | | |
99 | | impl From<ConfigDigestHashFunction> for DigestHasherFunc { |
100 | 0 | fn from(value: ConfigDigestHashFunction) -> Self { |
101 | 0 | match value { |
102 | 0 | ConfigDigestHashFunction::Sha256 => Self::Sha256, |
103 | 0 | ConfigDigestHashFunction::Blake3 => Self::Blake3, |
104 | | } |
105 | 0 | } |
106 | | } |
107 | | |
108 | | impl TryFrom<ProtoDigestFunction> for DigestHasherFunc { |
109 | | type Error = Error; |
110 | | |
111 | 0 | fn try_from(value: ProtoDigestFunction) -> Result<Self, Self::Error> { |
112 | 0 | match value { |
113 | 0 | ProtoDigestFunction::Sha256 => Ok(Self::Sha256), |
114 | 0 | ProtoDigestFunction::Blake3 => Ok(Self::Blake3), |
115 | 0 | v => Err(make_input_err!( |
116 | 0 | "Unknown or unsupported digest function for proto conversion {v:?}" |
117 | 0 | )), |
118 | | } |
119 | 0 | } |
120 | | } |
121 | | |
122 | | impl TryFrom<&str> for DigestHasherFunc { |
123 | | type Error = Error; |
124 | | |
125 | 0 | fn try_from(value: &str) -> Result<Self, Self::Error> { |
126 | 0 | match value.to_uppercase().as_str() { |
127 | 0 | "SHA256" => Ok(Self::Sha256), |
128 | 0 | "BLAKE3" => Ok(Self::Blake3), |
129 | 0 | v => Err(make_input_err!( |
130 | 0 | "Unknown or unsupported digest function for string conversion: {v:?}" |
131 | 0 | )), |
132 | | } |
133 | 0 | } |
134 | | } |
135 | | |
136 | | impl std::fmt::Display for DigestHasherFunc { |
137 | 3 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
138 | 3 | match self { |
139 | 3 | DigestHasherFunc::Sha256 => write!(f, "SHA256"), |
140 | 0 | DigestHasherFunc::Blake3 => write!(f, "BLAKE3"), |
141 | | } |
142 | 3 | } |
143 | | } |
144 | | |
145 | | impl TryFrom<i32> for DigestHasherFunc { |
146 | | type Error = Error; |
147 | | |
148 | 39 | fn try_from(value: i32) -> Result<Self, Self::Error> { |
149 | 39 | // Zero means not-set. |
150 | 39 | if value == 0 {
    Branch (150:12): [True: 17, False: 22]
    Branch (150:12): [Folded - Ignored]
151 | 17 | return Ok(default_digest_hasher_func()); |
152 | 22 | } |
153 | 22 | match ProtoDigestFunction::try_from(value) { |
154 | 19 | Ok(ProtoDigestFunction::Sha256) => Ok(Self::Sha256), |
155 | 3 | Ok(ProtoDigestFunction::Blake3) => Ok(Self::Blake3), |
156 | 0 | value => Err(make_input_err!( |
157 | 0 | "Unknown or unsupported digest function for int conversion: {:?}", |
158 | 0 | value.map(|v| v.as_str_name()) |
159 | | )), |
160 | | } |
161 | 39 | } |
162 | | } |
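// A minimal sketch of the conversion paths defined above (hypothetical helper):
// strings are matched case-insensitively, proto values map one-to-one, and the
// integer form treats 0 as "not set" and falls back to the default hasher.
fn sketch_conversions() -> Result<(), Error> {
    assert_eq!(DigestHasherFunc::try_from("blake3")?, DigestHasherFunc::Blake3);
    assert_eq!(
        DigestHasherFunc::try_from(ProtoDigestFunction::Sha256)?,
        DigestHasherFunc::Sha256
    );
    assert_eq!(DigestHasherFunc::try_from(0)?, default_digest_hasher_func());
    // Unknown values are rejected with an input error rather than guessed at.
    assert!(DigestHasherFunc::try_from("md5").is_err());
    Ok(())
}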
163 | | |
164 | | impl From<&DigestHasherFunc> for DigestHasherImpl { |
165 | 5.12k | fn from(value: &DigestHasherFunc) -> Self { |
166 | 5.12k | let hash_func_impl = match value { |
167 | 92 | DigestHasherFunc::Sha256 => DigestHasherFuncImpl::Sha256(Sha256::new()), |
168 | 5.03k | DigestHasherFunc::Blake3 => DigestHasherFuncImpl::Blake3(Box::default()), |
169 | | }; |
170 | 5.12k | Self { |
171 | 5.12k | hashed_size: 0, |
172 | 5.12k | hash_func_impl, |
173 | 5.12k | } |
174 | 5.12k | } |
175 | | } |
176 | | |
177 | | /// Wrapper to compute a hash of arbitrary data. |
178 | | pub trait DigestHasher { |
179 | | /// Update the hasher with some additional data. |
180 | | fn update(&mut self, input: &[u8]); |
181 | | |
182 | | /// Finalize the hash function and collect the results into a digest. |
183 | | fn finalize_digest(&mut self) -> DigestInfo; |
184 | | |
185 | | /// Specialized version of the hashing function that is optimized for |
186 | | /// handling files. These optimizations take into account things like
187 | | /// the file size and the hasher algorithm to decide how to best process |
188 | | /// the file and feed it into the hasher. |
189 | | fn digest_for_file( |
190 | | self, |
191 | | file_path: impl AsRef<std::path::Path>, |
192 | | file: fs::FileSlot, |
193 | | size_hint: Option<u64>, |
194 | | ) -> impl Future<Output = Result<(DigestInfo, fs::FileSlot), Error>>; |
195 | | |
196 | | /// Utility function to compute a hash from a generic reader. |
197 | 7 | fn compute_from_reader<R: AsyncRead + Unpin + Send>( |
198 | 7 | &mut self, |
199 | 7 | mut reader: R, |
200 | 7 | ) -> impl Future<Output = Result<DigestInfo, Error>> { |
201 | 7 | async move { |
202 | 7 | let mut chunk = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE); |
203 | | loop { |
204 | 12 | reader |
205 | 12 | .read_buf(&mut chunk) |
206 | 12 | .await |
207 | 12 | .err_tip(|| "Could not read chunk during compute_from_reader")?;
208 | 12 | if chunk.is_empty() {
    Branch (208:20): [Folded - Ignored]
    Branch (208:20): [Folded - Ignored]
    Branch (208:20): [True: 0, False: 0]
    Branch (208:20): [True: 7, False: 5]
209 | 7 | break; // EOF. |
210 | 5 | } |
211 | 5 | DigestHasher::update(self, &chunk); |
212 | 5 | chunk.clear(); |
213 | | } |
214 | 7 | Ok(DigestHasher::finalize_digest(self)) |
215 | 7 | } |
216 | 7 | } |
217 | | } |
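// A minimal sketch of the trait in use (hypothetical helper): obtain a concrete
// hasher from the enum, feed it data with one or more `update` calls, and
// finalize into a `DigestInfo` carrying both the hash and the byte count.
// `compute_from_reader` runs the same loop over any `AsyncRead` source and
// yields the same digest for the same bytes; `&[u8]` works as an in-memory reader.
async fn sketch_hash_bytes(data: &[u8]) -> Result<DigestInfo, Error> {
    // Incremental path: explicit `update` calls followed by `finalize_digest`.
    let mut hasher = DigestHasherFunc::Sha256.hasher();
    hasher.update(data);
    let _streamed_digest = hasher.finalize_digest();

    // Reader path over the same bytes.
    let mut reader_hasher = DigestHasherFunc::Sha256.hasher();
    reader_hasher.compute_from_reader(data).await
}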
218 | | |
219 | | #[expect( |
220 | | variant_size_differences, |
221 | | reason = "some variants are already boxed; this is acceptable" |
222 | | )] |
223 | | #[derive(Debug)] |
224 | | pub enum DigestHasherFuncImpl { |
225 | | Sha256(Sha256), |
226 | | Blake3(Box<Blake3Hasher>), // Box because Blake3Hasher is 1.3kb in size. |
227 | | } |
228 | | |
229 | | /// The individual implementation of the hash function. |
230 | | #[derive(Debug)] |
231 | | pub struct DigestHasherImpl { |
232 | | hashed_size: u64, |
233 | | hash_func_impl: DigestHasherFuncImpl, |
234 | | } |
235 | | |
236 | | impl DigestHasherImpl { |
237 | | #[inline] |
238 | 7 | async fn hash_file( |
239 | 7 | &mut self, |
240 | 7 | mut file: fs::FileSlot, |
241 | 7 | ) -> Result<(DigestInfo, fs::FileSlot), Error> { |
242 | 7 | let digest = self |
243 | 7 | .compute_from_reader(&mut file) |
244 | 7 | .await |
245 | 7 | .err_tip(|| "In digest_for_file")?;
246 | 7 | Ok((digest, file)) |
247 | 7 | } |
248 | | } |
249 | | |
250 | | impl DigestHasher for DigestHasherImpl { |
251 | | #[inline] |
252 | 5.12k | fn update(&mut self, input: &[u8]) { |
253 | 5.12k | self.hashed_size += input.len() as u64; |
254 | 5.12k | match &mut self.hash_func_impl { |
255 | 90 | DigestHasherFuncImpl::Sha256(h) => sha2::digest::Update::update(h, input), |
256 | 5.03k | DigestHasherFuncImpl::Blake3(h) => { |
257 | 5.03k | Blake3Hasher::update(h, input); |
258 | 5.03k | } |
259 | | } |
260 | 5.12k | } |
261 | | |
262 | | #[inline] |
263 | 5.12k | fn finalize_digest(&mut self) -> DigestInfo { |
264 | 5.12k | let hash = match &mut self.hash_func_impl { |
265 | 92 | DigestHasherFuncImpl::Sha256(h) => h.finalize_reset().into(), |
266 | 5.03k | DigestHasherFuncImpl::Blake3(h) => h.finalize().into(), |
267 | | }; |
268 | 5.12k | DigestInfo::new(hash, self.hashed_size) |
269 | 5.12k | } |
270 | | |
271 | 7 | async fn digest_for_file( |
272 | 7 | mut self, |
273 | 7 | file_path: impl AsRef<std::path::Path>, |
274 | 7 | mut file: fs::FileSlot, |
275 | 7 | size_hint: Option<u64>, |
276 | 7 | ) -> Result<(DigestInfo, fs::FileSlot), Error> { |
277 | 7 | let file_position = file |
278 | 7 | .stream_position() |
279 | 7 | .await |
280 | 7 | .err_tip(|| "Couldn't get stream position in digest_for_file")?;
281 | 7 | if file_position != 0 {
    Branch (281:12): [Folded - Ignored]
    Branch (281:12): [Folded - Ignored]
    Branch (281:12): [True: 0, False: 3]
    Branch (281:12): [True: 0, False: 4]
282 | 0 | return self.hash_file(file).await; |
283 | 7 | } |
284 | | // If we are a small file, it's faster to just do it the "slow" way. |
285 | | // Great read: https://github.com/david-slatinek/c-read-vs.-mmap |
286 | 7 | if let Some(size_hint) = size_hint {
    Branch (286:16): [Folded - Ignored]
    Branch (286:16): [Folded - Ignored]
    Branch (286:16): [True: 3, False: 0]
    Branch (286:16): [True: 4, False: 0]
287 | 7 | if size_hint <= fs::DEFAULT_READ_BUFF_SIZE as u64 {
    Branch (287:16): [Folded - Ignored]
    Branch (287:16): [Folded - Ignored]
    Branch (287:16): [True: 3, False: 0]
    Branch (287:16): [True: 4, False: 0]
288 | 7 | return self.hash_file(file).await; |
289 | 0 | } |
290 | 0 | } |
291 | 0 | let file_path = file_path.as_ref().to_path_buf(); |
292 | 0 | match self.hash_func_impl { |
293 | 0 | DigestHasherFuncImpl::Sha256(_) => self.hash_file(file).await, |
294 | 0 | DigestHasherFuncImpl::Blake3(mut hasher) => { |
295 | 0 | spawn_blocking!("digest_for_file", move || { |
296 | 0 | hasher.update_mmap(file_path).map_err(|e| { |
297 | 0 | make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}") |
298 | 0 | })?; |
299 | 0 | Result::<_, Error>::Ok(( |
300 | 0 | DigestInfo::new(hasher.finalize().into(), hasher.count()), |
301 | 0 | file, |
302 | 0 | )) |
303 | 0 | }) |
304 | 0 | .await |
305 | 0 | .err_tip(|| "Could not spawn blocking task in digest_for_file")? |
306 | | } |
307 | | } |
308 | 7 | } |
309 | | } |
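// A minimal sketch of the file-oriented entry point (hypothetical helper),
// assuming the caller has already opened a `fs::FileSlot` for `path` and knows
// its length. Per the logic above, SHA256 and small or already-seeked files are
// streamed through `compute_from_reader`, while large Blake3 inputs are
// memory-mapped on a blocking thread via `update_mmap`.
async fn sketch_hash_file(
    path: impl AsRef<std::path::Path>,
    file: fs::FileSlot,
    file_len: u64,
) -> Result<DigestInfo, Error> {
    let hasher = default_digest_hasher_func().hasher();
    let (digest, _file) = hasher
        .digest_for_file(path, file, Some(file_len))
        .await
        .err_tip(|| "While hashing file contents")?;
    Ok(digest)
}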