Coverage Report

Created: 2025-04-19 16:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/dedup_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::cmp;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use bincode::config::{FixintEncoding, WithOtherIntEncoding};
21
use bincode::{DefaultOptions, Options};
22
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
23
use nativelink_config::stores::DedupSpec;
24
use nativelink_error::{Code, Error, ResultExt, make_err};
25
use nativelink_metric::MetricsComponent;
26
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
27
use nativelink_util::common::DigestInfo;
28
use nativelink_util::fastcdc::FastCDC;
29
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
30
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
31
use serde::{Deserialize, Serialize};
32
use tokio_util::codec::FramedRead;
33
use tokio_util::io::StreamReader;
34
use tracing::{Level, event};
35
36
use crate::cas_utils::is_zero_digest;
37
38
// NOTE: If these change update the comments in `stores.rs` to reflect
39
// the new defaults.
40
const DEFAULT_MIN_SIZE: u64 = 64 * 1024;
41
const DEFAULT_NORM_SIZE: u64 = 256 * 1024;
42
const DEFAULT_MAX_SIZE: u64 = 512 * 1024;
43
const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;
44
45
#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)]
46
pub struct DedupIndex {
47
    pub entries: Vec<DigestInfo>,
48
}
49
50
#[derive(MetricsComponent)]
51
pub struct DedupStore {
52
    #[metric(group = "index_store")]
53
    index_store: Store,
54
    #[metric(group = "content_store")]
55
    content_store: Store,
56
    fast_cdc_decoder: FastCDC,
57
    #[metric(help = "Maximum number of concurrent fetches per get")]
58
    max_concurrent_fetch_per_get: usize,
59
    bincode_options: WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
60
}
61
62
impl std::fmt::Debug for DedupStore {
63
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64
0
        f.debug_struct("DedupStore")
65
0
            .field("index_store", &self.index_store)
66
0
            .field("content_store", &self.content_store)
67
0
            .field("fast_cdc_decoder", &self.fast_cdc_decoder)
68
0
            .field(
69
0
                "max_concurrent_fetch_per_get",
70
0
                &self.max_concurrent_fetch_per_get,
71
0
            )
72
0
            .finish_non_exhaustive()
73
0
    }
