/build/source/nativelink-store/src/dedup_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::cmp; |
16 | | use core::pin::Pin; |
17 | | use std::sync::Arc; |
18 | | |
19 | | use async_trait::async_trait; |
20 | | use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; |
21 | | use nativelink_config::stores::DedupSpec; |
22 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
23 | | use nativelink_metric::MetricsComponent; |
24 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
25 | | use nativelink_util::common::DigestInfo; |
26 | | use nativelink_util::fastcdc::FastCDC; |
27 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
28 | | use nativelink_util::store_trait::{ |
29 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo, |
30 | | }; |
31 | | use serde::{Deserialize, Serialize}; |
32 | | use tokio_util::codec::FramedRead; |
33 | | use tokio_util::io::StreamReader; |
34 | | use tracing::warn; |
35 | | |
36 | | use crate::cas_utils::is_zero_digest; |
37 | | use crate::compression_store::WincodeConfig; |
38 | | |
// NOTE: If these change update the comments in `stores.rs` to reflect
// the new defaults.

/// Minimum size (64 KiB) handed to `FastCDC` when `spec.min_size` is 0.
const DEFAULT_MIN_SIZE: u64 = 64 * 1024;
/// Normal size (256 KiB) handed to `FastCDC` when `spec.normal_size` is 0.
const DEFAULT_NORM_SIZE: u64 = 256 * 1024;
/// Maximum size (512 KiB) handed to `FastCDC` when `spec.max_size` is 0.
const DEFAULT_MAX_SIZE: u64 = 512 * 1024;
/// Concurrency limit used when `spec.max_concurrent_fetch_per_get` is 0.
const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;
45 | | |
46 | | #[derive( |
47 | | Serialize, |
48 | | Deserialize, |
49 | | PartialEq, |
50 | | Eq, |
51 | | Debug, |
52 | | Default, |
53 | | Clone, |
54 | | wincode::SchemaRead, |
55 | 0 | wincode::SchemaWrite, |
56 | | )] |
57 | | pub struct DedupIndex { |
58 | | pub entries: Vec<DigestInfo>, |
59 | | } |
60 | | |
61 | | #[derive(MetricsComponent)] |
62 | | pub struct DedupStore { |
63 | | #[metric(group = "index_store")] |
64 | | index_store: Store, |
65 | | #[metric(group = "content_store")] |
66 | | content_store: Store, |
67 | | fast_cdc_decoder: FastCDC, |
68 | | #[metric(help = "Maximum number of concurrent fetches per get")] |
69 | | max_concurrent_fetch_per_get: usize, |
70 | | wincode_config: WincodeConfig, |
71 | | } |
72 | | |
73 | | impl core::fmt::Debug for DedupStore { |
74 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
75 | 0 | f.debug_struct("DedupStore") |
76 | 0 | .field("index_store", &self.index_store) |
77 | 0 | .field("content_store", &self.content_store) |
78 | 0 | .field("fast_cdc_decoder", &self.fast_cdc_decoder) |
79 | 0 | .field( |
80 | 0 | "max_concurrent_fetch_per_get", |
81 | 0 | &self.max_concurrent_fetch_per_get, |
82 | 0 | ) |
83 | 0 | .finish_non_exhaustive() |
84 | 0 | } |
85 | | } |
86 | | |
87 | | impl DedupStore { |
88 | 8 | pub fn new( |
89 | 8 | spec: &DedupSpec, |
90 | 8 | index_store: Store, |
91 | 8 | content_store: Store, |
92 | 8 | ) -> Result<Arc<Self>, Error> { |
93 | 8 | let min_size = if spec.min_size == 0 { |
94 | 0 | DEFAULT_MIN_SIZE |
95 | | } else { |
96 | 8 | u64::from(spec.min_size) |
97 | | }; |
98 | 8 | let normal_size = if spec.normal_size == 0 { |
99 | 0 | DEFAULT_NORM_SIZE |
100 | | } else { |
101 | 8 | u64::from(spec.normal_size) |
102 | | }; |
103 | 8 | let max_size = if spec.max_size == 0 { |
104 | 0 | DEFAULT_MAX_SIZE |
105 | | } else { |
106 | 8 | u64::from(spec.max_size) |
107 | | }; |
108 | 8 | let max_concurrent_fetch_per_get = if spec.max_concurrent_fetch_per_get == 0 { |
109 | 0 | DEFAULT_MAX_CONCURRENT_FETCH_PER_GET |
110 | | } else { |
111 | 8 | spec.max_concurrent_fetch_per_get as usize |
112 | | }; |
113 | 8 | Ok(Arc::new(Self { |
114 | 8 | index_store, |
115 | 8 | content_store, |
116 | 8 | fast_cdc_decoder: FastCDC::new( |
117 | 8 | usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")?0 , |
118 | 8 | usize::try_from(normal_size) |
119 | 8 | .err_tip(|| "Could not convert normal_size to usize")?0 , |
120 | 8 | usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")?0 , |
121 | | ), |
122 | 8 | max_concurrent_fetch_per_get, |
123 | 8 | wincode_config: WincodeConfig::new(), |
124 | | })) |
125 | 8 | } |
126 | | |
127 | 4 | async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> { |
128 | | // First we need to load the index that contains where the individual parts actually |
129 | | // can be fetched from. |
130 | 3 | let index_entries = { |
131 | 4 | let maybe_data = self |
132 | 4 | .index_store |
133 | 4 | .get_part_unchunked(key.borrow(), 0, None) |
134 | 4 | .await |
135 | 4 | .err_tip(|| "Failed to read index store in dedup store"); |
136 | 4 | let data3 = match maybe_data { |
137 | 1 | Err(e) => { |
138 | 1 | if e.code == Code::NotFound { |
139 | 1 | return Ok(None); |
140 | 0 | } |
141 | 0 | return Err(e); |
142 | | } |
143 | 3 | Ok(data) => data, |
144 | | }; |
145 | | |
146 | 3 | match wincode::config::deserialize::<DedupIndex, WincodeConfig>( |
147 | 3 | &data, |
148 | 3 | self.wincode_config, |
149 | 3 | ) { |
150 | 3 | Ok(dedup_index) => dedup_index, |
151 | 0 | Err(err) => { |
152 | 0 | warn!(?key, ?err, "Failed to deserialize index in dedup store",); |
153 | | // We return the equivalent of NotFound here so the client is happy. |
154 | 0 | return Ok(None); |
155 | | } |
156 | | } |
157 | | }; |
158 | | |
159 | 3 | let digests: Vec<_> = index_entries |
160 | 3 | .entries |
161 | 3 | .into_iter() |
162 | 3 | .map(StoreKey::Digest) |
163 | 3 | .collect(); |
164 | 3 | let mut sum = 0; |
165 | 8 | for size in self.content_store3 .has_many3 (&digests).await?0 { |
166 | 8 | let Some(size7 ) = size else { |
167 | | // A part is missing so return None meaning not-found. |
168 | | // This will abort all in-flight queries related to this request. |
169 | 1 | return Ok(None); |
170 | | }; |
171 | 7 | sum += size; |
172 | | } |
173 | 2 | Ok(Some(sum)) |
174 | 4 | } |
175 | | } |
176 | | |
177 | | #[async_trait] |
178 | | impl StoreDriver for DedupStore { |
179 | | async fn has_with_results( |
180 | | self: Pin<&Self>, |
181 | | digests: &[StoreKey<'_>], |
182 | | results: &mut [Option<u64>], |
183 | 5 | ) -> Result<(), Error> { |
184 | | digests |
185 | | .iter() |
186 | | .zip(results.iter_mut()) |
187 | 5 | .map(|(key, result)| async move { |
188 | 5 | if is_zero_digest(key.borrow()) { |
189 | 1 | *result = Some(0); |
190 | 1 | return Ok(()); |
191 | 4 | } |
192 | | |
193 | 4 | match self.has(key.borrow()).await { |
194 | 4 | Ok(maybe_size) => { |
195 | 4 | *result = maybe_size; |
196 | 4 | Ok(()) |
197 | | } |
198 | 0 | Err(err) => Err(err), |
199 | | } |
200 | 10 | }) |
201 | | .collect::<FuturesOrdered<_>>() |
202 | | .try_collect() |
203 | | .await |
204 | 5 | } |
205 | | |
206 | | async fn update( |
207 | | self: Pin<&Self>, |
208 | | key: StoreKey<'_>, |
209 | | reader: DropCloserReadHalf, |
210 | | _size_info: UploadSizeInfo, |
211 | 7 | ) -> Result<(), Error> { |
212 | | let mut bytes_reader = StreamReader::new(reader); |
213 | | let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone()); |
214 | | let index_entries = frame_reader |
215 | 83 | .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc")) |
216 | 83 | .map_ok(|frame| async move { |
217 | 83 | let hash = blake3::hash(&frame[..]).into(); |
218 | 83 | let index_entry = DigestInfo::new(hash, frame.len() as u64); |
219 | 83 | if self |
220 | 83 | .content_store |
221 | 83 | .has(index_entry) |
222 | 83 | .await |
223 | 83 | .err_tip(|| "Failed to call .has() in DedupStore::update()")?0 |
224 | 83 | .is_some() |
225 | | { |
226 | | // If our store has this digest, we don't need to upload it. |
227 | 0 | return Result::<_, Error>::Ok(index_entry); |
228 | 83 | } |
229 | 83 | self.content_store |
230 | 83 | .update_oneshot(index_entry, frame) |
231 | 83 | .await |
232 | 83 | .err_tip(|| "Failed to update content store in dedup_store")?0 ; |
233 | 83 | Ok(index_entry) |
234 | 166 | }) |
235 | | .try_buffered(self.max_concurrent_fetch_per_get) |
236 | | .try_collect() |
237 | | .await?; |
238 | | |
239 | | let serialized_index = wincode::config::serialize( |
240 | | &DedupIndex { |
241 | | entries: index_entries, |
242 | | }, |
243 | | self.wincode_config, |
244 | | ) |
245 | 0 | .map_err(|e| { |
246 | 0 | make_err!( |
247 | 0 | Code::Internal, |
248 | | "Failed to serialize index in dedup_store : {:?}", |
249 | | e |
250 | | ) |
251 | 0 | })?; |
252 | | |
253 | | self.index_store |
254 | | .update_oneshot(key, serialized_index.into()) |
255 | | .await |
256 | | .err_tip(|| "Failed to insert our index entry to index_store in dedup_store")?; |
257 | | |
258 | | Ok(()) |
259 | 7 | } |
260 | | |
261 | | async fn get_part( |
262 | | self: Pin<&Self>, |
263 | | key: StoreKey<'_>, |
264 | | writer: &mut DropCloserWriteHalf, |
265 | | offset: u64, |
266 | | length: Option<u64>, |
267 | 935 | ) -> Result<(), Error> { |
268 | | // Special case for if a client tries to read zero bytes. |
269 | | if length == Some(0) { |
270 | | writer |
271 | | .send_eof() |
272 | | .err_tip(|| "Failed to write EOF out from get_part dedup")?; |
273 | | return Ok(()); |
274 | | } |
275 | | // First we need to download the index that contains where the individual parts actually |
276 | | // can be fetched from. |
277 | | let index_entries = { |
278 | | let data = self |
279 | | .index_store |
280 | | .get_part_unchunked(key, 0, None) |
281 | | .await |
282 | | .err_tip(|| "Failed to read index store in dedup store")?; |
283 | | wincode::config::deserialize::<DedupIndex, WincodeConfig>(&data, self.wincode_config) |
284 | 0 | .map_err(|e| { |
285 | 0 | make_err!( |
286 | 0 | Code::Internal, |
287 | | "Failed to deserialize index in dedup_store::get_part : {:?}", |
288 | | e |
289 | | ) |
290 | 0 | })? |
291 | | }; |
292 | | |
293 | | let mut start_byte_in_stream: u64 = 0; |
294 | | let entries = { |
295 | | if offset == 0 && length.is_none() { |
296 | | index_entries.entries |
297 | | } else { |
298 | | let mut current_entries_sum = 0; |
299 | | let mut entries = Vec::with_capacity(index_entries.entries.len()); |
300 | | for entry in index_entries.entries { |
301 | | let first_byte = current_entries_sum; |
302 | | let entry_size = entry.size_bytes(); |
303 | | current_entries_sum += entry_size; |
304 | | // Filter any items who's end byte is before the first requested byte. |
305 | | if current_entries_sum <= offset { |
306 | | start_byte_in_stream = current_entries_sum; |
307 | | continue; |
308 | | } |
309 | | // If we are not going to read any bytes past the length we are done. |
310 | | if let Some(length) = length |
311 | | && first_byte >= offset + length |
312 | | { |
313 | | break; |
314 | | } |
315 | | entries.push(entry); |
316 | | } |
317 | | entries |
318 | | } |
319 | | }; |
320 | | |
321 | | // Second we we create a stream of futures for each chunk, but buffer/limit them so only |
322 | | // `max_concurrent_fetch_per_get` will be executed at a time. |
323 | | // The results will be streamed out in the same order they are in the entries table. |
324 | | // The results will execute in a "window-like" fashion, meaning that if we limit to |
325 | | // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and |
326 | | // request 4 & 5 can be executing (or finished) while waiting for 3 to finish. |
327 | | // Note: We will buffer our data here up to: |
328 | | // `spec.max_size * spec.max_concurrent_fetch_per_get` per `get_part()` request. |
329 | | let mut entries_stream = stream::iter(entries) |
330 | 2.11k | .map(move |index_entry| async move { |
331 | 2.11k | let data2.11k = self |
332 | 2.11k | .content_store |
333 | 2.11k | .get_part_unchunked(index_entry, 0, None) |
334 | 2.11k | .await |
335 | 2.11k | .err_tip(|| "Failed to get_part in content_store in dedup_store")?1 ; |
336 | | |
337 | 2.11k | Result::<_, Error>::Ok(data) |
338 | 4.22k | }) |
339 | | .buffered(self.max_concurrent_fetch_per_get); |
340 | | |
341 | | // Stream out the buffered data one at a time and write the data to our writer stream. |
342 | | // In the event any of these error, we will abort early and abandon all the rest of the |
343 | | // streamed data. |
344 | | // Note: Need to take special care to ensure we send the proper slice of data requested. |
345 | | let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream) |
346 | | .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?; |
347 | | let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset)) |
348 | | .err_tip(|| "Could not convert length to usize")?; |
349 | | while let Some(result) = entries_stream.next().await { |
350 | | let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?; |
351 | | assert!( |
352 | | bytes_to_skip <= data.len(), |
353 | | "Formula above must be wrong, {} > {}", |
354 | | bytes_to_skip, |
355 | | data.len() |
356 | | ); |
357 | | let end_pos = cmp::min(data.len(), bytes_to_send + bytes_to_skip); |
358 | | if bytes_to_skip != 0 || data.len() > bytes_to_send { |
359 | | data = data.slice(bytes_to_skip..end_pos); |
360 | | } |
361 | | writer |
362 | | .send(data) |
363 | | .await |
364 | | .err_tip(|| "Failed to write data to get_part dedup")?; |
365 | | bytes_to_send -= end_pos - bytes_to_skip; |
366 | | bytes_to_skip = 0; |
367 | | } |
368 | | |
369 | | // Finish our stream by writing our EOF and shutdown the stream. |
370 | | writer |
371 | | .send_eof() |
372 | | .err_tip(|| "Failed to write EOF out from get_part dedup")?; |
373 | | Ok(()) |
374 | 935 | } |
375 | | |
376 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
377 | 0 | self |
378 | 0 | } |
379 | | |
380 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) { |
381 | 0 | self |
382 | 0 | } |
383 | | |
384 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
385 | 0 | self |
386 | 0 | } |
387 | | |
388 | 0 | fn register_remove_callback( |
389 | 0 | self: Arc<Self>, |
390 | 0 | callback: Arc<dyn RemoveItemCallback>, |
391 | 0 | ) -> Result<(), Error> { |
392 | 0 | self.index_store |
393 | 0 | .register_remove_callback(callback.clone())?; |
394 | 0 | self.content_store.register_remove_callback(callback)?; |
395 | 0 | Ok(()) |
396 | 0 | } |
397 | | } |
398 | | |
399 | | default_health_status_indicator!(DedupStore); |