Coverage Report

Created: 2025-10-20 10:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/dedup_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use bincode::serde::{decode_from_slice, encode_to_vec};
21
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
22
use nativelink_config::stores::DedupSpec;
23
use nativelink_error::{Code, Error, ResultExt, make_err};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
26
use nativelink_util::common::DigestInfo;
27
use nativelink_util::fastcdc::FastCDC;
28
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
29
use nativelink_util::store_trait::{
30
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
31
};
32
use serde::{Deserialize, Serialize};
33
use tokio_util::codec::FramedRead;
34
use tokio_util::io::StreamReader;
35
use tracing::warn;
36
37
use crate::cas_utils::is_zero_digest;
38
39
// NOTE: If these change update the comments in `stores.rs` to reflect
40
// the new defaults.
41
const DEFAULT_MIN_SIZE: u64 = 64 * 1024;
42
const DEFAULT_NORM_SIZE: u64 = 256 * 1024;
43
const DEFAULT_MAX_SIZE: u64 = 512 * 1024;
44
const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;
45
46
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Default, Clone)]
47
pub struct DedupIndex {
48
    pub entries: Vec<DigestInfo>,
49
}
50
51
type LegacyBincodeConfig = bincode::config::Configuration<
52
    bincode::config::LittleEndian,
53
    bincode::config::Fixint,
54
    bincode::config::NoLimit,
55
>;
56
57
#[derive(MetricsComponent)]
58
pub struct DedupStore {
59
    #[metric(group = "index_store")]
60
    index_store: Store,
61
    #[metric(group = "content_store")]
62
    content_store: Store,
63
    fast_cdc_decoder: FastCDC,
64
    #[metric(help = "Maximum number of concurrent fetches per get")]
65
    max_concurrent_fetch_per_get: usize,
66
    bincode_config: LegacyBincodeConfig,
67
}
68
69
impl core::fmt::Debug for DedupStore {
70
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
71
0
        f.debug_struct("DedupStore")
72
0
            .field("index_store", &self.index_store)
73
0
            .field("content_store", &self.content_store)
74
0
            .field("fast_cdc_decoder", &self.fast_cdc_decoder)
75
0
            .field(
76
0
                "max_concurrent_fetch_per_get",
77
0
                &self.max_concurrent_fetch_per_get,
78
0
            )
79
0
            .finish_non_exhaustive()
80
0
    }
81
}
82
83
impl DedupStore {
84
8
    pub fn new(
85
8
        spec: &DedupSpec,
86
8
        index_store: Store,
87
8
        content_store: Store,
88
8
    ) -> Result<Arc<Self>, Error> {
89
8
        let min_size = if spec.min_size == 0 {
  Branch (89:27): [True: 0, False: 8]
  Branch (89:27): [Folded - Ignored]
90
0
            DEFAULT_MIN_SIZE
91
        } else {
92
8
            u64::from(spec.min_size)
93
        };
94
8
        let normal_size = if spec.normal_size == 0 {
  Branch (94:30): [True: 0, False: 8]
  Branch (94:30): [Folded - Ignored]
95
0
            DEFAULT_NORM_SIZE
96
        } else {
97
8
            u64::from(spec.normal_size)
98
        };
99
8
        let max_size = if spec.max_size == 0 {
  Branch (99:27): [True: 0, False: 8]
  Branch (99:27): [Folded - Ignored]
100
0
            DEFAULT_MAX_SIZE
101
        } else {
102
8
            u64::from(spec.max_size)
103
        };
104
8
        let max_concurrent_fetch_per_get = if spec.max_concurrent_fetch_per_get == 0 {
  Branch (104:47): [True: 0, False: 8]
  Branch (104:47): [Folded - Ignored]
105
0
            DEFAULT_MAX_CONCURRENT_FETCH_PER_GET
106
        } else {
107
8
            spec.max_concurrent_fetch_per_get as usize
108
        };
109
8
        Ok(Arc::new(Self {
110
8
            index_store,
111
8
            content_store,
112
8
            fast_cdc_decoder: FastCDC::new(
113
8
                usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")
?0
,
114
8
                usize::try_from(normal_size)
115
8
                    .err_tip(|| "Could not convert normal_size to usize")
?0
,
116
8
                usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")
?0
,
117
            ),
118
8
            max_concurrent_fetch_per_get,
119
8
            bincode_config: bincode::config::legacy(),
120
        }))
121
8
    }
122
123
4
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
124
        // First we need to load the index that contains where the individual parts actually
125
        // can be fetched from.
126
3
        let index_entries = {
127
4
            let maybe_data = self
128
4
                .index_store
129
4
                .get_part_unchunked(key.borrow(), 0, None)
130
4
                .await
131
4
                .err_tip(|| "Failed to read index store in dedup store");
132
4
            let 
data3
= match maybe_data {
133
1
                Err(e) => {
134
1
                    if e.code == Code::NotFound {
  Branch (134:24): [True: 1, False: 0]
  Branch (134:24): [Folded - Ignored]
135
1
                        return Ok(None);
136
0
                    }
137
0
                    return Err(e);
138
                }
139
3
                Ok(data) => data,
140
            };
141
142
3
            match decode_from_slice::<DedupIndex, _>(&data, self.bincode_config) {
143
3
                Ok((dedup_index, _)) => dedup_index,
144
0
                Err(err) => {
145
0
                    warn!(?key, ?err, "Failed to deserialize index in dedup store",);
146
                    // We return the equivalent of NotFound here so the client is happy.
147
0
                    return Ok(None);
148
                }
149
            }
150
        };
151
152
3
        let digests: Vec<_> = index_entries
153
3
            .entries
154
3
            .into_iter()
155
3
            .map(StoreKey::Digest)
156
3
            .collect();
157
3
        let mut sum = 0;
158
8
        for size in 
self.content_store3
.
has_many3
(&digests).await
?0
{
159
8
            let Some(
size7
) = size else {
  Branch (159:17): [True: 7, False: 1]
  Branch (159:17): [Folded - Ignored]
160
                // A part is missing so return None meaning not-found.
161
                // This will abort all in-flight queries related to this request.
162
1
                return Ok(None);
163
            };
164
7
            sum += size;
165
        }
166
2
        Ok(Some(sum))
167
4
    }
