Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/fast_slow_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::BorrowMut;
16
use std::cmp::{max, min};
17
use std::ffi::OsString;
18
use std::ops::Range;
19
use std::pin::Pin;
20
use std::sync::atomic::{AtomicU64, Ordering};
21
use std::sync::{Arc, Weak};
22
23
use async_trait::async_trait;
24
use futures::{join, FutureExt};
25
use nativelink_config::stores::FastSlowSpec;
26
use nativelink_error::{make_err, Code, Error, ResultExt};
27
use nativelink_metric::MetricsComponent;
28
use nativelink_util::buf_channel::{
29
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
30
};
31
use nativelink_util::fs;
32
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
33
use nativelink_util::store_trait::{
34
    slow_update_store_with_file, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
35
    UploadSizeInfo,
36
};
37
38
// TODO(blaise.bruer) This store needs to be evaluated for more efficient memory usage,
39
// there are many copies happening internally.
40
41
// TODO(blaise.bruer) We should consider copying the data in the background to allow the
42
// client to hang up while the data is buffered. An alternative is to possibly make a
43
// "BufferedStore" that could be placed on the "slow" store that would hang up early
44
// if data is in the buffer.
45
#[derive(MetricsComponent)]
46
pub struct FastSlowStore {
47
    #[metric(group = "fast_store")]
48
    fast_store: Store,
49
    #[metric(group = "slow_store")]
50
    slow_store: Store,
51
    weak_self: Weak<Self>,
52
    #[metric]
53
    metrics: FastSlowStoreMetrics,
54
}
55
56
impl FastSlowStore {
57
31
    pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
58
31
        Arc::new_cyclic(|weak_self| Self {
59
31
            fast_store,
60
31
            slow_store,
61
31
            weak_self: weak_self.clone(),
62
31
            metrics: FastSlowStoreMetrics::default(),
63
31
        })
64
31
    }
65
66
24
    pub const fn fast_store(&self) -> &Store {
67
24
        &self.fast_store
68
24
    }
69
70
0
    pub const fn slow_store(&self) -> &Store {
71
0
        &self.slow_store
72
0
    }
73
74
2
    pub fn get_arc(&self) -> Option<Arc<Self>> {
75
2
        self.weak_self.upgrade()
76
2
    }
77
78
    /// Ensure our fast store is populated. This should be kept as a low
79
    /// cost function. Since the data itself is shared and not copied it should be fairly
80
    /// low cost to just discard the data, but does cost a few mutex locks while
81
    /// streaming.
82
4
    pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> {
83
4
        let maybe_size_info = self
84
4
            .fast_store
85
4
            .has(key.borrow())
86
4
            .await
87
4
            .err_tip(|| 
"While querying in populate_fast_store"0
)
?0
;
88
4
        if maybe_size_info.is_some() {
  Branch (88:12): [Folded - Ignored]
  Branch (88:12): [True: 0, False: 4]
  Branch (88:12): [Folded - Ignored]
89
0
            return Ok(());
90
4
        }
91
4
        // TODO(blaise.bruer) This is extremely inefficient, since we are just trying
92
4
        // to send the stream to /dev/null. Maybe we could instead make a version of
93
4
        // the stream that can send to the drain more efficiently?
94
4
        let (tx, mut rx) = make_buf_channel_pair();
95
4
        let drain_fut = async move {
96
8
            while !rx.recv().await
?0
.is_empty()
{}4
  Branch (96:19): [Folded - Ignored]
  Branch (96:19): [True: 4, False: 4]
  Branch (96:19): [Folded - Ignored]
97
4
            Ok(())
98
4
        };
99
4
        let (drain_res, get_res) = join!(drain_fut, StoreDriver::get(Pin::new(self), key, tx));
100
4
        get_res.err_tip(|| 
"Failed to populate()"0
).merge(drain_res)
101
4
    }
102
103
    /// Returns the range of bytes that should be sent given a slice bounds
104
    /// offset so the output range maps the `received_range.start` to 0.
105
    // TODO(allada) This should be put into utils, as this logic is used
106
    // elsewhere in the code.
