Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/dedup_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cmp;
16
use core::pin::Pin;
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
21
use nativelink_config::stores::DedupSpec;
22
use nativelink_error::{Code, Error, ResultExt, make_err};
23
use nativelink_metric::MetricsComponent;
24
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
25
use nativelink_util::common::DigestInfo;
26
use nativelink_util::fastcdc::FastCDC;
27
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
28
use nativelink_util::store_trait::{
29
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
30
};
31
use serde::{Deserialize, Serialize};
32
use tokio_util::codec::FramedRead;
33
use tokio_util::io::StreamReader;
34
use tracing::warn;
35
36
use crate::cas_utils::is_zero_digest;
37
use crate::compression_store::WincodeConfig;
38
39
// NOTE: If these change update the comments in `stores.rs` to reflect
// the new defaults.

/// Fallback for `spec.min_size` when the spec leaves it at zero (64 KiB).
const DEFAULT_MIN_SIZE: u64 = 64 * 1024;
/// Fallback for `spec.normal_size` when the spec leaves it at zero (256 KiB).
const DEFAULT_NORM_SIZE: u64 = 256 * 1024;
/// Fallback for `spec.max_size` when the spec leaves it at zero (512 KiB).
const DEFAULT_MAX_SIZE: u64 = 512 * 1024;
/// Fallback for `spec.max_concurrent_fetch_per_get` when the spec leaves it at zero.
const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;
45
46
#[derive(
47
    Serialize,
48
    Deserialize,
49
    PartialEq,
50
    Eq,
51
    Debug,
52
    Default,
53
    Clone,
54
    wincode::SchemaRead,
55
0
    wincode::SchemaWrite,
56
)]
57
pub struct DedupIndex {
58
    pub entries: Vec<DigestInfo>,
59
}
60
61
#[derive(MetricsComponent)]
62
pub struct DedupStore {
63
    #[metric(group = "index_store")]
64
    index_store: Store,
65
    #[metric(group = "content_store")]
66
    content_store: Store,
67
    fast_cdc_decoder: FastCDC,
68
    #[metric(help = "Maximum number of concurrent fetches per get")]
69
    max_concurrent_fetch_per_get: usize,
70
    wincode_config: WincodeConfig,
71
}
72
73
impl core::fmt::Debug for DedupStore {
74
0
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
75
0
        f.debug_struct("DedupStore")
76
0
            .field("index_store", &self.index_store)
77
0
            .field("content_store", &self.content_store)
78
0
            .field("fast_cdc_decoder", &self.fast_cdc_decoder)
79
0
            .field(
80
0
                "max_concurrent_fetch_per_get",
81
0
                &self.max_concurrent_fetch_per_get,
82
0
            )
83
0
            .finish_non_exhaustive()
84
0
    }
85
}
86
87
impl DedupStore {
88
8
    pub fn new(
89
8
        spec: &DedupSpec,
90
8
        index_store: Store,
91
8
        content_store: Store,
92
8
    ) -> Result<Arc<Self>, Error> {
93
8
        let min_size = if spec.min_size == 0 {
94
0
            DEFAULT_MIN_SIZE
95
        } else {
96
8
            u64::from(spec.min_size)
97
        };
98
8
        let normal_size = if spec.normal_size == 0 {
99
0
            DEFAULT_NORM_SIZE
100
        } else {
101
8
            u64::from(spec.normal_size)
102
        };
103
8
        let max_size = if spec.max_size == 0 {
104
0
            DEFAULT_MAX_SIZE
105
        } else {
106
8
            u64::from(spec.max_size)
107
        };
108
8
        let max_concurrent_fetch_per_get = if spec.max_concurrent_fetch_per_get == 0 {
109
0
            DEFAULT_MAX_CONCURRENT_FETCH_PER_GET
110
        } else {
111
8
            spec.max_concurrent_fetch_per_get as usize
112
        };
113
8
        Ok(Arc::new(Self {
114
8
            index_store,
115
8
            content_store,
116
8
            fast_cdc_decoder: FastCDC::new(
117
8
                usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")
?0
,
118
8
                usize::try_from(normal_size)
119
8
                    .err_tip(|| "Could not convert normal_size to usize")
?0
,
120
8
                usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")
?0
,
121
            ),
122
8
            max_concurrent_fetch_per_get,
123
8
            wincode_config: WincodeConfig::new(),
124
        }))
125
8
    }
126
127
4
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
128
        // First we need to load the index that contains where the individual parts actually
129
        // can be fetched from.
130
3
        let index_entries = {
131
4
            let maybe_data = self
132
4
                .index_store
133
4
                .get_part_unchunked(key.borrow(), 0, None)
134
4
                .await
135
4
                .err_tip(|| "Failed to read index store in dedup store");
136
4
            let 
data3
= match maybe_data {
137
1
                Err(e) => {
138
1
                    if e.code == Code::NotFound {
139
1
                        return Ok(None);
140
0
                    }
141
0
                    return Err(e);
142
                }
143
3
                Ok(data) => data,
144
            };
145
146
3
            match wincode::config::deserialize::<DedupIndex, WincodeConfig>(
147
3
                &data,
148
3
                self.wincode_config,
149
3
            ) {
150
3
                Ok(dedup_index) => dedup_index,
151
0
                Err(err) => {
152
0
                    warn!(?key, ?err, "Failed to deserialize index in dedup store",);
153
                    // We return the equivalent of NotFound here so the client is happy.
154
0
                    return Ok(None);
155
                }
156
            }
157
        };
158
159
3
        let digests: Vec<_> = index_entries
160
3
            .entries
161
3
            .into_iter()
162
3
            .map(StoreKey::Digest)
163
3
            .collect();
164
3
        let mut sum = 0;
165
8
        for size in 
self.content_store3
.
has_many3
(&digests).await
?0
{
166
8
            let Some(
size7
) = size else {
167
                // A part is missing so return None meaning not-found.
168
                // This will abort all in-flight queries related to this request.
169
1
                return Ok(None);
170
            };
171
7
            sum += size;
172
        }
173
2
        Ok(Some(sum))
174
4
    }