74
}
75
76
impl DedupStore {
77
8
    pub fn new(
78
8
        spec: &DedupSpec,
79
8
        index_store: Store,
80
8
        content_store: Store,
81
8
    ) -> Result<Arc<Self>, Error> {
82
8
        let min_size = if spec.min_size == 0 {
  Branch (82:27): [True: 0, False: 8]
  Branch (82:27): [Folded - Ignored]
83
0
            DEFAULT_MIN_SIZE
84
        } else {
85
8
            u64::from(spec.min_size)
86
        };
87
8
        let normal_size = if spec.normal_size == 0 {
  Branch (87:30): [True: 0, False: 8]
  Branch (87:30): [Folded - Ignored]
88
0
            DEFAULT_NORM_SIZE
89
        } else {
90
8
            u64::from(spec.normal_size)
91
        };
92
8
        let max_size = if spec.max_size == 0 {
  Branch (92:27): [True: 0, False: 8]
  Branch (92:27): [Folded - Ignored]
93
0
            DEFAULT_MAX_SIZE
94
        } else {
95
8
            u64::from(spec.max_size)
96
        };
97
8
        let max_concurrent_fetch_per_get = if spec.max_concurrent_fetch_per_get == 0 {
  Branch (97:47): [True: 0, False: 8]
  Branch (97:47): [Folded - Ignored]
98
0
            DEFAULT_MAX_CONCURRENT_FETCH_PER_GET
99
        } else {
100
8
            spec.max_concurrent_fetch_per_get as usize
101
        };
102
8
        Ok(Arc::new(Self {
103
8
            index_store,
104
8
            content_store,
105
8
            fast_cdc_decoder: FastCDC::new(
106
8
                usize::try_from(min_size).err_tip(|| 
"Could not convert min_size to usize"0
)
?0
,
107
8
                usize::try_from(normal_size)
108
8
                    .err_tip(|| 
"Could not convert normal_size to usize"0
)
?0
,
109
8
                usize::try_from(max_size).err_tip(|| 
"Could not convert max_size to usize"0
)
?0
,
110
            ),
111
8
            max_concurrent_fetch_per_get,
112
8
            bincode_options: DefaultOptions::new().with_fixint_encoding(),
113
        }))
114
8
    }
115
116
4
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
117
        // First we need to load the index that contains where the individual parts actually
118
        // can be fetched from.
119
3
        let index_entries = {
120
4
            let maybe_data = self
121
4
                .index_store
122
4
                .get_part_unchunked(key.borrow(), 0, None)
123
4
                .await
124
4
                .err_tip(|| 
"Failed to read index store in dedup store"1
);
125
4
            let 
data3
= match maybe_data {
126
1
                Err(e) => {
127
1
                    if e.code == Code::NotFound {
  Branch (127:24): [True: 1, False: 0]
  Branch (127:24): [Folded - Ignored]
128
1
                        return Ok(None);
129
0
                    }
130
0
                    return Err(e);
131
                }
132
3
                Ok(data) => data,
133
3
            };
134
3
135
3
            match self.bincode_options.deserialize::<DedupIndex>(&data) {
136
0
                Err(err) => {
137
0
                    event!(
138
0
                        Level::WARN,
139
                        ?key,
140
                        ?err,
141
0
                        "Failed to deserialize index in dedup store",
142
                    );
143
                    // We return the equivalent of NotFound here so the client is happy.
144
0
                    return Ok(None);
145
                }
146
3
                Ok(v) => v,
147
3
            }
148
3
        };
149
3
150
3
        let digests: Vec<_> = index_entries
151
3
            .entries
152
3
            .into_iter()
153
3
            .map(StoreKey::Digest)
154
3
            .collect();
155
3
        let mut sum = 0;
156
8
        for size in 
self.content_store.has_many(&digests).await3
?0
{
157
8
            let Some(
size7
) = size else {
  Branch (157:17): [True: 7, False: 1]
  Branch (157:17): [Folded - Ignored]
158
                // A part is missing so return None meaning not-found.
159
                // This will abort all in-flight queries related to this request.
160
1
                return Ok(None);
161
            };
162
7
            sum += size;
163
        }
164
2
        Ok(Some(sum))
165
4
    }
