Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-store/src/dedup_store.rs
Line | Count | Source
   1 |       | // Copyright 2024 The NativeLink Authors. All rights reserved.
   2 |       | //
   3 |       | // Licensed under the Apache License, Version 2.0 (the "License");
   4 |       | // you may not use this file except in compliance with the License.
   5 |       | // You may obtain a copy of the License at
   6 |       | //
   7 |       | //    http://www.apache.org/licenses/LICENSE-2.0
   8 |       | //
   9 |       | // Unless required by applicable law or agreed to in writing, software
  10 |       | // distributed under the License is distributed on an "AS IS" BASIS,
  11 |       | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12 |       | // See the License for the specific language governing permissions and
  13 |       | // limitations under the License.
  14 |       |
  15 |       | use std::cmp;
  16 |       | use std::pin::Pin;
  17 |       | use std::sync::Arc;
  18 |       |
  19 |       | use async_trait::async_trait;
  20 |       | use bincode::config::{FixintEncoding, WithOtherIntEncoding};
  21 |       | use bincode::{DefaultOptions, Options};
  22 |       | use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
  23 |       | use nativelink_config::stores::DedupSpec;
  24 |       | use nativelink_error::{make_err, Code, Error, ResultExt};
  25 |       | use nativelink_metric::MetricsComponent;
  26 |       | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
  27 |       | use nativelink_util::common::DigestInfo;
  28 |       | use nativelink_util::fastcdc::FastCDC;
  29 |       | use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
  30 |       | use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
  31 |       | use serde::{Deserialize, Serialize};
  32 |       | use tokio_util::codec::FramedRead;
  33 |       | use tokio_util::io::StreamReader;
  34 |       | use tracing::{event, Level};
  35 |       |
  36 |       | use crate::cas_utils::is_zero_digest;
  37 |       |
  38 |       | // NOTE: If these change update the comments in `stores.rs` to reflect
  39 |       | // the new defaults.
  40 |       | const DEFAULT_MIN_SIZE: u64 = 64 * 1024;
  41 |       | const DEFAULT_NORM_SIZE: u64 = 256 * 1024;
  42 |       | const DEFAULT_MAX_SIZE: u64 = 512 * 1024;
  43 |       | const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;
  44 |       |
  45 |   908 | #[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)]
  46 |       | pub struct DedupIndex {
  47 |       |     pub entries: Vec<DigestInfo>,
  48 |       | }
  49 |       |
  50 |     0 | #[derive(MetricsComponent)]
  51 |       | pub struct DedupStore {
  52 |       |     #[metric(group = "index_store")]
  53 |       |     index_store: Store,
  54 |       |     #[metric(group = "content_store")]
  55 |       |     content_store: Store,
  56 |       |     fast_cdc_decoder: FastCDC,
  57 |       |     #[metric(help = "Maximum number of concurrent fetches per get")]
  58 |       |     max_concurrent_fetch_per_get: usize,
  59 |       |     bincode_options: WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
  60 |       | }
  61 |       |
  62 |       | impl DedupStore {
  63 |     8 |     pub fn new(
  64 |     8 |         spec: &DedupSpec,
  65 |     8 |         index_store: Store,
  66 |     8 |         content_store: Store,
  67 |     8 |     ) -> Result<Arc<Self>, Error> {
  68 |     8 |         let min_size = if spec.min_size == 0 {
     |       |   Branch (68:27): [True: 0, False: 8]
     |       |   Branch (68:27): [Folded - Ignored]
  69 |     0 |             DEFAULT_MIN_SIZE
  70 |       |         } else {
  71 |     8 |             u64::from(spec.min_size)
  72 |       |         };
  73 |     8 |         let normal_size = if spec.normal_size == 0 {
     |       |   Branch (73:30): [True: 0, False: 8]
     |       |   Branch (73:30): [Folded - Ignored]
  74 |     0 |             DEFAULT_NORM_SIZE
  75 |       |         } else {
  76 |     8 |             u64::from(spec.normal_size)
  77 |       |         };
  78 |     8 |         let max_size = if spec.max_size == 0 {
     |       |   Branch (78:27): [True: 0, False: 8]
     |       |   Branch (78:27): [Folded - Ignored]
  79 |     0 |             DEFAULT_MAX_SIZE
  80 |       |         } else {
  81 |     8 |             u64::from(spec.max_size)
  82 |       |         };
  83 |     8 |         let max_concurrent_fetch_per_get = if spec.max_concurrent_fetch_per_get == 0 {
     |       |   Branch (83:47): [True: 0, False: 8]
     |       |   Branch (83:47): [Folded - Ignored]
  84 |     0 |             DEFAULT_MAX_CONCURRENT_FETCH_PER_GET
  85 |       |         } else {
  86 |     8 |             spec.max_concurrent_fetch_per_get as usize
  87 |       |         };
  88 |       |         Ok(Arc::new(Self {
  89 |     8 |             index_store,
  90 |     8 |             content_store,
  91 |     8 |             fast_cdc_decoder: FastCDC::new(
  92 |     8 |                 usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")?,
  93 |     8 |                 usize::try_from(normal_size)
  94 |     8 |                     .err_tip(|| "Could not convert normal_size to usize")?,
  95 |     8 |                 usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")?,
  96 |       |             ),
  97 |     8 |             max_concurrent_fetch_per_get,
  98 |     8 |             bincode_options: DefaultOptions::new().with_fixint_encoding(),
  99 |       |         }))
 100 |     8 |     }
 101 |       |
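Editor's note: the constructor on lines 68-87 treats a zero in the DedupSpec as "use the compiled-in default" before building the FastCDC chunker. A minimal sketch of that resolution as a hypothetical standalone helper (not part of dedup_store.rs; the u32 field width of the spec values is an assumption):

    // Hypothetical helper mirroring lines 68-87: a configured value of 0 means
    // "use the default"; anything else is taken as-is.
    fn resolve_chunk_size(configured: u32, default: u64) -> u64 {
        if configured == 0 {
            default
        } else {
            u64::from(configured)
        }
    }

    // With an all-zero spec this yields the documented defaults:
    // min = 64 KiB, normal = 256 KiB, max = 512 KiB, 10 concurrent fetches.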
 102 |     4 |     async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
 103 |       |         // First we need to load the index that contains where the individual parts actually
 104 |       |         // can be fetched from.
 105 |     3 |         let index_entries = {
 106 |     4 |             let maybe_data = self
 107 |     4 |                 .index_store
 108 |     4 |                 .get_part_unchunked(key.borrow(), 0, None)
 109 |     4 |                 .await
 110 |     4 |                 .err_tip(|| "Failed to read index store in dedup store");
 111 |     4 |             let data = match maybe_data {
 112 |     1 |                 Err(e) => {
 113 |     1 |                     if e.code == Code::NotFound {
     |       |   Branch (113:24): [True: 1, False: 0]
     |       |   Branch (113:24): [Folded - Ignored]
 114 |     1 |                         return Ok(None);
 115 |     0 |                     }
 116 |     0 |                     return Err(e);
 117 |       |                 }
 118 |     3 |                 Ok(data) => data,
 119 |     3 |             };
 120 |     3 |
 121 |     3 |             match self.bincode_options.deserialize::<DedupIndex>(&data) {
 122 |     0 |                 Err(err) => {
 123 |     0 |                     event!(
 124 |     0 |                         Level::WARN,
 125 |       |                         ?key,
 126 |       |                         ?err,
 127 |     0 |                         "Failed to deserialize index in dedup store",
 128 |       |                     );
 129 |       |                     // We return the equivalent of NotFound here so the client is happy.
 130 |     0 |                     return Ok(None);
 131 |       |                 }
 132 |     3 |                 Ok(v) => v,
 133 |     3 |             }
 134 |     3 |         };
 135 |     3 |
 136 |     3 |         let digests: Vec<_> = index_entries
 137 |     3 |             .entries
 138 |     3 |             .into_iter()
 139 |     3 |             .map(StoreKey::Digest)
 140 |     3 |             .collect();
 141 |     3 |         let mut sum = 0;
 142 |    12 |         for size in self.content_store.has_many(&digests).await? {
 143 |    12 |             let Some(size) = size else {
     |       |   Branch (143:17): [True: 11, False: 1]
     |       |   Branch (143:17): [Folded - Ignored]
 144 |       |                 // A part is missing so return None meaning not-found.
 145 |       |                 // This will abort all in-flight queries related to this request.
 146 |     1 |                 return Ok(None);
 147 |       |             };
 148 |    11 |             sum += size;
 149 |       |         }
 150 |     2 |         Ok(Some(sum))
 151 |     4 |     }
 152 |       | }
 153 |       |
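Editor's note: has() reports "not found" whenever the index is missing, undecodable, or any referenced chunk is absent. The index itself is just a bincode-encoded DedupIndex under the options built on line 98. A round-trip sketch under those same options (not part of dedup_store.rs; the DigestInfo::new(hash, size_bytes) signature is assumed from its use on line 195):

    use bincode::{DefaultOptions, Options};

    fn index_round_trip() -> Result<(), bincode::Error> {
        let options = DefaultOptions::new().with_fixint_encoding();
        let index = DedupIndex {
            entries: vec![DigestInfo::new([0u8; 32], 1024)],
        };
        // Both sides must use the same options; otherwise deserialize() fails and
        // has() takes the WARN / not-found path on lines 122-130.
        let bytes = options.serialize(&index)?;
        let decoded: DedupIndex = options.deserialize(&bytes)?;
        assert_eq!(index, decoded);
        Ok(())
    }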
 154 |       | #[async_trait]
 155 |       | impl StoreDriver for DedupStore {
 156 |       |     async fn has_with_results(
 157 |       |         self: Pin<&Self>,
 158 |       |         digests: &[StoreKey<'_>],
 159 |       |         results: &mut [Option<u64>],
 160 |     5 |     ) -> Result<(), Error> {
 161 |     5 |         digests
 162 |     5 |             .iter()
 163 |     5 |             .zip(results.iter_mut())
 164 |     5 |             .map(|(key, result)| async move {
 165 |     5 |                 if is_zero_digest(key.borrow()) {
     |       |   Branch (165:20): [True: 1, False: 4]
     |       |   Branch (165:20): [Folded - Ignored]
 166 |     1 |                     *result = Some(0);
 167 |     1 |                     return Ok(());
 168 |     4 |                 }
 169 |     4 |
 170 |     4 |                 match self.has(key.borrow()).await {
 171 |     4 |                     Ok(maybe_size) => {
 172 |     4 |                         *result = maybe_size;
 173 |     4 |                         Ok(())
 174 |       |                     }
 175 |     0 |                     Err(err) => Err(err),
 176 |       |                 }
 177 |    10 |             })
 178 |     5 |             .collect::<FuturesOrdered<_>>()
 179 |     5 |             .try_collect()
 180 |     4 |             .await
 181 |    10 |     }
 182 |       |
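Editor's note: has_with_results() fans each key out to its own future and collects the answers back in input order via FuturesOrdered, failing fast on the first error. A self-contained sketch of that pattern with placeholder key and error types (not NativeLink types, not part of dedup_store.rs):

    use futures::stream::{FuturesOrdered, TryStreamExt};

    // Results come back in the same order as `keys`; the first Err aborts the rest.
    async fn check_all(keys: &[u64]) -> Result<Vec<bool>, String> {
        keys.iter()
            .map(|key| async move {
                // Placeholder check standing in for `self.has(key)`.
                Ok::<bool, String>(*key % 2 == 0)
            })
            .collect::<FuturesOrdered<_>>()
            .try_collect()
            .await
    }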
 183 |       |     async fn update(
 184 |       |         self: Pin<&Self>,
 185 |       |         key: StoreKey<'_>,
 186 |       |         reader: DropCloserReadHalf,
 187 |       |         _size_info: UploadSizeInfo,
 188 |     7 |     ) -> Result<(), Error> {
 189 |     7 |         let mut bytes_reader = StreamReader::new(reader);
 190 |     7 |         let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone());
 191 |     7 |         let index_entries = frame_reader
 192 |    97 |             .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
 193 |    97 |             .map_ok(|frame| async move {
 194 |    97 |                 let hash = blake3::hash(&frame[..]).into();
 195 |    97 |                 let index_entry = DigestInfo::new(hash, frame.len() as u64);
 196 |    97 |                 if self
     |       |   Branch (196:20): [True: 0, False: 97]
     |       |   Branch (196:20): [Folded - Ignored]
 197 |    97 |                     .content_store
 198 |    97 |                     .has(index_entry)
 199 |     0 |                     .await
 200 |    97 |                     .err_tip(|| "Failed to call .has() in DedupStore::update()")?
 201 |    97 |                     .is_some()
 202 |       |                 {
 203 |       |                     // If our store has this digest, we don't need to upload it.
 204 |     0 |                     return Result::<_, Error>::Ok(index_entry);
 205 |    97 |                 }
 206 |    97 |                 self.content_store
 207 |    97 |                     .update_oneshot(index_entry, frame)
 208 |     0 |                     .await
 209 |    97 |                     .err_tip(|| "Failed to update content store in dedup_store")?;
 210 |    97 |                 Ok(index_entry)
 211 |   194 |             })
 212 |     7 |             .try_buffered(self.max_concurrent_fetch_per_get)
 213 |     7 |             .try_collect()
 214 |     0 |             .await?;
 215 |       |
 216 |     7 |         let serialized_index = self
 217 |     7 |             .bincode_options
 218 |     7 |             .serialize(&DedupIndex {
 219 |     7 |                 entries: index_entries,
 220 |     7 |             })
 221 |     7 |             .map_err(|e| {
 222 |     0 |                 make_err!(
 223 |     0 |                     Code::Internal,
 224 |     0 |                     "Failed to serialize index in dedup_store : {:?}",
 225 |     0 |                     e
 226 |     0 |                 )
 227 |     7 |             })?;
 228 |       |
 229 |     7 |         self.index_store
 230 |     7 |             .update_oneshot(key, serialized_index.into())
 231 |     0 |             .await
 232 |     7 |             .err_tip(|| "Failed to insert our index entry to index_store in dedup_store")?;
 233 |       |
 234 |     7 |         Ok(())
 235 |    14 |     }
 236 |       |
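Editor's note: update() splits the incoming stream into content-defined chunks and keys each chunk by the blake3 hash of its bytes, so identical chunks across blobs collapse onto the same content_store entry. The per-chunk identity on lines 194-195 restated as a hypothetical standalone helper (not part of dedup_store.rs):

    // Each FastCDC frame is addressed by blake3(frame bytes) plus its length.
    fn chunk_digest(frame: &[u8]) -> DigestInfo {
        let hash: [u8; 32] = blake3::hash(frame).into();
        DigestInfo::new(hash, frame.len() as u64)
    }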
 237 |       |     async fn get_part(
 238 |       |         self: Pin<&Self>,
 239 |       |         key: StoreKey<'_>,
 240 |       |         writer: &mut DropCloserWriteHalf,
 241 |       |         offset: u64,
 242 |       |         length: Option<u64>,
 243 |   935 |     ) -> Result<(), Error> {
 244 |       |         // Special case for if a client tries to read zero bytes.
 245 |   935 |         if length == Some(0) {
     |       |   Branch (245:12): [True: 30, False: 905]
     |       |   Branch (245:12): [Folded - Ignored]
 246 |    30 |             writer
 247 |    30 |                 .send_eof()
 248 |    30 |                 .err_tip(|| "Failed to write EOF out from get_part dedup")?;
 249 |    30 |             return Ok(());
 250 |   905 |         }
 251 |       |         // First we need to download the index that contains where the individual parts actually
 252 |       |         // can be fetched from.
 253 |   905 |         let index_entries = {
 254 |   905 |             let data = self
 255 |   905 |                 .index_store
 256 |   905 |                 .get_part_unchunked(key, 0, None)
 257 |   905 |                 .await
 258 |   905 |                 .err_tip(|| "Failed to read index store in dedup store")?;
 259 |       |
 260 |   905 |             self.bincode_options
 261 |   905 |                 .deserialize::<DedupIndex>(&data)
 262 |   905 |                 .map_err(|e| {
 263 |     0 |                     make_err!(
 264 |     0 |                         Code::Internal,
 265 |     0 |                         "Failed to deserialize index in dedup_store::get_part : {:?}",
 266 |     0 |                         e
 267 |     0 |                     )
 268 |   905 |                 })?
 269 |       |         };
 270 |       |
 271 |   905 |         let mut start_byte_in_stream: u64 = 0;
 272 |   905 |         let entries = {
 273 |   905 |             if offset == 0 && length.is_none() {
     |       |   Branch (273:16): [True: 31, False: 874]
     |       |   Branch (273:31): [True: 2, False: 29]
     |       |   Branch (273:16): [Folded - Ignored]
     |       |   Branch (273:31): [Folded - Ignored]
 274 |     2 |                 index_entries.entries
 275 |       |             } else {
 276 |   903 |                 let mut current_entries_sum = 0;
 277 |   903 |                 let mut entries = Vec::with_capacity(index_entries.entries.len());
 278 | 4.65k |                 for entry in index_entries.entries {
 279 | 4.15k |                     let first_byte = current_entries_sum;
 280 | 4.15k |                     let entry_size = entry.size_bytes();
 281 | 4.15k |                     current_entries_sum += entry_size;
 282 | 4.15k |                     // Filter any items who's end byte is before the first requested byte.
 283 | 4.15k |                     if current_entries_sum <= offset {
     |       |   Branch (283:24): [True: 1.60k, False: 2.54k]
     |       |   Branch (283:24): [Folded - Ignored]
 284 | 1.60k |                         start_byte_in_stream = current_entries_sum;
 285 | 1.60k |                         continue;
 286 | 2.54k |                     }
 287 |       |                     // If we are not going to read any bytes past the length we are done.
 288 | 2.54k |                     if let Some(length) = length {
     |       |   Branch (288:28): [True: 2.54k, False: 8]
     |       |   Branch (288:28): [Folded - Ignored]
 289 | 2.54k |                         if first_byte >= offset + length {
     |       |   Branch (289:28): [True: 407, False: 2.13k]
     |       |   Branch (289:28): [Folded - Ignored]
 290 |   407 |                             break;
 291 | 2.13k |                         }
 292 |     8 |                     }
 293 | 2.14k |                     entries.push(entry);
 294 |       |                 }
 295 |   903 |                 entries
 296 |       |             }
 297 |       |         };
 298 |       |
 299 |       |         // Second we we create a stream of futures for each chunk, but buffer/limit them so only
 300 |       |         // `max_concurrent_fetch_per_get` will be executed at a time.
 301 |       |         // The results will be streamed out in the same order they are in the entries table.
 302 |       |         // The results will execute in a "window-like" fashion, meaning that if we limit to
 303 |       |         // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and
 304 |       |         // request 4 & 5 can be executing (or finished) while waiting for 3 to finish.
 305 |       |         // Note: We will buffer our data here up to:
 306 |       |         // `spec.max_size * spec.max_concurrent_fetch_per_get` per `get_part()` request.
 307 |   905 |         let mut entries_stream = stream::iter(entries)
 308 | 2.20k |             .map(move |index_entry| async move {
 309 | 2.20k |                 let data = self
 310 | 2.20k |                     .content_store
 311 | 2.20k |                     .get_part_unchunked(index_entry, 0, None)
 312 | 2.20k |                     .await
 313 | 2.20k |                     .err_tip(|| "Failed to get_part in content_store in dedup_store")?;
 314 |       |
 315 | 2.20k |                 Result::<_, Error>::Ok(data)
 316 | 4.41k |             })
 317 |   905 |             .buffered(self.max_concurrent_fetch_per_get);
 318 |       |
 319 |       |         // Stream out the buffered data one at a time and write the data to our writer stream.
 320 |       |         // In the event any of these error, we will abort early and abandon all the rest of the
 321 |       |         // streamed data.
 322 |       |         // Note: Need to take special care to ensure we send the proper slice of data requested.
 323 |   905 |         let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream)
 324 |   905 |             .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?;
 325 |   905 |         let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset))
 326 |   905 |             .err_tip(|| "Could not convert length to usize")?;
 327 | 3.11k |         while let Some(result) = entries_stream.next().await {
     |       |   Branch (327:19): [True: 2.20k, False: 904]
     |       |   Branch (327:19): [Folded - Ignored]
 328 | 2.20k |             let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?;
 329 | 2.20k |             assert!(
 330 | 2.20k |                 bytes_to_skip <= data.len(),
 331 |     0 |                 "Formula above must be wrong, {} > {}",
 332 |     0 |                 bytes_to_skip,
 333 |     0 |                 data.len()
 334 |       |             );
 335 | 2.20k |             let end_pos = cmp::min(data.len(), bytes_to_send + bytes_to_skip);
 336 | 2.20k |             if bytes_to_skip != 0 || data.len() > bytes_to_send {
     |       |   Branch (336:16): [True: 727, False: 1.48k]
     |       |   Branch (336:38): [True: 306, False: 1.17k]
     |       |   Branch (336:16): [Folded - Ignored]
     |       |   Branch (336:38): [Folded - Ignored]
 337 | 1.03k |                 data = data.slice(bytes_to_skip..end_pos);
 338 | 1.17k |             }
 339 | 2.20k |             writer
 340 | 2.20k |                 .send(data)
 341 |   434 |                 .await
 342 | 2.20k |                 .err_tip(|| "Failed to write data to get_part dedup")?;
 343 | 2.20k |             bytes_to_send -= end_pos - bytes_to_skip;
 344 | 2.20k |             bytes_to_skip = 0;
 345 |       |         }
 346 |       |
 347 |       |         // Finish our stream by writing our EOF and shutdown the stream.
 348 |   904 |         writer
 349 |   904 |             .send_eof()
 350 |   904 |             .err_tip(|| "Failed to write EOF out from get_part dedup")?;
 351 |   904 |         Ok(())
 352 | 1.87k |     }
 353 |       |
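Editor's note: the offset/length handling in get_part() first drops whole chunks that end before `offset` (tracking `start_byte_in_stream`), then trims the first and last surviving chunks in the send loop. With the defaults, the buffering note on lines 305-306 works out to at most 512 KiB x 10 = 5 MiB of chunk data in flight per call. A hypothetical standalone helper (not part of dedup_store.rs) that reproduces the same arithmetic, with a worked case in the comment:

    use std::ops::Range;

    // For chunk sizes [100, 100, 100], offset 150 and length 100 this returns
    // [(1, 50..100), (2, 0..50)]: chunk 0 is skipped entirely, chunk 1 is
    // trimmed at the front, and chunk 2 is trimmed at the back.
    fn slice_plan(chunk_sizes: &[usize], offset: usize, length: usize) -> Vec<(usize, Range<usize>)> {
        let mut start_byte_in_stream = 0;
        let mut first_byte = 0;
        let mut selected = Vec::new();
        for (index, size) in chunk_sizes.iter().copied().enumerate() {
            let end_byte = first_byte + size;
            if end_byte <= offset {
                // Chunk ends before the requested range; skip it entirely.
                start_byte_in_stream = end_byte;
            } else if first_byte < offset + length {
                selected.push((index, size));
            }
            first_byte = end_byte;
        }
        let mut bytes_to_skip = offset - start_byte_in_stream;
        let mut bytes_to_send = length;
        let mut plan = Vec::new();
        for (index, size) in selected {
            let end_pos = size.min(bytes_to_send + bytes_to_skip);
            plan.push((index, bytes_to_skip..end_pos));
            bytes_to_send -= end_pos - bytes_to_skip;
            bytes_to_skip = 0;
        }
        plan
    }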
 354 |     0 |     fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
 355 |     0 |         self
 356 |     0 |     }
 357 |       |
 358 |     0 |     fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
 359 |     0 |         self
 360 |     0 |     }
 361 |       |
 362 |     0 |     fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
 363 |     0 |         self
 364 |     0 |     }
 365 |       | }
 366 |       |
 367 |       | default_health_status_indicator!(DedupStore);
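Editor's note: for context on how the pieces compose, a rough usage sketch (not from the repository). It assumes the StoreLike helpers used inside this file (update_oneshot, get_part_unchunked) are also callable on the DedupStore handle, and that DigestInfo::new takes a 32-byte hash and a size; the backing stores can be any two Store instances.

    // Hypothetical round trip: update() chunks and indexes the payload,
    // get_part() stitches the chunks back together.
    async fn round_trip(
        spec: &DedupSpec,
        index_store: Store,
        content_store: Store,
    ) -> Result<(), Error> {
        let store = DedupStore::new(spec, index_store, content_store)?;
        let key = DigestInfo::new([7u8; 32], 3);
        store.update_oneshot(key, vec![1u8, 2, 3].into()).await?;
        let data = store.get_part_unchunked(key, 0, None).await?;
        assert_eq!(&data[..], &[1u8, 2, 3]);
        Ok(())
    }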