175
}
176
177
#[async_trait]
178
impl StoreDriver for DedupStore {
179
    async fn has_with_results(
180
        self: Pin<&Self>,
181
        digests: &[StoreKey<'_>],
182
        results: &mut [Option<u64>],
183
5
    ) -> Result<(), Error> {
184
        digests
185
            .iter()
186
            .zip(results.iter_mut())
187
5
            .map(|(key, result)| async move {
188
5
                if is_zero_digest(key.borrow()) {
189
1
                    *result = Some(0);
190
1
                    return Ok(());
191
4
                }
192
193
4
                match self.has(key.borrow()).await {
194
4
                    Ok(maybe_size) => {
195
4
                        *result = maybe_size;
196
4
                        Ok(())
197
                    }
198
0
                    Err(err) => Err(err),
199
                }
200
10
            })
201
            .collect::<FuturesOrdered<_>>()
202
            .try_collect()
203
            .await
204
5
    }
205
206
    async fn update(
207
        self: Pin<&Self>,
208
        key: StoreKey<'_>,
209
        reader: DropCloserReadHalf,
210
        _size_info: UploadSizeInfo,
211
7
    ) -> Result<(), Error> {
212
        let mut bytes_reader = StreamReader::new(reader);
213
        let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone());
214
        let index_entries = frame_reader
215
83
            .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
216
83
            .map_ok(|frame| async move {
217
83
                let hash = blake3::hash(&frame[..]).into();
218
83
                let index_entry = DigestInfo::new(hash, frame.len() as u64);
219
83
                if self
220
83
                    .content_store
221
83
                    .has(index_entry)
222
83
                    .await
223
83
                    .err_tip(|| "Failed to call .has() in DedupStore::update()")
?0
224
83
                    .is_some()
225
                {
226
                    // If our store has this digest, we don't need to upload it.
227
0
                    return Result::<_, Error>::Ok(index_entry);
228
83
                }
229
83
                self.content_store
230
83
                    .update_oneshot(index_entry, frame)
231
83
                    .await
232
83
                    .err_tip(|| "Failed to update content store in dedup_store")
?0
;
233
83
                Ok(index_entry)
234
166
            })
235
            .try_buffered(self.max_concurrent_fetch_per_get)
236
            .try_collect()
237
            .await?;
238
239
        let serialized_index = wincode::config::serialize(
240
            &DedupIndex {
241
                entries: index_entries,
242
            },
243
            self.wincode_config,
244
        )
245
0
        .map_err(|e| {
246
0
            make_err!(
247
0
                Code::Internal,
248
                "Failed to serialize index in dedup_store : {:?}",
249
                e
250
            )
251
0
        })?;
252
253
        self.index_store
254
            .update_oneshot(key, serialized_index.into())
255
            .await
256
            .err_tip(|| "Failed to insert our index entry to index_store in dedup_store")?;
257
258
        Ok(())
259
7
    }
260
261
    async fn get_part(
262
        self: Pin<&Self>,
263
        key: StoreKey<'_>,
264
        writer: &mut DropCloserWriteHalf,
265
        offset: u64,
266
        length: Option<u64>,
267
935
    ) -> Result<(), Error> {
268
        // Special case for if a client tries to read zero bytes.
269
        if length == Some(0) {
270
            writer
271
                .send_eof()
272
                .err_tip(|| "Failed to write EOF out from get_part dedup")?;
273
            return Ok(());
274
        }
275
        // First we need to download the index that contains where the individual parts actually
276
        // can be fetched from.
277
        let index_entries = {
278
            let data = self
279
                .index_store
280
                .get_part_unchunked(key, 0, None)
281
                .await
282
                .err_tip(|| "Failed to read index store in dedup store")?;
283
            wincode::config::deserialize::<DedupIndex, WincodeConfig>(&data, self.wincode_config)
284
0
                .map_err(|e| {
285
0
                    make_err!(
286
0
                        Code::Internal,
287
                        "Failed to deserialize index in dedup_store::get_part : {:?}",
288
                        e
289
                    )
290
0
                })?
291
        };
292
293
        let mut start_byte_in_stream: u64 = 0;
294
        let entries = {
295
            if offset == 0 && length.is_none() {
296
                index_entries.entries
297
            } else {
298
                let mut current_entries_sum = 0;
299
                let mut entries = Vec::with_capacity(index_entries.entries.len());
300
                for entry in index_entries.entries {
301
                    let first_byte = current_entries_sum;
302
                    let entry_size = entry.size_bytes();
303
                    current_entries_sum += entry_size;
304
                    // Filter any items who's end byte is before the first requested byte.
305
                    if current_entries_sum <= offset {
306
                        start_byte_in_stream = current_entries_sum;
307
                        continue;
308
                    }
309
                    // If we are not going to read any bytes past the length we are done.
310
                    if let Some(length) = length
311
                        && first_byte >= offset + length
312
                    {
313
                        break;
314
                    }
315
                    entries.push(entry);
316
                }
317
                entries
318
            }
319
        };
320
321
        // Second we we create a stream of futures for each chunk, but buffer/limit them so only
322
        // `max_concurrent_fetch_per_get` will be executed at a time.
323
        // The results will be streamed out in the same order they are in the entries table.
324
        // The results will execute in a "window-like" fashion, meaning that if we limit to
325
        // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and
326
        // request 4 & 5 can be executing (or finished) while waiting for 3 to finish.
327
        // Note: We will buffer our data here up to:
328
        // `spec.max_size * spec.max_concurrent_fetch_per_get` per `get_part()` request.
329
        let mut entries_stream = stream::iter(entries)
330
2.11k
            .map(move |index_entry| async move {
331
2.11k
                let 
data2.11k
= self
332
2.11k
                    .content_store
333
2.11k
                    .get_part_unchunked(index_entry, 0, None)
334
2.11k
                    .await
335
2.11k
                    .err_tip(|| "Failed to get_part in content_store in dedup_store")
?1
;
336
337
2.11k
                Result::<_, Error>::Ok(data)
338
4.22k
            })
339
            .buffered(self.max_concurrent_fetch_per_get);
340
341
        // Stream out the buffered data one at a time and write the data to our writer stream.
342
        // In the event any of these error, we will abort early and abandon all the rest of the
343
        // streamed data.
344
        // Note: Need to take special care to ensure we send the proper slice of data requested.
345
        let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream)
346
            .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?;
347
        let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset))