166
}
167
168
#[async_trait]
169
impl StoreDriver for DedupStore {
170
    async fn has_with_results(
171
        self: Pin<&Self>,
172
        digests: &[StoreKey<'_>],
173
        results: &mut [Option<u64>],
174
10
    ) -> Result<(), Error> {
175
5
        digests
176
5
            .iter()
177
5
            .zip(results.iter_mut())
178
5
            .map(|(key, result)| async move {
179
5
                if is_zero_digest(key.borrow()) {
  Branch (179:20): [True: 1, False: 4]
  Branch (179:20): [Folded - Ignored]
180
1
                    *result = Some(0);
181
1
                    return Ok(());
182
4
                }
183
4
184
4
                match self.has(key.borrow()).await {
185
4
                    Ok(maybe_size) => {
186
4
                        *result = maybe_size;
187
4
                        Ok(())
188
                    }
189
0
                    Err(err) => Err(err),
190
                }
191
10
            })
192
5
            .collect::<FuturesOrdered<_>>()
193
5
            .try_collect()
194
5
            .await
195
10
    }
196
197
    async fn update(
198
        self: Pin<&Self>,
199
        key: StoreKey<'_>,
200
        reader: DropCloserReadHalf,
201
        _size_info: UploadSizeInfo,
202
14
    ) -> Result<(), Error> {
203
7
        let mut bytes_reader = StreamReader::new(reader);
204
7
        let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone());
205
7
        let index_entries = frame_reader
206
83
            .map(|r| r.err_tip(|| 
"Failed to decode frame from fast_cdc"0
))
207
83
            .
map_ok7
(|frame| async move {
208
83
                let hash = blake3::hash(&frame[..]).into();
209
83
                let index_entry = DigestInfo::new(hash, frame.len() as u64);
210
83
                if self
  Branch (210:20): [True: 0, False: 83]
  Branch (210:20): [Folded - Ignored]
211
83
                    .content_store
212
83
                    .has(index_entry)
213
83
                    .await
214
83
                    .err_tip(|| 
"Failed to call .has() in DedupStore::update()"0
)
?0
215
83
                    .is_some()
216
                {
217
                    // If our store has this digest, we don't need to upload it.
218
0
                    return Result::<_, Error>::Ok(index_entry);
219
83
                }
220
83
                self.content_store
221
83
                    .update_oneshot(index_entry, frame)
222
83
                    .await
223
83
                    .err_tip(|| 
"Failed to update content store in dedup_store"0
)
?0
;
224
83
                Ok(index_entry)
225
166
            })
226
7
            .try_buffered(self.max_concurrent_fetch_per_get)
227
7
            .try_collect()
228
7
            .await
?0
;
229
230
7
        let serialized_index = self
231
7
            .bincode_options
232
7
            .serialize(&DedupIndex {
233
7
                entries: index_entries,
234
7
            })
235
7
            .map_err(|e| {
236
0
                make_err!(
237
0
                    Code::Internal,
238
0
                    "Failed to serialize index in dedup_store : {:?}",
239
0
                    e
240
0
                )
241
0
            })?;
242
243
7
        self.index_store
244
7
            .update_oneshot(key, serialized_index.into())
245
7
            .await
246
7
            .err_tip(|| 
"Failed to insert our index entry to index_store in dedup_store"0
)
?0
;
247
248
7
        Ok(())
249
14
    }
250
251
    async fn get_part(
252
        self: Pin<&Self>,
253
        key: StoreKey<'_>,
254
        writer: &mut DropCloserWriteHalf,
255
        offset: u64,
256
        length: Option<u64>,
257
1.87k
    ) -> Result<(), Error> {
258
        // Special case for if a client tries to read zero bytes.
259
935
        if length == Some(0) {
  Branch (259:12): [True: 30, False: 905]
  Branch (259:12): [Folded - Ignored]
260
30
            writer
261
30
                .send_eof()
262
30
                .err_tip(|| 
"Failed to write EOF out from get_part dedup"0
)
?0
;
263
30
            return Ok(());
264
905
        }
265
        // First we need to download the index that contains where the individual parts actually
266
        // can be fetched from.
267
905
        let index_entries = {
268
905
            let data = self
269
905
                .index_store
270
905
                .get_part_unchunked(key, 0, None)
271
905
                .await
272
905
                .err_tip(|| 
"Failed to read index store in dedup store"0
)
?0
;
273
274
905
            self.bincode_options
275
905
                .deserialize::<DedupIndex>(&data)
276
905
                .map_err(|e| {
277
0
                    make_err!(
278
0
                        Code::Internal,
279
0
                        "Failed to deserialize index in dedup_store::get_part : {:?}",
280
0
                        e
281
0
                    )
282
0
                })?
283
        };
284
285
905
        let mut start_byte_in_stream: u64 = 0;
286
905
        let entries = {
287
905
            if offset == 0 && 
length.is_none31
() {
  Branch (287:16): [True: 31, False: 874]
  Branch (287:31): [True: 2, False: 29]
  Branch (287:16): [Folded - Ignored]
  Branch (287:31): [Folded - Ignored]
288
2
                index_entries.entries
289
            } else {
290
903
                let mut current_entries_sum = 0;
291
903
                let mut entries = Vec::with_capacity(index_entries.entries.len());
292
4.82k
                for 
entry4.24k
in index_entries.entries {
293
4.24k
                    let first_byte = current_entries_sum;
294
4.24k
                    let entry_size = entry.size_bytes();
295
4.24k
                    current_entries_sum += entry_size;
296
4.24k
                    // Filter any items whose end byte is before the first requested byte.
297
4.24k
                    if current_entries_sum <= offset {
  Branch (297:24): [True: 1.86k, False: 2.38k]
  Branch (297:24): [Folded - Ignored]
298
1.86k
                        start_byte_in_stream = current_entries_sum;
299
1.86k
                        continue;
300
2.38k
                    }
301
                    // If we are not going to read any bytes past the length we are done.
302
2.38k
                    if let Some(
length2.37k
) = length {
  Branch (302:28): [True: 2.37k, False: 8]
  Branch (302:28): [Folded - Ignored]
303
2.37k
                        if first_byte >= offset + length {
  Branch (303:28): [True: 326, False: 2.04k]
  Branch (303:28): [Folded - Ignored]
304
326
                            break;
305
2.04k
                        }
306
8
                    }
307
2.05k
                    entries.push(entry);
308
                }
309
903
                entries
310
            }
311
        };
312
313
        // Second we create a stream of futures for each chunk, but buffer/limit them so only
314
        // `max_concurrent_fetch_per_get` will be executed at a time.
315
        // The results will be streamed out in the same order they are in the entries table.
316
        // The results will execute in a "window-like" fashion, meaning that if we limit to
317
        // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and
318
        // request 4 & 5 can be executing (or finished) while waiting for 3 to finish.
319
        // Note: We will buffer our data here up to:
320
        // `spec.max_size * spec.max_concurrent_fetch_per_get` per `get_part()` request.
321
905
        let mut entries_stream = stream::iter(entries)
322
2.11k
            .map(move |index_entry| async move {
323
2.11k
                let 
data2.11k
= self
324
2.11k
                    .content_store
325
2.11k
                    .get_part_unchunked(index_entry, 0, None)
326
2.11k
                    .await
327
2.11k
                    .err_tip(|| 
"Failed to get_part in content_store in dedup_store"1
)
?1
;
328
329
2.11k
                Result::<_, Error>::Ok(data)
330
4.23k
            })
331
905
            .buffered(self.max_concurrent_fetch_per_get);
332
333
        // Stream out the buffered data one at a time and write the data to our writer stream.
334
        // In the event any of these error, we will abort early and abandon all the rest of the
335
        // streamed data.
336
        // Note: Need to take special care to ensure we send the proper slice of data requested.
337
905
        let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream)
338
905
            .err_tip(|| 
"Could not convert (offset - start_byte_in_stream) to usize"0
)
?0
;
339
905
        let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset))