107
21
    pub fn calculate_range(
108
21
        received_range: &Range<u64>,
109
21
        send_range: &Range<u64>,
110
21
    ) -> Result<Option<Range<usize>>, Error> {
111
21
        // Protect against subtraction overflow.
112
21
        if received_range.start >= received_range.end {
  Branch (112:12): [True: 0, False: 21]
  Branch (112:12): [Folded - Ignored]
113
0
            return Ok(None);
114
21
        }
115
21
116
21
        let start = max(received_range.start, send_range.start);
117
21
        let end = min(received_range.end, send_range.end);
118
21
        if received_range.contains(&start) && 
received_range.contains(&(end - 1))19
{
  Branch (118:12): [True: 19, False: 2]
  Branch (118:47): [True: 17, False: 2]
  Branch (118:12): [Folded - Ignored]
  Branch (118:47): [Folded - Ignored]
119
            // Offset both to the start of the received_range.
120
17
            let calculated_range_start = usize::try_from(start - received_range.start)
121
17
                .err_tip(|| 
"Could not convert (start - received_range.start) to usize"0
)
?0
;
122
17
            let calculated_range_end = usize::try_from(end - received_range.start)
123
17
                .err_tip(|| 
"Could not convert (end - received_range.start) to usize"0
)
?0
;
124
17
            Ok(Some(calculated_range_start..calculated_range_end))
125
        } else {
126
4
            Ok(None)
127
        }
128
21
    }
129
}
130
131
#[async_trait]
132
impl StoreDriver for FastSlowStore {
133
    async fn has_with_results(
134
        self: Pin<&Self>,
135
        key: &[StoreKey<'_>],
136
        results: &mut [Option<u64>],
137
5
    ) -> Result<(), Error> {
138
        // If our slow store is a noop store, it'll always return a 404,
139
        // so only check the fast store in such case.
140
5
        let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None);
141
5
        if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
  Branch (141:12): [True: 2, False: 3]
  Branch (141:12): [Folded - Ignored]
142
2
            return self.fast_store.has_with_results(key, results).await;
143
3
        }
144
3
        // Only check the slow store because if it's not there, then something
145
3
        // down stream might be unable to get it.  This should not affect
146
3
        // workers as they only use get() and a CAS can use an
147
3
        // ExistenceCacheStore to avoid the bottleneck.
148
3
        self.slow_store.has_with_results(key, results).await
149
10
    }
150
151
    async fn update(
152
        self: Pin<&Self>,
153
        key: StoreKey<'_>,
154
        mut reader: DropCloserReadHalf,
155
        size_info: UploadSizeInfo,
156
78
    ) -> Result<(), Error> {
157
        // If either one of our stores is a noop store, bypass the multiplexing
158
        // and just use the store that is not a noop store.
159
78
        let slow_store = self.slow_store.inner_store(Some(key.borrow()));
160
78
        if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
  Branch (160:12): [True: 0, False: 78]
  Branch (160:12): [Folded - Ignored]
161
0
            return self.fast_store.update(key, reader, size_info).await;
162
78
        }
163
78
        let fast_store = self.fast_store.inner_store(Some(key.borrow()));
164
78
        if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
  Branch (164:12): [True: 0, False: 78]
  Branch (164:12): [Folded - Ignored]
165
0
            return self.slow_store.update(key, reader, size_info).await;
166
78
        }
167
78
168
78
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
169
78
        let (mut slow_tx, slow_rx) = make_buf_channel_pair();
170
78
171
78
        let data_stream_fut = async move {
172
            loop {
173
126
                let buffer = reader
174
126
                    .recv()
175
126
                    .await
176
126
                    .err_tip(|| 
"Failed to read buffer in fastslow store"0
)
?0
;
177
126
                if buffer.is_empty() {
  Branch (177:20): [True: 78, False: 48]
  Branch (177:20): [Folded - Ignored]
178
                    // EOF received.
179
78
                    fast_tx.send_eof().err_tip(|| {
180
0
                        "Failed to write eof to fast store in fast_slow store update"
181
78
                    })
?0
;
182
78
                    slow_tx
183
78
                        .send_eof()
184
78
                        .err_tip(|| 
"Failed to write eof to writer in fast_slow store update"0
)
?0
;
185
78
                    return Result::<(), Error>::Ok(());
186
48
                }
187
188
48
                let (fast_result, slow_result) =
189
48
                    join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
190
48
                fast_result
191
48
                    .map_err(|e| {
192
0
                        make_err!(
193
0
                            Code::Internal,
194
0
                            "Failed to send message to fast_store in fast_slow_store {:?}",
195
0
                            e
196
0
                        )
197
48
                    })
198
48
                    .merge(slow_result.map_err(|e| {
199
0
                        make_err!(
200
0
                            Code::Internal,
201
0
                            "Failed to send message to slow_store in fast_slow store {:?}",
202
0
                            e
203
0
                        )
204
48
                    }))
?0
;
205
            }
206
78
        };