348
            .err_tip(|| "Could not convert length to usize")?;
349
        while let Some(result) = entries_stream.next().await {
350
            let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?;
351
            assert!(
352
                bytes_to_skip <= data.len(),
353
                "Formula above must be wrong, {} > {}",
354
                bytes_to_skip,
355
                data.len()
356
            );
357
            let end_pos = cmp::min(data.len(), bytes_to_send + bytes_to_skip);
358
            if bytes_to_skip != 0 || data.len() > bytes_to_send {
359
                data = data.slice(bytes_to_skip..end_pos);
360
            }
361
            writer
362
                .send(data)
363
                .await
364
                .err_tip(|| "Failed to write data to get_part dedup")?;
365
            bytes_to_send -= end_pos - bytes_to_skip;
366
            bytes_to_skip = 0;
367
        }
368
369
        // Finish our stream by writing our EOF and shutdown the stream.
370
        writer
371
            .send_eof()
372
            .err_tip(|| "Failed to write EOF out from get_part dedup")?;
373
        Ok(())
374
935
    }
375
376
0
    /// Returns this store itself as the driver, regardless of `_digest`.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
379
380
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
381
0
        self
382
0
    }
383
384
0
    /// Converts an owning `Arc<DedupStore>` into `Arc<dyn Any>` for downcasting.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
        self
    }
387
388
0
    fn register_remove_callback(
389
0
        self: Arc<Self>,
390
0
        callback: Arc<dyn RemoveItemCallback>,
391
0
    ) -> Result<(), Error> {
392
0
        self.index_store
393
0
            .register_remove_callback(callback.clone())?;
394
0
        self.content_store.register_remove_callback(callback)?;
395
0
        Ok(())
396
0
    }
397
}
398
399
// Expands the shared `default_health_status_indicator!` macro for `DedupStore`.
// NOTE(review): presumably this supplies the default `HealthStatusIndicator`
// implementation — confirm against `nativelink_util::health_utils`.
default_health_status_indicator!(DedupStore);