340
905
            .err_tip(|| 
"Could not convert length to usize"0
)
?0
;
341
3.01k
        while let Some(
result2.11k
) = entries_stream.next().await {
  Branch (341:19): [True: 2.11k, False: 904]
  Branch (341:19): [Folded - Ignored]
342
2.11k
            let 
mut data2.11k
= result.err_tip(||
"Inner store iterator closed early in DedupStore"1
)
?1
;
343
2.11k
            assert!(
344
2.11k
                bytes_to_skip <= data.len(),
345
0
                "Formula above must be wrong, {} > {}",
346
0
                bytes_to_skip,
347
0
                data.len()
348
            );
349
2.11k
            let end_pos = cmp::min(data.len(), bytes_to_send + bytes_to_skip);
350
2.11k
            if bytes_to_skip != 0 || 
data.len() > bytes_to_send1.38k
{
  Branch (350:16): [True: 728, False: 1.38k]
  Branch (350:38): [True: 324, False: 1.06k]
  Branch (350:16): [Folded - Ignored]
  Branch (350:38): [Folded - Ignored]
351
1.05k
                data = data.slice(bytes_to_skip..end_pos);
352
1.06k
            }
353
2.11k
            writer
354
2.11k
                .send(data)
355
2.11k
                .await
356
2.11k
                .err_tip(|| 
"Failed to write data to get_part dedup"0
)
?0
;
357
2.11k
            bytes_to_send -= end_pos - bytes_to_skip;
358
2.11k
            bytes_to_skip = 0;
359
        }
360
361
        // Finish our stream by writing our EOF and shutdown the stream.
362
904
        writer
363
904
            .send_eof()
364
904
            .err_tip(|| 
"Failed to write EOF out from get_part dedup"0
)
?0
;
365
904
        Ok(())
366
1.87k
    }
367
368
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
369
0
        self
370
0
    }
371
372
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
373
0
        self
374
0
    }
375
376
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
377
0
        self
378
0
    }
379
}
380
381
default_health_status_indicator!(DedupStore);