207
208
78
        let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info);
209
78
        let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info);
210
211
78
        let (data_stream_res, fast_res, slow_res) =
212
78
            join!(data_stream_fut, fast_store_fut, slow_store_fut);
213
78
        data_stream_res.merge(fast_res).merge(slow_res)
?0
;
214
78
        Ok(())
215
156
    }
216
217
    /// FastSlowStore has optimizations for dealing with files.
218
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
219
0
        optimization == StoreOptimizations::FileUpdates
220
0
    }
221
222
    /// Optimized variation to consume the file if one of the stores is a
223
    /// filesystem store. This makes the operation a move instead of a copy
224
    /// dramatically increasing performance for large files.
225
    async fn update_with_whole_file(
226
        self: Pin<&Self>,
227
        key: StoreKey<'_>,
228
        path: OsString,
229
        mut file: fs::FileSlot,
230
        upload_size: UploadSizeInfo,
231
7
    ) -> Result<Option<fs::FileSlot>, Error> {
232
7
        if self
  Branch (232:12): [True: 7, False: 0]
  Branch (232:12): [Folded - Ignored]
233
7
            .fast_store
234
7
            .optimized_for(StoreOptimizations::FileUpdates)
235
        {
236
7
            if !self
  Branch (236:16): [True: 7, False: 0]
  Branch (236:16): [Folded - Ignored]
237
7
                .slow_store
238
7
                .optimized_for(StoreOptimizations::NoopUpdates)
239
            {
240
7
                slow_update_store_with_file(
241
7
                    self.slow_store.as_store_driver_pin(),
242
7
                    key.borrow(),
243
7
                    &mut file,
244
7
                    upload_size,
245
7
                )
246
7
                .await
247
7
                .err_tip(|| 
"In FastSlowStore::update_with_whole_file slow_store"0
)
?0
;
248
0
            }
249
7
            return self
250
7
                .fast_store
251
7
                .update_with_whole_file(key, path, file, upload_size)
252
7
                .await;
253
0
        }
254
0
255
0
        if self
  Branch (255:12): [True: 0, False: 0]
  Branch (255:12): [Folded - Ignored]
256
0
            .slow_store
257
0
            .optimized_for(StoreOptimizations::FileUpdates)
258
        {
259
0
            if !self
  Branch (259:16): [True: 0, False: 0]
  Branch (259:16): [Folded - Ignored]
260
0
                .fast_store
261
0
                .optimized_for(StoreOptimizations::NoopUpdates)
262
            {
263
0
                slow_update_store_with_file(
264
0
                    self.fast_store.as_store_driver_pin(),
265
0
                    key.borrow(),
266
0
                    &mut file,
267
0
                    upload_size,
268
0
                )
269
0
                .await
270
0
                .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
271
0
            }
272
0
            return self
273
0
                .slow_store
274
0
                .update_with_whole_file(key, path, file, upload_size)
275
0
                .await;
276
0
        }
277
0
278
0
        slow_update_store_with_file(self, key, &mut file, upload_size)
279
0
            .await
280
0
            .err_tip(|| "In FastSlowStore::update_with_whole_file")?;
281
0
        Ok(Some(file))
282
14
    }