168
}
169
170
#[async_trait]
171
impl StoreDriver for DedupStore {
172
    async fn has_with_results(
173
        self: Pin<&Self>,
174
        digests: &[StoreKey<'_>],
175
        results: &mut [Option<u64>],
176
5
    ) -> Result<(), Error> {
177
        digests
178
            .iter()
179
            .zip(results.iter_mut())
180
5
            .map(|(key, result)| async move {
181
5
                if is_zero_digest(key.borrow()) {
  Branch (181:20): [True: 1, False: 4]
  Branch (181:20): [Folded - Ignored]
182
1
                    *result = Some(0);
183
1
                    return Ok(());
184
4
                }
185
186
4
                match self.has(key.borrow()).await {
187
4
                    Ok(maybe_size) => {
188
4
                        *result = maybe_size;
189
4
                        Ok(())
190
                    }
191
0
                    Err(err) => Err(err),
192
                }
193
10
            })
194
            .collect::<FuturesOrdered<_>>()
195
            .try_collect()
196
            .await
197
5
    }
198
199
    async fn update(
200
        self: Pin<&Self>,
201
        key: StoreKey<'_>,
202
        reader: DropCloserReadHalf,
203
        _size_info: UploadSizeInfo,
204
7
    ) -> Result<(), Error> {
205
        let mut bytes_reader = StreamReader::new(reader);
206
        let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone());
207
        let index_entries = frame_reader
208
83
            .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
209
83
            .map_ok(|frame| async move {
210
83
                let hash = blake3::hash(&frame[..]).into();
211
83
                let index_entry = DigestInfo::new(hash, frame.len() as u64);
212
83
                if self
  Branch (212:20): [True: 0, False: 83]
  Branch (212:20): [Folded - Ignored]
213
83
                    .content_store
214
83
                    .has(index_entry)
215
83
                    .await
216
83
                    .err_tip(|| "Failed to call .has() in DedupStore::update()")
?0
217
83
                    .is_some()
218
                {
219
                    // If our store has this digest, we don't need to upload it.
220
0
                    return Result::<_, Error>::Ok(index_entry);
221
83
                }
222
83
                self.content_store
223
83
                    .update_oneshot(index_entry, frame)
224
83
                    .await
225
83
                    .err_tip(|| "Failed to update content store in dedup_store")
?0
;
226
83
                Ok(index_entry)
227
166
            })
228
            .try_buffered(self.max_concurrent_fetch_per_get)
229
            .try_collect()
230
            .await?;
231
232
        let serialized_index = encode_to_vec(
233
            &DedupIndex {
234
                entries: index_entries,
235
            },
236
            self.bincode_config,
237
        )
238
0
        .map_err(|e| {
239
0
            make_err!(
240
0
                Code::Internal,
241
                "Failed to serialize index in dedup_store : {:?}",
242
                e
243
            )
244
0
        })?;
245
246
        self.index_store
247
            .update_oneshot(key, serialized_index.into())
248
            .await
249
            .err_tip(|| "Failed to insert our index entry to index_store in dedup_store")?;
250
251
        Ok(())
252
7
    }
253
254
    async fn get_part(
255
        self: Pin<&Self>,
256
        key: StoreKey<'_>,
257
        writer: &mut DropCloserWriteHalf,
258
        offset: u64,
259
        length: Option<u64>,
260
935
    ) -> Result<(), Error> {
261
        // Special case for if a client tries to read zero bytes.
262
        if length == Some(0) {
263
            writer
264
                .send_eof()
265
                .err_tip(|| "Failed to write EOF out from get_part dedup")?;
266
            return Ok(());
267
        }
268
        // First we need to download the index that contains where the individual parts actually
269
        // can be fetched from.
270
        let index_entries = {
271
            let data = self
272
                .index_store
273
                .get_part_unchunked(key, 0, None)
274
                .await
275
                .err_tip(|| "Failed to read index store in dedup store")?;
276
            let (dedup_index, _) = decode_from_slice::<DedupIndex, _>(&data, self.bincode_config)
277
0
                .map_err(|e| {
278
0
                make_err!(
279
0
                    Code::Internal,
280
                    "Failed to deserialize index in dedup_store::get_part : {:?}",
281
                    e
282
                )
283
0
            })?;
284
            dedup_index
285
        };
286
287
        let mut start_byte_in_stream: u64 = 0;
288
        let entries = {
289
            if offset == 0 && length.is_none() {
290
                index_entries.entries
291
            } else {
292
                let mut current_entries_sum = 0;
293
                let mut entries = Vec::with_capacity(index_entries.entries.len());
294
                for entry in index_entries.entries {
295
                    let first_byte = current_entries_sum;
296
                    let entry_size = entry.size_bytes();
297
                    current_entries_sum += entry_size;
298
                    // Filter any items whose end byte is before the first requested byte.
299
                    if current_entries_sum <= offset {
300
                        start_byte_in_stream = current_entries_sum;
301
                        continue;
302
                    }
303
                    // If we are not going to read any bytes past the length we are done.
304
                    if let Some(length) = length {
305
                        if first_byte >= offset + length {
306
                            break;
307
                        }
308
                    }
309
                    entries.push(entry);
310
                }
311
                entries
312
            }
313
        };
314
315
        // Second we create a stream of futures for each chunk, but buffer/limit them so only
316
        // `max_concurrent_fetch_per_get` will be executed at a time.
317
        // The results will be streamed out in the same order they are in the entries table.
318
        // The results will execute in a "window-like" fashion, meaning that if we limit to
319
        // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and
320
        // request 4 & 5 can be executing (or finished) while waiting for 3 to finish.
321
        // Note: We will buffer our data here up to:
322
        // `spec.max_size * spec.max_concurrent_fetch_per_get` per `get_part()` request.
323
        let mut entries_stream = stream::iter(entries)
324
2.11k
            .map(move |index_entry| async move {
325
2.11k
                let 
data2.11k
= self
326
2.11k
                    .content_store
327
2.11k
                    .get_part_unchunked(index_entry, 0, None)
328
2.11k
                    .await
329
2.11k
                    .err_tip(|| "Failed to get_part in content_store in dedup_store")
?1
;
330
331
2.11k
                Result::<_, Error>::Ok(data)
332
4.22k
            })
333
            .buffered(self.max_concurrent_fetch_per_get);
334
335
        // Stream out the buffered data one at a time and write the data to our writer stream.
336
        // In the event any of these error, we will abort early and abandon all the rest of the
337
        // streamed data.
338
        // Note: Need to take special care to ensure we send the proper slice of data requested.
339
        let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream)
340
            .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?;
341
        let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset))
