/build/source/nativelink-store/src/dedup_store.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::cmp; |
16 | | use std::pin::Pin; |
17 | | use std::sync::Arc; |
18 | | |
19 | | use async_trait::async_trait; |
20 | | use bincode::config::{FixintEncoding, WithOtherIntEncoding}; |
21 | | use bincode::{DefaultOptions, Options}; |
22 | | use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; |
23 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
24 | | use nativelink_metric::MetricsComponent; |
25 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
26 | | use nativelink_util::common::DigestInfo; |
27 | | use nativelink_util::fastcdc::FastCDC; |
28 | | use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; |
29 | | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; |
30 | | use serde::{Deserialize, Serialize}; |
31 | | use tokio_util::codec::FramedRead; |
32 | | use tokio_util::io::StreamReader; |
33 | | use tracing::{event, Level}; |
34 | | |
35 | | use crate::cas_utils::is_zero_digest; |
36 | | |
37 | | // NOTE: If these change, update the comments in `stores.rs` to reflect |
38 | | // the new defaults. |
39 | | const DEFAULT_MIN_SIZE: u64 = 64 * 1024; |
40 | | const DEFAULT_NORM_SIZE: u64 = 256 * 1024; |
41 | | const DEFAULT_MAX_SIZE: u64 = 512 * 1024; |
42 | | const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10; |
43 | | |
44 | 908 | #[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)] |
45 | | pub struct DedupIndex { |
46 | | pub entries: Vec<DigestInfo>, |
47 | | } |
48 | | |
49 | 0 | #[derive(MetricsComponent)] |
50 | | pub struct DedupStore { |
51 | | #[metric(group = "index_store")] |
52 | | index_store: Store, |
53 | | #[metric(group = "content_store")] |
54 | | content_store: Store, |
55 | | fast_cdc_decoder: FastCDC, |
56 | | #[metric(help = "Maximum number of concurrent fetches per get")] |
57 | | max_concurrent_fetch_per_get: usize, |
58 | | bincode_options: WithOtherIntEncoding<DefaultOptions, FixintEncoding>, |
59 | | } |
60 | | |
61 | | impl DedupStore { |
62 | 8 | pub fn new( |
63 | 8 | config: &nativelink_config::stores::DedupStore, |
64 | 8 | index_store: Store, |
65 | 8 | content_store: Store, |
66 | 8 | ) -> Result<Arc<Self>, Error> { |
67 | 8 | let min_size = if config.min_size == 0 { Branch (67:27): [True: 0, False: 8]
Branch (67:27): [Folded - Ignored]
|
68 | 0 | DEFAULT_MIN_SIZE |
69 | | } else { |
70 | 8 | config.min_size as u64 |
71 | | }; |
72 | 8 | let normal_size = if config.normal_size == 0 { Branch (72:30): [True: 0, False: 8]
Branch (72:30): [Folded - Ignored]
|
73 | 0 | DEFAULT_NORM_SIZE |
74 | | } else { |
75 | 8 | config.normal_size as u64 |
76 | | }; |
77 | 8 | let max_size = if config.max_size == 0 { Branch (77:27): [True: 0, False: 8]
Branch (77:27): [Folded - Ignored]
|
78 | 0 | DEFAULT_MAX_SIZE |
79 | | } else { |
80 | 8 | config.max_size as u64 |
81 | | }; |
82 | 8 | let max_concurrent_fetch_per_get = if config.max_concurrent_fetch_per_get == 0 { Branch (82:47): [True: 0, False: 8]
Branch (82:47): [Folded - Ignored]
|
83 | 0 | DEFAULT_MAX_CONCURRENT_FETCH_PER_GET |
84 | | } else { |
85 | 8 | config.max_concurrent_fetch_per_get as usize |
86 | | }; |
87 | | Ok(Arc::new(Self { |
88 | 8 | index_store, |
89 | 8 | content_store, |
90 | 8 | fast_cdc_decoder: FastCDC::new( |
91 | 8 | usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize"0 )?0 , |
92 | 8 | usize::try_from(normal_size) |
93 | 8 | .err_tip(|| "Could not convert normal_size to usize"0 )?0 , |
94 | 8 | usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize"0 )?0 , |
95 | | ), |
96 | 8 | max_concurrent_fetch_per_get, |
97 | 8 | bincode_options: DefaultOptions::new().with_fixint_encoding(), |
98 | | })) |
99 | 8 | } |
100 | | |
101 | 4 | async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> { |
102 | | // First we need to load the index that contains where the individual parts actually |
103 | | // can be fetched from. |
104 | 3 | let index_entries = { |
105 | 4 | let maybe_data = self |
106 | 4 | .index_store |
107 | 4 | .get_part_unchunked(key.borrow(), 0, None) |
108 | 4 | .await |
109 | 4 | .err_tip(|| "Failed to read index store in dedup store"1 ); |
110 | 4 | let data3 = match maybe_data { |
111 | 1 | Err(e) => { |
112 | 1 | if e.code == Code::NotFound { Branch (112:24): [True: 1, False: 0]
Branch (112:24): [Folded - Ignored]
|
113 | 1 | return Ok(None); |
114 | 0 | } |
115 | 0 | return Err(e); |
116 | | } |
117 | 3 | Ok(data) => data, |
118 | 3 | }; |
119 | 3 | |
120 | 3 | match self.bincode_options.deserialize::<DedupIndex>(&data) { |
121 | 0 | Err(err) => { |
122 | 0 | event!( |
123 | 0 | Level::WARN, |
124 | | ?key, |
125 | | ?err, |
126 | 0 | "Failed to deserialize index in dedup store", |
127 | | ); |
128 | | // We return the equivalent of NotFound here so the client is happy. |
129 | 0 | return Ok(None); |
130 | | } |
131 | 3 | Ok(v) => v, |
132 | 3 | } |
133 | 3 | }; |
134 | 3 | |
135 | 3 | let digests: Vec<_> = index_entries |
136 | 3 | .entries |
137 | 3 | .into_iter() |
138 | 3 | .map(StoreKey::Digest) |
139 | 3 | .collect(); |
140 | 3 | let mut sum = 0; |
141 | 12 | for size in self.content_store.has_many(&digests)3 .await0 ?0 { |
142 | 12 | let Some(size11 ) = size else { Branch (142:17): [True: 11, False: 1]
Branch (142:17): [Folded - Ignored]
|
143 | | // A part is missing so return None meaning not-found. |
144 | | // This will abort all in-flight queries related to this request. |
145 | 1 | return Ok(None); |
146 | | }; |
147 | 11 | sum += size; |
148 | | } |
149 | 2 | Ok(Some(sum)) |
150 | 4 | } |
151 | | } |
152 | | |
153 | | #[async_trait] |
154 | | impl StoreDriver for DedupStore { |
155 | | async fn has_with_results( |
156 | | self: Pin<&Self>, |
157 | | digests: &[StoreKey<'_>], |
158 | | results: &mut [Option<u64>], |
159 | 5 | ) -> Result<(), Error> { |
160 | 5 | digests |
161 | 5 | .iter() |
162 | 5 | .zip(results.iter_mut()) |
163 | 5 | .map(|(key, result)| async move { |
164 | 5 | if is_zero_digest(key.borrow()) { Branch (164:20): [True: 1, False: 4]
Branch (164:20): [Folded - Ignored]
|
165 | 1 | *result = Some(0); |
166 | 1 | return Ok(()); |
167 | 4 | } |
168 | 4 | |
169 | 4 | match self.has(key.borrow()).await { |
170 | 4 | Ok(maybe_size) => { |
171 | 4 | *result = maybe_size; |
172 | 4 | Ok(()) |
173 | | } |
174 | 0 | Err(err) => Err(err), |
175 | | } |
176 | 10 | }) |
177 | 5 | .collect::<FuturesOrdered<_>>() |
178 | 5 | .try_collect() |
179 | 4 | .await |
180 | 10 | } |
181 | | |
182 | | async fn update( |
183 | | self: Pin<&Self>, |
184 | | key: StoreKey<'_>, |
185 | | reader: DropCloserReadHalf, |
186 | | _size_info: UploadSizeInfo, |
187 | 7 | ) -> Result<(), Error> { |
188 | 7 | let mut bytes_reader = StreamReader::new(reader); |
189 | 7 | let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone()); |
190 | 7 | let index_entries = frame_reader |
191 | 97 | .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"0 )) |
192 | 97 | .map_ok(|frame| async move { |
193 | 97 | let hash = blake3::hash(&frame[..]).into(); |
194 | 97 | let index_entry = DigestInfo::new(hash, frame.len() as u64); |
195 | 97 | if self Branch (195:20): [True: 0, False: 97]
Branch (195:20): [Folded - Ignored]
|
196 | 97 | .content_store |
197 | 97 | .has(index_entry) |
198 | 0 | .await |
199 | 97 | .err_tip(|| "Failed to call .has() in DedupStore::update()"0 )?0 |
200 | 97 | .is_some() |
201 | | { |
202 | | // If our store has this digest, we don't need to upload it. |
203 | 0 | return Result::<_, Error>::Ok(index_entry); |
204 | 97 | } |
205 | 97 | self.content_store |
206 | 97 | .update_oneshot(index_entry, frame) |
207 | 0 | .await |
208 | 97 | .err_tip(|| "Failed to update content store in dedup_store"0 )?0 ; |
209 | 97 | Ok(index_entry) |
210 | 194 | }) |
211 | 7 | .try_buffered(self.max_concurrent_fetch_per_get) |
212 | 7 | .try_collect() |
213 | 0 | .await?; |
214 | | |
215 | 7 | let serialized_index = self |
216 | 7 | .bincode_options |
217 | 7 | .serialize(&DedupIndex { |
218 | 7 | entries: index_entries, |
219 | 7 | }) |
220 | 7 | .map_err(|e| { |
221 | 0 | make_err!( |
222 | 0 | Code::Internal, |
223 | 0 | "Failed to serialize index in dedup_store : {:?}", |
224 | 0 | e |
225 | 0 | ) |
226 | 7 | })?0 ; |
227 | | |
228 | 7 | self.index_store |
229 | 7 | .update_oneshot(key, serialized_index.into()) |
230 | 0 | .await |
231 | 7 | .err_tip(|| "Failed to insert our index entry to index_store in dedup_store"0 )?0 ; |
232 | | |
233 | 7 | Ok(()) |
234 | 14 | } |
235 | | |
236 | | async fn get_part( |
237 | | self: Pin<&Self>, |
238 | | key: StoreKey<'_>, |
239 | | writer: &mut DropCloserWriteHalf, |
240 | | offset: u64, |
241 | | length: Option<u64>, |
242 | 935 | ) -> Result<(), Error> { |
243 | | // Special case for if a client tries to read zero bytes. |
244 | 935 | if length == Some(0) { Branch (244:12): [True: 30, False: 905]
Branch (244:12): [Folded - Ignored]
|
245 | 30 | writer |
246 | 30 | .send_eof() |
247 | 30 | .err_tip(|| "Failed to write EOF out from get_part dedup"0 )?0 ; |
248 | 30 | return Ok(()); |
249 | 905 | } |
250 | | // First we need to download the index that contains where the individual parts actually |
251 | | // can be fetched from. |
252 | 905 | let index_entries = { |
253 | 905 | let data = self |
254 | 905 | .index_store |
255 | 905 | .get_part_unchunked(key, 0, None) |
256 | 905 | .await |
257 | 905 | .err_tip(|| "Failed to read index store in dedup store"0 )?0 ; |
258 | | |
259 | 905 | self.bincode_options |
260 | 905 | .deserialize::<DedupIndex>(&data) |
261 | 905 | .map_err(|e| { |
262 | 0 | make_err!( |
263 | 0 | Code::Internal, |
264 | 0 | "Failed to deserialize index in dedup_store::get_part : {:?}", |
265 | 0 | e |
266 | 0 | ) |
267 | 905 | })?0 |
268 | | }; |
269 | | |
270 | 905 | let mut start_byte_in_stream: u64 = 0; |
271 | 905 | let entries = { |
272 | 905 | if offset == 0 && length.is_none()31 { Branch (272:16): [True: 31, False: 874]
Branch (272:31): [True: 2, False: 29]
Branch (272:16): [Folded - Ignored]
Branch (272:31): [Folded - Ignored]
|
273 | 2 | index_entries.entries |
274 | | } else { |
275 | 903 | let mut current_entries_sum = 0; |
276 | 903 | let mut entries = Vec::with_capacity(index_entries.entries.len()); |
277 | 4.65k | for entry4.15k in index_entries.entries { |
278 | 4.15k | let first_byte = current_entries_sum; |
279 | 4.15k | let entry_size = entry.size_bytes(); |
280 | 4.15k | current_entries_sum += entry_size; |
281 | 4.15k | // Filter any items whose end byte is before the first requested byte. |
282 | 4.15k | if current_entries_sum <= offset { Branch (282:24): [True: 1.60k, False: 2.54k]
Branch (282:24): [Folded - Ignored]
|
283 | 1.60k | start_byte_in_stream = current_entries_sum; |
284 | 1.60k | continue; |
285 | 2.54k | } |
286 | | // If we are not going to read any bytes past the length we are done. |
287 | 2.54k | if let Some(length2.54k ) = length { Branch (287:28): [True: 2.54k, False: 8]
Branch (287:28): [Folded - Ignored]
|
288 | 2.54k | if first_byte >= offset + length { Branch (288:28): [True: 407, False: 2.13k]
Branch (288:28): [Folded - Ignored]
|
289 | 407 | break; |
290 | 2.13k | } |
291 | 8 | } |
292 | 2.14k | entries.push(entry); |
293 | | } |
294 | 903 | entries |
295 | | } |
296 | | }; |
297 | | |
298 | | // Second, we create a stream of futures for each chunk, but buffer/limit them so only |
299 | | // `max_concurrent_fetch_per_get` will be executed at a time. |
300 | | // The results will be streamed out in the same order they are in the entries table. |
301 | | // The results will execute in a "window-like" fashion, meaning that if we limit to |
302 | | // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and |
303 | | // request 4 & 5 can be executing (or finished) while waiting for 3 to finish. |
304 | | // Note: We will buffer our data here up to: |
305 | | // `config.max_size * config.max_concurrent_fetch_per_get` per `get_part()` request. |
306 | 905 | let mut entries_stream = stream::iter(entries) |
307 | 2.20k | .map(move |index_entry| async move { |
308 | 2.20k | let data2.20k = self |
309 | 2.20k | .content_store |
310 | 2.20k | .get_part_unchunked(index_entry, 0, None) |
311 | 2.20k | .await |
312 | 2.20k | .err_tip(|| "Failed to get_part in content_store in dedup_store"1 )?1 ; |
313 | | |
314 | 2.20k | Result::<_, Error>::Ok(data) |
315 | 4.41k | }) |
316 | 905 | .buffered(self.max_concurrent_fetch_per_get); |
317 | | |
318 | | // Stream out the buffered data one at a time and write the data to our writer stream. |
319 | | // In the event any of these error, we will abort early and abandon all the rest of the |
320 | | // streamed data. |
321 | | // Note: Need to take special care to ensure we send the proper slice of data requested. |
322 | 905 | let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream) |
323 | 905 | .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize"0 )?0 ; |
324 | 905 | let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset)) |
325 | 905 | .err_tip(|| "Could not convert length to usize"0 )?0 ; |
326 | 3.11k | while let Some(result2.20k ) = entries_stream.next().await1.06k { Branch (326:19): [True: 2.20k, False: 904]
Branch (326:19): [Folded - Ignored]
|
327 | 2.20k | let mut data2.20k = result.err_tip(|| "Inner store iterator closed early in DedupStore"1 )?1 ; |
328 | 2.20k | assert!( |
329 | 2.20k | bytes_to_skip <= data.len(), |
330 | 0 | "Formula above must be wrong, {} > {}", |
331 | 0 | bytes_to_skip, |
332 | 0 | data.len() |
333 | | ); |
334 | 2.20k | let end_pos = cmp::min(data.len(), bytes_to_send + bytes_to_skip); |
335 | 2.20k | if bytes_to_skip != 0 || data.len() > bytes_to_send1.48k { Branch (335:16): [True: 727, False: 1.48k]
Branch (335:38): [True: 306, False: 1.17k]
Branch (335:16): [Folded - Ignored]
Branch (335:38): [Folded - Ignored]
|
336 | 1.03k | data = data.slice(bytes_to_skip..end_pos); |
337 | 1.17k | } |
338 | 2.20k | writer |
339 | 2.20k | .send(data) |
340 | 434 | .await |
341 | 2.20k | .err_tip(|| "Failed to write data to get_part dedup"0 )?0 ; |
342 | 2.20k | bytes_to_send -= end_pos - bytes_to_skip; |
343 | 2.20k | bytes_to_skip = 0; |
344 | | } |
345 | | |
346 | | // Finish our stream by writing our EOF and shutting down the stream. |
347 | 904 | writer |
348 | 904 | .send_eof() |
349 | 904 | .err_tip(|| "Failed to write EOF out from get_part dedup"0 )?0 ; |
350 | 904 | Ok(()) |
351 | 1.87k | } |
352 | | |
353 | 0 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
354 | 0 | self |
355 | 0 | } |
356 | | |
357 | 0 | fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { |
358 | 0 | self |
359 | 0 | } |
360 | | |
361 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> { |
362 | 0 | self |
363 | 0 | } |
364 | | } |
365 | | |
366 | | default_health_status_indicator!(DedupStore); |