283
284
    async fn get_part(
285
        self: Pin<&Self>,
286
        key: StoreKey<'_>,
287
        writer: &mut DropCloserWriteHalf,
288
        offset: u64,
289
        length: Option<u64>,
290
67
    ) -> Result<(), Error> {
291
        // TODO(blaise.bruer) Investigate if we should maybe ignore errors here instead of
292
        // forwarding the up.
293
67
        if self.fast_store.has(key.borrow()).await
?0
.is_some() {
  Branch (293:12): [True: 55, False: 12]
  Branch (293:12): [Folded - Ignored]
294
55
            self.metrics
295
55
                .fast_store_hit_count
296
55
                .fetch_add(1, Ordering::Acquire);
297
55
            self.fast_store
298
55
                .get_part(key, writer.borrow_mut(), offset, length)
299
55
                .await
?0
;
300
55
            self.metrics
301
55
                .fast_store_downloaded_bytes
302
55
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
303
55
            return Ok(());
304
12
        }
305
306
12
        let sz = self
307
12
            .slow_store
308
12
            .has(key.borrow())
309
12
            .await
310
12
            .err_tip(|| 
"Failed to run has() on slow store"0
)
?0
311
12
            .ok_or_else(|| {
312
0
                make_err!(
313
0
                    Code::NotFound,
314
0
                    "Object {} not found in either fast or slow store",
315
0
                    key.as_str()
316
0
                )
317
12
            })
?0
;
318
12
        self.metrics
319
12
            .slow_store_hit_count
320
12
            .fetch_add(1, Ordering::Acquire);
321
12
322
12
        let send_range = offset..length.map_or(u64::MAX, |length| 
length + offset7
);
323
12
        let mut bytes_received: u64 = 0;
324
12
325
12
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
326
12
        let (slow_tx, mut slow_rx) = make_buf_channel_pair();
327
12
        let data_stream_fut = async move {
328
12
            let mut writer_pin = Pin::new(writer);
329
            loop {
330
23
                let output_buf = slow_rx
331
23
                    .recv()
332
23
                    .await
333
23
                    .err_tip(|| 
"Failed to read data data buffer from slow store"0
)
?0
;
334
23
                if output_buf.is_empty() {
  Branch (334:20): [True: 12, False: 11]
  Branch (334:20): [Folded - Ignored]
335
                    // Write out our EOF.
336
                    // We are dropped as soon as we send_eof to writer_pin, so
337
                    // we wait until we've finished all of our joins to do that.
338
12
                    let fast_res = fast_tx.send_eof();
339
12
                    return Ok::<_, Error>((fast_res, writer_pin));
340
11
                }
341
11
                let output_buf_len = u64::try_from(output_buf.len())
342
11
                    .err_tip(|| 
"Could not output_buf.len() to u64"0
)
?0
;
343
11
                self.metrics
344
11
                    .slow_store_downloaded_bytes
345
11
                    .fetch_add(output_buf_len, Ordering::Acquire);
346
347
11
                let writer_fut = if let Some(range) = Self::calculate_range(
  Branch (347:41): [True: 11, False: 0]
  Branch (347:41): [Folded - Ignored]
348
11
                    &(bytes_received..bytes_received + output_buf_len),
349
11
                    &send_range,
350
11
                )
?0
{
351
11
                    writer_pin.send(output_buf.slice(range)).right_future()
352
                } else {
353
0
                    futures::future::ready(Ok(())).left_future()
354
                };
355
11
                bytes_received += output_buf_len;
356
357
11
                let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut);
358
11
                fast_tx_res.err_tip(|| 
"Failed to write to fast store in fast_slow store"0
)
?0
;
359
11
                writer_res.err_tip(|| 
"Failed to write result to writer in fast_slow store"0
)
?0
;
360
            }
361
12
        };
362
363
12
        let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx);
364
12
        let fast_store_fut =
365
12
            self.fast_store
366
12
                .update(key.borrow(), fast_rx, UploadSizeInfo::ExactSize(sz));
367
368
12
        let (data_stream_res, slow_res, fast_res) =
369
12
            join!(data_stream_fut, slow_store_fut, fast_store_fut);
370
12
        match data_stream_res {
371
12
            Ok((fast_eof_res, mut writer_pin)) =>
372
12
            // Sending the EOF will drop us almost immediately in bytestream_server
373
12
            // so we perform it as the very last action in this method.
374
12
            {
375
12
                fast_eof_res
376
12
                    .merge(fast_res)
377
12
                    .merge(slow_res)
378
12
                    .merge(writer_pin.send_eof())
379
            }
380
0
            Err(err) => fast_res.merge(slow_res).merge(Err(err)),
381
        }
382
134
    }
383
384
2
    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
385
2
        self
386
2
    }
387
388
2
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
389
2
        self
390
2
    }
391
392
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
393
0
        self
394
0
    }
395
}
396
397
#[derive(Default, MetricsComponent)]
398
struct FastSlowStoreMetrics {
399
    #[metric(help = "Hit count for the fast store")]
400
    fast_store_hit_count: AtomicU64,
401
    #[metric(help = "Downloaded bytes from the fast store")]
402
    fast_store_downloaded_bytes: AtomicU64,
403
    #[metric(help = "Hit count for the slow store")]
404
    slow_store_hit_count: AtomicU64,
405
    #[metric(help = "Downloaded bytes from the slow store")]
406
    slow_store_downloaded_bytes: AtomicU64,
407
}
408
409
default_health_status_indicator!(FastSlowStore);