342
            .err_tip(|| "Could not convert length to usize")?;
343
        while let Some(result) = entries_stream.next().await {
344
            let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?;
345
            assert!(
346
                bytes_to_skip <= data.len(),
347
                "Formula above must be wrong, {} > {}",
348
                bytes_to_skip,
349
                data.len()
350
            );
351
            let end_pos = cmp::min(data.len(), bytes_to_send + bytes_to_skip);
352
            if bytes_to_skip != 0 || data.len() > bytes_to_send {
353
                data = data.slice(bytes_to_skip..end_pos);
354
            }
355
            writer
356
                .send(data)
357
                .await
358
                .err_tip(|| "Failed to write data to get_part dedup")?;
359
            bytes_to_send -= end_pos - bytes_to_skip;
360
            bytes_to_skip = 0;
361
        }
362
363
        // Finish our stream by writing our EOF and shutdown the stream.
364
        writer
365
            .send_eof()
366
            .err_tip(|| "Failed to write EOF out from get_part dedup")?;
367
        Ok(())
368
935
    }
369
370
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
371
0
        self
372
0
    }
373
374
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
375
0
        self
376
0
    }
377
378
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
379
0
        self
380
0
    }
381
382
0
    fn register_remove_callback(
383
0
        self: Arc<Self>,
384
0
        callback: Arc<dyn RemoveItemCallback>,
385
0
    ) -> Result<(), Error> {
386
0
        self.index_store
387
0
            .register_remove_callback(callback.clone())?;
388
0
        self.content_store.register_remove_callback(callback)?;
389
0
        Ok(())
390
0
    }
391
}
392
393
default_health_status_indicator!(DedupStore);