Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/dedup_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::cmp;
16
use std::pin::Pin;
17
use std::sync::Arc;
18
19
use async_trait::async_trait;
20
use bincode::config::{FixintEncoding, WithOtherIntEncoding};
21
use bincode::{DefaultOptions, Options};
22
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
23
use nativelink_error::{make_err, Code, Error, ResultExt};
24
use nativelink_metric::MetricsComponent;
25
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
26
use nativelink_util::common::DigestInfo;
27
use nativelink_util::fastcdc::FastCDC;
28
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
29
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
30
use serde::{Deserialize, Serialize};
31
use tokio_util::codec::FramedRead;
32
use tokio_util::io::StreamReader;
33
use tracing::{event, Level};
34
35
use crate::cas_utils::is_zero_digest;
36
37
// NOTE: If these change, update the comments in `stores.rs` to reflect
38
// the new defaults.
39
const DEFAULT_MIN_SIZE: u64 = 64 * 1024;
40
const DEFAULT_NORM_SIZE: u64 = 256 * 1024;
41
const DEFAULT_MAX_SIZE: u64 = 512 * 1024;
42
const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;
43
44
908
#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)]
45
pub struct DedupIndex {
46
    pub entries: Vec<DigestInfo>,
47
}
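
The index blob stored for each key is just this DedupIndex serialized with bincode's fixed-width integer encoding, the same options the store builds further down via DefaultOptions::new().with_fixint_encoding(). A minimal round-trip sketch of that encoding (illustrative only, using unwrap() instead of the store's error handling):

    use bincode::Options;

    let opts = bincode::DefaultOptions::new().with_fixint_encoding();
    let index = DedupIndex { entries: vec![] };
    // Serialize the index to the bytes that would be written to index_store...
    let bytes = opts.serialize(&index).unwrap();
    // ...and decode them back, as has() and get_part() do.
    let decoded: DedupIndex = opts.deserialize(&bytes).unwrap();
    assert_eq!(index, decoded);
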
48
49
0
#[derive(MetricsComponent)]
50
pub struct DedupStore {
51
    #[metric(group = "index_store")]
52
    index_store: Store,
53
    #[metric(group = "content_store")]
54
    content_store: Store,
55
    fast_cdc_decoder: FastCDC,
56
    #[metric(help = "Maximum number of concurrent fetches per get")]
57
    max_concurrent_fetch_per_get: usize,
58
    bincode_options: WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
59
}
60
61
impl DedupStore {
62
8
    pub fn new(
63
8
        config: &nativelink_config::stores::DedupStore,
64
8
        index_store: Store,
65
8
        content_store: Store,
66
8
    ) -> Result<Arc<Self>, Error> {
67
8
        let min_size = if config.min_size == 0 {
  Branch (67:27): [True: 0, False: 8]
  Branch (67:27): [Folded - Ignored]
68
0
            DEFAULT_MIN_SIZE
69
        } else {
70
8
            config.min_size as u64
71
        };
72
8
        let normal_size = if config.normal_size == 0 {
  Branch (72:30): [True: 0, False: 8]
  Branch (72:30): [Folded - Ignored]
73
0
            DEFAULT_NORM_SIZE
74
        } else {
75
8
            config.normal_size as u64
76
        };
77
8
        let max_size = if config.max_size == 0 {
  Branch (77:27): [True: 0, False: 8]
  Branch (77:27): [Folded - Ignored]
78
0
            DEFAULT_MAX_SIZE
79
        } else {
80
8
            config.max_size as u64
81
        };
82
8
        let max_concurrent_fetch_per_get = if config.max_concurrent_fetch_per_get == 0 {
  Branch (82:47): [True: 0, False: 8]
  Branch (82:47): [Folded - Ignored]
83
0
            DEFAULT_MAX_CONCURRENT_FETCH_PER_GET
84
        } else {
85
8
            config.max_concurrent_fetch_per_get as usize
86
        };
87
        Ok(Arc::new(Self {
88
8
            index_store,
89
8
            content_store,
90
8
            fast_cdc_decoder: FastCDC::new(
91
8
                usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")?,
92
8
                usize::try_from(normal_size)
93
8
                    .err_tip(|| "Could not convert normal_size to usize")?,
94
8
                usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")?,
95
            ),
96
8
            max_concurrent_fetch_per_get,
97
8
            bincode_options: DefaultOptions::new().with_fixint_encoding(),
98
        }))
99
8
    }
100
101
4
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
102
        // First we need to load the index that records where the individual parts actually
103
        // can be fetched from.
104
3
        let index_entries = {
105
4
            let maybe_data = self
106
4
                .index_store
107
4
                .get_part_unchunked(key.borrow(), 0, None)
108
4
                .await
109
4
                .err_tip(|| "Failed to read index store in dedup store");
110
4
            let data = match maybe_data {
111
1
                Err(e) => {
112
1
                    if e.code == Code::NotFound {
  Branch (112:24): [True: 1, False: 0]
  Branch (112:24): [Folded - Ignored]
113
1
                        return Ok(None);
114
0
                    }
115
0
                    return Err(e);
116
                }
117
3
                Ok(data) => data,
118
3
            };
119
3
120
3
            match self.bincode_options.deserialize::<DedupIndex>(&data) {
121
0
                Err(err) => {
122
0
                    event!(
123
0
                        Level::WARN,
124
                        ?key,
125
                        ?err,
126
0
                        "Failed to deserialize index in dedup store",
127
                    );
128
                    // We return the equivalent of NotFound here so the client is happy.
129
0
                    return Ok(None);
130
                }
131
3
                Ok(v) => v,
132
3
            }
133
3
        };
134
3
135
3
        let digests: Vec<_> = index_entries
136
3
            .entries
137
3
            .into_iter()
138
3
            .map(StoreKey::Digest)
139
3
            .collect();
140
3
        let mut sum = 0;
141
12
        for size in self.content_store.has_many(&digests).await? {
142
12
            let Some(size) = size else {
  Branch (142:17): [True: 11, False: 1]
  Branch (142:17): [Folded - Ignored]
143
                // A part is missing so return None meaning not-found.
144
                // This will abort all in-flight queries related to this request.
145
1
                return Ok(None);
146
            };
147
11
            sum += size;
148
        }
149
2
        Ok(Some(sum))
150
4
    }
151
}
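
has() above only reports a size when every chunk referenced by the index is still present: the sizes returned by content_store.has_many() are summed, and a single missing chunk turns the whole entry into a not-found result. A standalone sketch of that aggregation (the helper name sum_if_complete is illustrative, not part of this store):

    fn sum_if_complete(sizes: &[Option<u64>]) -> Option<u64> {
        // Any missing chunk (None) invalidates the whole dedup entry.
        let mut sum = 0u64;
        for size in sizes {
            sum += (*size)?;
        }
        Some(sum)
    }

    // sum_if_complete(&[Some(4), Some(8)]) == Some(12)
    // sum_if_complete(&[Some(4), None])    == None
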
152
153
#[async_trait]
154
impl StoreDriver for DedupStore {
155
    async fn has_with_results(
156
        self: Pin<&Self>,
157
        digests: &[StoreKey<'_>],
158
        results: &mut [Option<u64>],
159
5
    ) -> Result<(), Error> {
160
5
        digests
161
5
            .iter()
162
5
            .zip(results.iter_mut())
163
5
            .map(|(key, result)| async move {
164
5
                if is_zero_digest(key.borrow()) {
  Branch (164:20): [True: 1, False: 4]
  Branch (164:20): [Folded - Ignored]
165
1
                    *result = Some(0);
166
1
                    return Ok(());
167
4
                }
168
4
169
4
                match self.has(key.borrow()).await {
170
4
                    Ok(maybe_size) => {
171
4
                        *result = maybe_size;
172
4
                        Ok(())
173
                    }
174
0
                    Err(err) => Err(err),
175
                }
176
10
            })
177
5
            .collect::<FuturesOrdered<_>>()
178
5
            .try_collect()
179
4
            .await
180
10
    }
181
182
    async fn update(
183
        self: Pin<&Self>,
184
        key: StoreKey<'_>,
185
        reader: DropCloserReadHalf,
186
        _size_info: UploadSizeInfo,
187
7
    ) -> Result<(), Error> {
188
7
        let mut bytes_reader = StreamReader::new(reader);
189
7
        let frame_reader = FramedRead::new(&mut bytes_reader, self.fast_cdc_decoder.clone());
190
7
        let index_entries = frame_reader
191
97
            .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
192
97
            .map_ok(|frame| async move {
193
97
                let hash = blake3::hash(&frame[..]).into();
194
97
                let index_entry = DigestInfo::new(hash, frame.len() as u64);
195
97
                if self
  Branch (195:20): [True: 0, False: 97]
  Branch (195:20): [Folded - Ignored]
196
97
                    .content_store
197
97
                    .has(index_entry)
198
0
                    .await
199
97
                    .err_tip(|| "Failed to call .has() in DedupStore::update()")?
200
97
                    .is_some()
201
                {
202
                    // If our store has this digest, we don't need to upload it.
203
0
                    return Result::<_, Error>::Ok(index_entry);
204
97
                }
205
97
                self.content_store
206
97
                    .update_oneshot(index_entry, frame)
207
0
                    .await
208
97
                    .err_tip(|| "Failed to update content store in dedup_store")?;
209
97
                Ok(index_entry)
210
194
            })
211
7
            .try_buffered(self.max_concurrent_fetch_per_get)
212
7
            .try_collect()
213
0
            .await?;
214
215
7
        let serialized_index = self
216
7
            .bincode_options
217
7
            .serialize(&DedupIndex {
218
7
                entries: index_entries,
219
7
            })
220
7
            .map_err(|e| {
221
0
                make_err!(
222
0
                    Code::Internal,
223
0
                    "Failed to serialize index in dedup_store : {:?}",
224
0
                    e
225
0
                )
226
7
            })?;
227
228
7
        self.index_store
229
7
            .update_oneshot(key, serialized_index.into())
230
0
            .await
231
7
            .err_tip(|| "Failed to insert our index entry to index_store in dedup_store")?;
232
233
7
        Ok(())
234
14
    }
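
update() above splits the upload into FastCDC frames and keys each frame by its BLAKE3 hash plus length, so identical chunks collapse to a single content-store entry and are skipped on re-upload. A tiny sketch of the property this relies on (standalone, not taken from this file):

    // Identical chunk bytes always hash to the same content key.
    let chunk_a: &[u8] = b"same chunk bytes";
    let chunk_b: &[u8] = b"same chunk bytes";
    assert_eq!(blake3::hash(chunk_a), blake3::hash(chunk_b));
    // In the store, the hash and the chunk length form the CAS key:
    // DigestInfo::new(blake3::hash(&frame[..]).into(), frame.len() as u64)
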
235
236
    async fn get_part(
237
        self: Pin<&Self>,
238
        key: StoreKey<'_>,
239
        writer: &mut DropCloserWriteHalf,
240
        offset: u64,
241
        length: Option<u64>,
242
935
    ) -> Result<(), Error> {
243
        // Special case for when a client tries to read zero bytes.
244
935
        if length == Some(0) {
  Branch (244:12): [True: 30, False: 905]
  Branch (244:12): [Folded - Ignored]
245
30
            writer
246
30
                .send_eof()
247
30
                .err_tip(|| "Failed to write EOF out from get_part dedup")?;
248
30
            return Ok(());
249
905
        }
250
        // First we need to download the index that records where the individual parts actually
251
        // can be fetched from.
252
905
        let index_entries = {
253
905
            let data = self
254
905
                .index_store
255
905
                .get_part_unchunked(key, 0, None)
256
905
                .await
257
905
                .err_tip(|| "Failed to read index store in dedup store")?;
258
259
905
            self.bincode_options
260
905
                .deserialize::<DedupIndex>(&data)
261
905
                .map_err(|e| {
262
0
                    make_err!(
263
0
                        Code::Internal,
264
0
                        "Failed to deserialize index in dedup_store::get_part : {:?}",
265
0
                        e
266
0
                    )
267
905
                })?
268
        };
269
270
905
        let mut start_byte_in_stream: u64 = 0;
271
905
        let entries = {
272
905
            if offset == 0 && length.is_none() {
  Branch (272:16): [True: 31, False: 874]
  Branch (272:31): [True: 2, False: 29]
  Branch (272:16): [Folded - Ignored]
  Branch (272:31): [Folded - Ignored]
273
2
                index_entries.entries
274
            } else {
275
903
                let mut current_entries_sum = 0;
276
903
                let mut entries = Vec::with_capacity(index_entries.entries.len());
277
4.65k
                for entry in index_entries.entries {
278
4.15k
                    let first_byte = current_entries_sum;
279
4.15k
                    let entry_size = entry.size_bytes();
280
4.15k
                    current_entries_sum += entry_size;
281
4.15k
                    // Filter any items whose end byte is before the first requested byte.
282
4.15k
                    if current_entries_sum <= offset {
  Branch (282:24): [True: 1.60k, False: 2.54k]
  Branch (282:24): [Folded - Ignored]
283
1.60k
                        start_byte_in_stream = current_entries_sum;
284
1.60k
                        continue;
285
2.54k
                    }
286
                    // If we are not going to read any bytes past the length, we are done.
287
2.54k
                    if let Some(length) = length {
  Branch (287:28): [True: 2.54k, False: 8]
  Branch (287:28): [Folded - Ignored]
288
2.54k
                        if first_byte >= offset + length {
  Branch (288:28): [True: 407, False: 2.13k]
  Branch (288:28): [Folded - Ignored]
289
407
                            break;
290
2.13k
                        }
291
8
                    }
292
2.14k
                    entries.push(entry);
293
                }
294
903
                entries
295
            }
296
        };
297
298
        // Second we create a stream of futures for each chunk, but buffer/limit them so only
299
        // `max_concurrent_fetch_per_get` will be executed at a time.
300
        // The results will be streamed out in the same order they are in the entries table.
301
        // The results will execute in a "window-like" fashion, meaning that if we limit to
302
        // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and
303
        // request 4 & 5 can be executing (or finished) while waiting for 3 to finish.
304
        // Note: We will buffer our data here up to:
305
        // `config.max_size * config.max_concurrent_fetch_per_get` per `get_part()` request.
306
905
        let mut entries_stream = stream::iter(entries)
307
2.20k
            .map(move |index_entry| async move {
308
2.20k
                let data = self
309
2.20k
                    .content_store
310
2.20k
                    .get_part_unchunked(index_entry, 0, None)
311
2.20k
                    .await
312
2.20k
                    .err_tip(|| "Failed to get_part in content_store in dedup_store")?;
313
314
2.20k
                Result::<_, Error>::Ok(data)
315
4.41k
            })
316
905
            .buffered(self.max_concurrent_fetch_per_get);
317
318
        // Stream out the buffered data one at a time and write the data to our writer stream.
319
        // In the event any of these error, we will abort early and abandon all the rest of the
320
        // streamed data.
321
        // Note: Need to take special care to ensure we send the proper slice of data requested.
322
905
        let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream)
323
905
            .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?;
324
905
        let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset))
325
905
            .err_tip(|| "Could not convert length to usize")?;
326
3.11k
        while let Some(result) = entries_stream.next().await {
  Branch (326:19): [True: 2.20k, False: 904]
  Branch (326:19): [Folded - Ignored]
327
2.20k
            let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?;
328
2.20k
            assert!(
329
2.20k
                bytes_to_skip <= data.len(),
330
0
                "Formula above must be wrong, {} > {}",
331
0
                bytes_to_skip,
332
0
                data.len()
333
            );
334
2.20k
            let end_pos = cmp::min(data.len(), bytes_to_send + bytes_to_skip);
335
2.20k
            if bytes_to_skip != 0 || data.len() > bytes_to_send {
  Branch (335:16): [True: 727, False: 1.48k]
  Branch (335:38): [True: 306, False: 1.17k]
  Branch (335:16): [Folded - Ignored]
  Branch (335:38): [Folded - Ignored]
336
1.03k
                data = data.slice(bytes_to_skip..end_pos);
337
1.17k
            }
338
2.20k
            writer
339
2.20k
                .send(data)
340
434
                .await
341
2.20k
                .err_tip(|| "Failed to write data to get_part dedup")?;
342
2.20k
            bytes_to_send -= end_pos - bytes_to_skip;
343
2.20k
            bytes_to_skip = 0;
344
        }
345
346
        // Finish our stream by writing our EOF and shutdown the stream.
347
904
        writer
348
904
            .send_eof()
349
904
            .err_tip(|| "Failed to write EOF out from get_part dedup")?;
350
904
        Ok(())
351
1.87k
    }
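
The fetch pipeline described in the comments above issues chunk reads through buffered(max_concurrent_fetch_per_get): up to that many futures run at once, but results are still yielded in index order. A minimal, self-contained sketch of that pattern with the futures crate (the fetch helper is a hypothetical stand-in for the content-store read):

    use futures::stream::{self, StreamExt};

    async fn fetch(i: u32) -> u32 {
        // Stand-in for content_store.get_part_unchunked(...).
        i
    }

    async fn demo() {
        let results: Vec<u32> = stream::iter(0..8u32)
            .map(fetch)
            .buffered(3) // at most 3 fetches in flight; output order is preserved
            .collect()
            .await;
        assert_eq!(results, vec![0, 1, 2, 3, 4, 5, 6, 7]);
    }
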
352
353
0
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
354
0
        self
355
0
    }
356
357
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
358
0
        self
359
0
    }
360
361
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
362
0
        self
363
0
    }
364
}
365
366
default_health_status_indicator!(DedupStore);
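
For reference, the range handling in get_part() trims only the first and last chunks of a ranged read: whole chunks ending before the offset are dropped, bytes_to_skip trims the front of the first surviving chunk, and bytes_to_send bounds the tail. A worked sketch with illustrative numbers (uniform 64 KiB chunks, offset = 100_000, length = 50_000; real FastCDC chunks vary in size):

    let chunk = 65_536u64;
    let (offset, length) = (100_000u64, 50_000u64);

    // Chunk 0 ends at 65_536 <= offset, so it is skipped entirely.
    let start_byte_in_stream = chunk;
    let bytes_to_skip = offset - start_byte_in_stream; // 34_464 bytes trimmed from chunk 1
    let from_chunk_1 = chunk - bytes_to_skip;          // 31_072 bytes sent from chunk 1
    let from_chunk_2 = length - from_chunk_1;          // 18_928 bytes sent from chunk 2, then the loop stops
    assert_eq!(from_chunk_1 + from_chunk_2, length);
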