/build/source/nativelink-util/src/digest_hasher.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::sync::OnceLock; |
16 | | |
17 | | use blake3::Hasher as Blake3Hasher; |
18 | | use bytes::BytesMut; |
19 | | use futures::Future; |
20 | | use nativelink_config::stores::ConfigDigestHashFunction; |
21 | | use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err}; |
22 | | use nativelink_metric::{ |
23 | | MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent, |
24 | | }; |
25 | | use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction; |
26 | | use opentelemetry::context::Context; |
27 | | use serde::{Deserialize, Serialize}; |
28 | | use sha2::{Digest, Sha256}; |
29 | | use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt}; |
30 | | |
31 | | use crate::common::DigestInfo; |
32 | | use crate::{fs, spawn_blocking}; |
33 | | |
/// Process-wide default hasher function. It is stored in a `OnceLock`, so it
/// can be written at most once: either explicitly through
/// `set_default_digest_hasher_func` or lazily by the first call to
/// `default_digest_hasher_func`.
static DEFAULT_DIGEST_HASHER_FUNC: OnceLock<DigestHasherFunc> = OnceLock::new();
35 | | |
36 | | /// Utility function to make a context with a specific hasher function set. |
37 | 39 | pub fn make_ctx_for_hash_func<H>(hasher: H) -> Result<Context, Error> |
38 | 39 | where |
39 | 39 | H: TryInto<DigestHasherFunc>, |
40 | 39 | H::Error: Into<Error>, |
41 | | { |
42 | 39 | let digest_hasher_func = hasher |
43 | 39 | .try_into() |
44 | 39 | .err_tip(|| "Could not convert into DigestHasherFunc")?0 ; |
45 | | |
46 | 39 | let new_ctx = Context::current_with_value(digest_hasher_func); |
47 | | |
48 | 39 | Ok(new_ctx) |
49 | 39 | } |
50 | | |
51 | | /// Get the default hasher. |
52 | 39 | pub fn default_digest_hasher_func() -> DigestHasherFunc { |
53 | 39 | *DEFAULT_DIGEST_HASHER_FUNC.get_or_init(|| DigestHasherFunc::Sha256) |
54 | 39 | } |
55 | | |
56 | | /// Sets the default hasher to use if no hasher was requested by the client. |
57 | 0 | pub fn set_default_digest_hasher_func(hasher: DigestHasherFunc) -> Result<(), Error> { |
58 | 0 | DEFAULT_DIGEST_HASHER_FUNC |
59 | 0 | .set(hasher) |
60 | 0 | .map_err(|_| make_err!(Code::Internal, "default_digest_hasher_func already set")) |
61 | 0 | } |
62 | | |
/// Supported digest hash functions.
///
/// This is a plain `Copy` selector; the actual hashing state lives in
/// `DigestHasherImpl`, which can be created from a value of this type.
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum DigestHasherFunc {
    /// The SHA-256 hash function.
    Sha256,
    /// The BLAKE3 hash function.
    Blake3,
}
69 | | |
70 | | impl MetricsComponent for DigestHasherFunc { |
71 | 0 | fn publish( |
72 | 0 | &self, |
73 | 0 | kind: MetricKind, |
74 | 0 | field_metadata: MetricFieldData, |
75 | 0 | ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> { |
76 | 0 | format!("{self:?}").publish(kind, field_metadata) |
77 | 0 | } |
78 | | } |
79 | | |
80 | | impl DigestHasherFunc { |
81 | 5.12k | pub fn hasher(&self) -> DigestHasherImpl { |
82 | 5.12k | self.into() |
83 | 5.12k | } |
84 | | |
85 | | #[must_use] |
86 | 38 | pub const fn proto_digest_func(&self) -> ProtoDigestFunction { |
87 | 38 | match self { |
88 | 36 | Self::Sha256 => ProtoDigestFunction::Sha256, |
89 | 2 | Self::Blake3 => ProtoDigestFunction::Blake3, |
90 | | } |
91 | 38 | } |
92 | | } |
93 | | |
94 | | impl From<ConfigDigestHashFunction> for DigestHasherFunc { |
95 | 0 | fn from(value: ConfigDigestHashFunction) -> Self { |
96 | 0 | match value { |
97 | 0 | ConfigDigestHashFunction::Sha256 => Self::Sha256, |
98 | 0 | ConfigDigestHashFunction::Blake3 => Self::Blake3, |
99 | | } |
100 | 0 | } |
101 | | } |
102 | | |
103 | | impl TryFrom<ProtoDigestFunction> for DigestHasherFunc { |
104 | | type Error = Error; |
105 | | |
106 | 0 | fn try_from(value: ProtoDigestFunction) -> Result<Self, Self::Error> { |
107 | 0 | match value { |
108 | 0 | ProtoDigestFunction::Sha256 => Ok(Self::Sha256), |
109 | 0 | ProtoDigestFunction::Blake3 => Ok(Self::Blake3), |
110 | 0 | v => Err(make_input_err!( |
111 | 0 | "Unknown or unsupported digest function for proto conversion {v:?}" |
112 | 0 | )), |
113 | | } |
114 | 0 | } |
115 | | } |
116 | | |
117 | | impl TryFrom<&str> for DigestHasherFunc { |
118 | | type Error = Error; |
119 | | |
120 | 0 | fn try_from(value: &str) -> Result<Self, Self::Error> { |
121 | 0 | match value.to_uppercase().as_str() { |
122 | 0 | "SHA256" => Ok(Self::Sha256), |
123 | 0 | "BLAKE3" => Ok(Self::Blake3), |
124 | 0 | v => Err(make_input_err!( |
125 | 0 | "Unknown or unsupported digest function for string conversion: {v:?}" |
126 | 0 | )), |
127 | | } |
128 | 0 | } |
129 | | } |
130 | | |
131 | | impl core::fmt::Display for DigestHasherFunc { |
132 | 7 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
133 | 7 | match self { |
134 | 5 | Self::Sha256 => write!(f, "SHA256"), |
135 | 2 | Self::Blake3 => write!(f, "BLAKE3"), |
136 | | } |
137 | 7 | } |
138 | | } |
139 | | |
140 | | impl TryFrom<i32> for DigestHasherFunc { |
141 | | type Error = Error; |
142 | | |
143 | 39 | fn try_from(value: i32) -> Result<Self, Self::Error> { |
144 | | // Zero means not-set. |
145 | 39 | if value == 0 { Branch (145:12): [True: 17, False: 22]
Branch (145:12): [Folded - Ignored]
|
146 | 17 | return Ok(default_digest_hasher_func()); |
147 | 22 | } |
148 | 22 | match ProtoDigestFunction::try_from(value) { |
149 | 19 | Ok(ProtoDigestFunction::Sha256) => Ok(Self::Sha256), |
150 | 3 | Ok(ProtoDigestFunction::Blake3) => Ok(Self::Blake3), |
151 | 0 | value => Err(make_input_err!( |
152 | | "Unknown or unsupported digest function for int conversion: {:?}", |
153 | 0 | value.map(|v| v.as_str_name()) |
154 | | )), |
155 | | } |
156 | 39 | } |
157 | | } |
158 | | |
159 | | impl From<&DigestHasherFunc> for DigestHasherImpl { |
160 | 5.12k | fn from(value: &DigestHasherFunc) -> Self { |
161 | 5.12k | let hash_func_impl = match value { |
162 | 92 | DigestHasherFunc::Sha256 => DigestHasherFuncImpl::Sha256(Sha256::new()), |
163 | 5.03k | DigestHasherFunc::Blake3 => DigestHasherFuncImpl::Blake3(Box::default()), |
164 | | }; |
165 | 5.12k | Self { |
166 | 5.12k | hashed_size: 0, |
167 | 5.12k | hash_func_impl, |
168 | 5.12k | } |
169 | 5.12k | } |
170 | | } |
171 | | |
172 | | /// Wrapper to compute a hash of arbitrary data. |
173 | | pub trait DigestHasher { |
174 | | /// Update the hasher with some additional data. |
175 | | fn update(&mut self, input: &[u8]); |
176 | | |
177 | | /// Finalize the hash function and collect the results into a digest. |
178 | | fn finalize_digest(&mut self) -> DigestInfo; |
179 | | |
180 | | /// Specialized version of the hashing function that is optimized for |
181 | | /// handling files. These optimizations take into account things like, |
182 | | /// the file size and the hasher algorithm to decide how to best process |
183 | | /// the file and feed it into the hasher. |
184 | | fn digest_for_file( |
185 | | self, |
186 | | file_path: impl AsRef<std::path::Path>, |
187 | | file: fs::FileSlot, |
188 | | size_hint: Option<u64>, |
189 | | ) -> impl Future<Output = Result<(DigestInfo, fs::FileSlot), Error>>; |
190 | | |
191 | | /// Utility function to compute a hash from a generic reader. |
192 | 7 | fn compute_from_reader<R: AsyncRead + Unpin + Send>( |
193 | 7 | &mut self, |
194 | 7 | mut reader: R, |
195 | 7 | ) -> impl Future<Output = Result<DigestInfo, Error>> { |
196 | 7 | async move { |
197 | 7 | let mut chunk = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE); |
198 | | loop { |
199 | 12 | reader |
200 | 12 | .read_buf(&mut chunk) |
201 | 12 | .await |
202 | 12 | .err_tip(|| "Could not read chunk during compute_from_reader")?0 ; |
203 | 12 | if chunk.is_empty() { Branch (203:20): [Folded - Ignored]
Branch (203:20): [Folded - Ignored]
Branch (203:20): [True: 0, False: 0]
Branch (203:20): [True: 7, False: 5]
|
204 | 7 | break; // EOF. |
205 | 5 | } |
206 | 5 | DigestHasher::update(self, &chunk); |
207 | 5 | chunk.clear(); |
208 | | } |
209 | 7 | Ok(DigestHasher::finalize_digest(self)) |
210 | 7 | } |
211 | 7 | } |
212 | | } |
213 | | |
/// Hashing state for each supported hash function.
#[expect(
    variant_size_differences,
    reason = "some variants are already boxed; this is acceptable"
)]
#[derive(Debug)]
pub enum DigestHasherFuncImpl {
    /// State of an in-progress SHA-256 computation.
    Sha256(Sha256),
    /// State of an in-progress BLAKE3 computation.
    Blake3(Box<Blake3Hasher>), // Box because Blake3Hasher is 1.3kb in size.
}
223 | | |
/// The individual implementation of the hash function.
///
/// Pairs the algorithm-specific hashing state with a running count of the
/// bytes that have been fed in through `update`.
#[derive(Debug)]
pub struct DigestHasherImpl {
    /// Number of bytes passed to `update` so far; reported as the digest size
    /// by `finalize_digest`.
    hashed_size: u64,
    /// Algorithm-specific hashing state.
    hash_func_impl: DigestHasherFuncImpl,
}
230 | | |
231 | | impl DigestHasherImpl { |
232 | | #[inline] |
233 | 7 | async fn hash_file( |
234 | 7 | &mut self, |
235 | 7 | mut file: fs::FileSlot, |
236 | 7 | ) -> Result<(DigestInfo, fs::FileSlot), Error> { |
237 | 7 | let digest = self |
238 | 7 | .compute_from_reader(&mut file) |
239 | 7 | .await |
240 | 7 | .err_tip(|| "In digest_for_file")?0 ; |
241 | 7 | Ok((digest, file)) |
242 | 7 | } |
243 | | } |
244 | | |
245 | | impl DigestHasher for DigestHasherImpl { |
246 | | #[inline] |
247 | 5.12k | fn update(&mut self, input: &[u8]) { |
248 | 5.12k | self.hashed_size += input.len() as u64; |
249 | 5.12k | match &mut self.hash_func_impl { |
250 | 90 | DigestHasherFuncImpl::Sha256(h) => sha2::digest::Update::update(h, input), |
251 | 5.03k | DigestHasherFuncImpl::Blake3(h) => { |
252 | 5.03k | Blake3Hasher::update(h, input); |
253 | 5.03k | } |
254 | | } |
255 | 5.12k | } |
256 | | |
257 | | #[inline] |
258 | 5.12k | fn finalize_digest(&mut self) -> DigestInfo { |
259 | 5.12k | let hash = match &mut self.hash_func_impl { |
260 | 92 | DigestHasherFuncImpl::Sha256(h) => h.finalize_reset().into(), |
261 | 5.03k | DigestHasherFuncImpl::Blake3(h) => h.finalize().into(), |
262 | | }; |
263 | 5.12k | DigestInfo::new(hash, self.hashed_size) |
264 | 5.12k | } |
265 | | |
266 | 7 | async fn digest_for_file( |
267 | 7 | mut self, |
268 | 7 | file_path: impl AsRef<std::path::Path>, |
269 | 7 | mut file: fs::FileSlot, |
270 | 7 | size_hint: Option<u64>, |
271 | 7 | ) -> Result<(DigestInfo, fs::FileSlot), Error> { |
272 | 7 | let file_position = file |
273 | 7 | .stream_position() |
274 | 7 | .await |
275 | 7 | .err_tip(|| "Couldn't get stream position in digest_for_file")?0 ; |
276 | 7 | if file_position != 0 { Branch (276:12): [Folded - Ignored]
Branch (276:12): [Folded - Ignored]
Branch (276:12): [True: 0, False: 3]
Branch (276:12): [True: 0, False: 4]
|
277 | 0 | return self.hash_file(file).await; |
278 | 7 | } |
279 | | // If we are a small file, it's faster to just do it the "slow" way. |
280 | | // Great read: https://github.com/david-slatinek/c-read-vs.-mmap |
281 | 7 | if let Some(size_hint) = size_hint { Branch (281:16): [Folded - Ignored]
Branch (281:16): [Folded - Ignored]
Branch (281:16): [True: 3, False: 0]
Branch (281:16): [True: 4, False: 0]
|
282 | 7 | if size_hint <= fs::DEFAULT_READ_BUFF_SIZE as u64 { Branch (282:16): [Folded - Ignored]
Branch (282:16): [Folded - Ignored]
Branch (282:16): [True: 3, False: 0]
Branch (282:16): [True: 4, False: 0]
|
283 | 7 | return self.hash_file(file).await; |
284 | 0 | } |
285 | 0 | } |
286 | 0 | let file_path = file_path.as_ref().to_path_buf(); |
287 | 0 | match self.hash_func_impl { |
288 | 0 | DigestHasherFuncImpl::Sha256(_) => self.hash_file(file).await, |
289 | 0 | DigestHasherFuncImpl::Blake3(mut hasher) => { |
290 | 0 | spawn_blocking!("digest_for_file", move || { |
291 | 0 | hasher.update_mmap(file_path).map_err(|e| { |
292 | 0 | make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}") |
293 | 0 | })?; |
294 | 0 | Result::<_, Error>::Ok(( |
295 | 0 | DigestInfo::new(hasher.finalize().into(), hasher.count()), |
296 | 0 | file, |
297 | 0 | )) |
298 | 0 | }) |
299 | 0 | .await |
300 | 0 | .err_tip(|| "Could not spawn blocking task in digest_for_file")? |
301 | | } |
302 | | } |
303 | 7 | } |
304 | | } |