Coverage Report

Created: 2025-09-16 19:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/fast_slow_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::BorrowMut;
16
use core::cmp::{max, min};
17
use core::ops::Range;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicU64, Ordering};
20
use std::collections::HashMap;
21
use std::ffi::OsString;
22
use std::sync::{Arc, Weak};
23
24
use async_trait::async_trait;
25
use futures::{FutureExt, join};
26
use nativelink_config::stores::FastSlowSpec;
27
use nativelink_error::{Code, Error, ResultExt, make_err};
28
use nativelink_metric::MetricsComponent;
29
use nativelink_util::buf_channel::{
30
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
31
};
32
use nativelink_util::fs;
33
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
34
use nativelink_util::store_trait::{
35
    Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, UploadSizeInfo,
36
    slow_update_store_with_file,
37
};
38
use parking_lot::Mutex;
39
use tokio::sync::OnceCell;
40
41
// TODO(palfrey) This store needs to be evaluated for more efficient memory usage,
42
// there are many copies happening internally.
43
44
/// One-shot initializer shared (via `Arc`) by concurrent `populate_fast_store` calls for
/// the same key, so later callers await the in-flight population instead of starting
/// their own.
type Loader = Arc<OnceCell<()>>;
45
46
// TODO(palfrey) We should consider copying the data in the background to allow the
// client to hang up while the data is buffered. An alternative is to possibly make a
// "BufferedStore" that could be placed on the "slow" store that would hang up early
// if data is in the buffer.
/// A store that layers a `fast_store` in front of a `slow_store`.
///
/// As implemented in this file:
/// - uploads are fanned out to both stores (or only to one, when the other is a
///   noop-update store);
/// - existence checks are answered by the slow store (or only by the fast store when the
///   slow store is a noop-download store);
/// - reads are served from the fast store when it has the object, otherwise they are
///   streamed from the slow store while the fast store is populated with the same bytes.
#[derive(Debug, MetricsComponent)]
pub struct FastSlowStore {
    // Store checked first on reads; populated from the slow store on a miss.
    #[metric(group = "fast_store")]
    fast_store: Store,
    // Backing store; the source of truth for `has` checks.
    #[metric(group = "slow_store")]
    slow_store: Store,
    // Weak self-reference filled in by `Arc::new_cyclic` in `new`; used by `get_arc`.
    weak_self: Weak<Self>,
    #[metric]
    metrics: FastSlowStoreMetrics,
    // De-duplicate the populate_fast_store requests.
    populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
}
62
63
impl FastSlowStore {
64
33
    pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
65
33
        Arc::new_cyclic(|weak_self| Self {
66
33
            fast_store,
67
33
            slow_store,
68
33
            weak_self: weak_self.clone(),
69
33
            metrics: FastSlowStoreMetrics::default(),
70
33
            populating_digests: Mutex::new(HashMap::new()),
71
33
        })
72
33
    }
73
74
0
    pub const fn fast_store(&self) -> &Store {
75
0
        &self.fast_store
76
0
    }
77
78
0
    pub const fn slow_store(&self) -> &Store {
79
0
        &self.slow_store
80
0
    }
81
82
2
    pub fn get_arc(&self) -> Option<Arc<Self>> {
83
2
        self.weak_self.upgrade()
84
2
    }
85
86
    /// Ensure our fast store is populated. This should be kept as a low
87
    /// cost function. Since the data itself is shared and not copied it should be fairly
88
    /// low cost to just discard the data, but does cost a few mutex locks while
89
    /// streaming.
90
4
    
pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error>0
{
91
4
        let maybe_size_info = self
92
4
            .fast_store
93
4
            .has(key.borrow())
94
4
            .await
95
4
            .err_tip(|| "While querying in populate_fast_store")
?0
;
96
4
        if maybe_size_info.is_some() {
  Branch (96:12): [Folded - Ignored]
  Branch (96:12): [True: 0, False: 4]
  Branch (96:12): [Folded - Ignored]
97
0
            return Ok(());
98
4
        }
99
        // Get a single loader instance that's used to populate the fast store
100
        // for this digest.  If another request comes in then it's de-duplicated.
101
4
        let loader = match self
102
4
            .populating_digests
103
4
            .lock()
104
4
            .entry(key.borrow().into_owned())
105
        {
106
0
            std::collections::hash_map::Entry::Occupied(occupied_entry) => {
107
0
                occupied_entry.get().clone()
108
            }
109
4
            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
110
4
                vacant_entry.insert(Arc::new(OnceCell::new())).clone()
111
            }
112
        };
113
4
        loader
114
8
            .
get_or_try_init4
(async move || {
115
                // TODO(palfrey) This is extremely inefficient, since we are just trying
116
                // to send the stream to /dev/null. Maybe we could instead make a version of
117
                // the stream that can send to the drain more efficiently?
118
4
                let (tx, mut rx) = make_buf_channel_pair();
119
4
                let drain_fut = async move {
120
8
                    while !rx.recv().await
?0
.is_empty()
{}4
  Branch (120:27): [Folded - Ignored]
  Branch (120:27): [True: 4, False: 4]
  Branch (120:27): [Folded - Ignored]
121
4
                    Ok(())
122
4
                };
123
4
                let key_owned = key.borrow().into_owned();
124
4
                let (drain_res, get_res) =
125
4
                    join!(drain_fut, StoreDriver::get(Pin::new(self), key, tx));
126
                // Now the store is populated, remove the loader.
127
4
                self.populating_digests.lock().remove_entry(&key_owned);
128
4
                get_res.err_tip(|| "Failed to populate()").merge(drain_res)
129
8
            })
130
4
            .await
?0
;
131
4
        Ok(())
132
4
    }
133
134
    /// Returns the range of bytes that should be sent given a slice bounds
135
    /// offset so the output range maps the `received_range.start` to 0.
136
    // TODO(palfrey) This should be put into utils, as this logic is used
137
    // elsewhere in the code.
138
21
    pub fn calculate_range(
139
21
        received_range: &Range<u64>,
140
21
        send_range: &Range<u64>,
141
21
    ) -> Result<Option<Range<usize>>, Error> {
142
        // Protect against subtraction overflow.
143
21
        if received_range.start >= received_range.end {
  Branch (143:12): [True: 0, False: 21]
  Branch (143:12): [Folded - Ignored]
144
0
            return Ok(None);
145
21
        }
146
147
21
        let start = max(received_range.start, send_range.start);
148
21
        let end = min(received_range.end, send_range.end);
149
21
        if received_range.contains(&start) && 
received_range19
.
contains19
(
&(end - 1)19
) {
  Branch (149:12): [True: 19, False: 2]
  Branch (149:47): [True: 17, False: 2]
  Branch (149:12): [Folded - Ignored]
  Branch (149:47): [Folded - Ignored]
150
            // Offset both to the start of the received_range.
151
17
            let calculated_range_start = usize::try_from(start - received_range.start)
152
17
                .err_tip(|| "Could not convert (start - received_range.start) to usize")
?0
;
153
17
            let calculated_range_end = usize::try_from(end - received_range.start)
154
17
                .err_tip(|| "Could not convert (end - received_range.start) to usize")
?0
;
155
17
            Ok(Some(calculated_range_start..calculated_range_end))
156
        } else {
157
4
            Ok(None)
158
        }
159
21
    }
160
}
161
162
#[async_trait]
163
impl StoreDriver for FastSlowStore {
164
    async fn has_with_results(
165
        self: Pin<&Self>,
166
        key: &[StoreKey<'_>],
167
        results: &mut [Option<u64>],
168
24
    ) -> Result<(), Error> {
169
        // If our slow store is a noop store, it'll always return a 404,
170
        // so only check the fast store in such case.
171
12
        let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None);
172
12
        if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
  Branch (172:12): [True: 2, False: 10]
  Branch (172:12): [Folded - Ignored]
173
2
            return self.fast_store.has_with_results(key, results).await;
174
10
        }
175
        // Only check the slow store because if it's not there, then something
176
        // down stream might be unable to get it.  This should not affect
177
        // workers as they only use get() and a CAS can use an
178
        // ExistenceCacheStore to avoid the bottleneck.
179
10
        self.slow_store.has_with_results(key, results).await
180
24
    }
181
182
    async fn update(
183
        self: Pin<&Self>,
184
        key: StoreKey<'_>,
185
        mut reader: DropCloserReadHalf,
186
        size_info: UploadSizeInfo,
187
168
    ) -> Result<(), Error> {
188
        // If either one of our stores is a noop store, bypass the multiplexing
189
        // and just use the store that is not a noop store.
190
84
        let slow_store = self.slow_store.inner_store(Some(key.borrow()));
191
84
        if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
  Branch (191:12): [True: 0, False: 84]
  Branch (191:12): [Folded - Ignored]
192
0
            return self.fast_store.update(key, reader, size_info).await;
193
84
        }
194
84
        let fast_store = self.fast_store.inner_store(Some(key.borrow()));
195
84
        if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
  Branch (195:12): [True: 0, False: 84]
  Branch (195:12): [Folded - Ignored]
196
0
            return self.slow_store.update(key, reader, size_info).await;
197
84
        }
198
199
84
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
200
84
        let (mut slow_tx, slow_rx) = make_buf_channel_pair();
201
202
84
        let data_stream_fut = async move {
203
            loop {
204
136
                let buffer = reader
205
136
                    .recv()
206
136
                    .await
207
136
                    .err_tip(|| "Failed to read buffer in fastslow store")
?0
;
208
136
                if buffer.is_empty() {
  Branch (208:20): [True: 84, False: 52]
  Branch (208:20): [Folded - Ignored]
209
                    // EOF received.
210
84
                    fast_tx.send_eof().err_tip(
211
                        || "Failed to write eof to fast store in fast_slow store update",
212
0
                    )?;
213
84
                    slow_tx
214
84
                        .send_eof()
215
84
                        .err_tip(|| "Failed to write eof to writer in fast_slow store update")
?0
;
216
84
                    return Result::<(), Error>::Ok(());
217
52
                }
218
219
52
                let (fast_result, slow_result) =
220
52
                    join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
221
52
                fast_result
222
52
                    .map_err(|e| 
{0
223
0
                        make_err!(
224
0
                            Code::Internal,
225
                            "Failed to send message to fast_store in fast_slow_store {:?}",
226
                            e
227
                        )
228
0
                    })
229
52
                    .merge(slow_result.map_err(|e| 
{0
230
0
                        make_err!(
231
0
                            Code::Internal,
232
                            "Failed to send message to slow_store in fast_slow store {:?}",
233
                            e
234
                        )
235
0
                    }))?;
236
            }
237
84
        };
238
239
84
        let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info);
240
84
        let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info);
241
242
84
        let (data_stream_res, fast_res, slow_res) =
243
84
            join!(data_stream_fut, fast_store_fut, slow_store_fut);
244
84
        data_stream_res.merge(fast_res).merge(slow_res)
?0
;
245
84
        Ok(())
246
168
    }
247
248
    /// FastSlowStore has optimizations for dealing with files.
249
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
250
0
        optimization == StoreOptimizations::FileUpdates
251
0
    }
252
253
    /// Optimized variation to consume the file if one of the stores is a
254
    /// filesystem store. This makes the operation a move instead of a copy
255
    /// dramatically increasing performance for large files.
256
    async fn update_with_whole_file(
257
        self: Pin<&Self>,
258
        key: StoreKey<'_>,
259
        path: OsString,
260
        mut file: fs::FileSlot,
261
        upload_size: UploadSizeInfo,
262
10
    ) -> Result<Option<fs::FileSlot>, Error> {
263
5
        if self
  Branch (263:12): [True: 5, False: 0]
  Branch (263:12): [Folded - Ignored]
264
5
            .fast_store
265
5
            .optimized_for(StoreOptimizations::FileUpdates)
266
        {
267
5
            if !self
  Branch (267:16): [True: 5, False: 0]
  Branch (267:16): [Folded - Ignored]
268
5
                .slow_store
269
5
                .optimized_for(StoreOptimizations::NoopUpdates)
270
            {
271
5
                slow_update_store_with_file(
272
5
                    self.slow_store.as_store_driver_pin(),
273
5
                    key.borrow(),
274
5
                    &mut file,
275
5
                    upload_size,
276
5
                )
277
5
                .await
278
5
                .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")
?0
;
279
0
            }
280
5
            return self
281
5
                .fast_store
282
5
                .update_with_whole_file(key, path, file, upload_size)
283
5
                .await;
284
0
        }
285
286
0
        if self
  Branch (286:12): [True: 0, False: 0]
  Branch (286:12): [Folded - Ignored]
287
0
            .slow_store
288
0
            .optimized_for(StoreOptimizations::FileUpdates)
289
        {
290
0
            if !self
  Branch (290:16): [True: 0, False: 0]
  Branch (290:16): [Folded - Ignored]
291
0
                .fast_store
292
0
                .optimized_for(StoreOptimizations::NoopUpdates)
293
            {
294
0
                slow_update_store_with_file(
295
0
                    self.fast_store.as_store_driver_pin(),
296
0
                    key.borrow(),
297
0
                    &mut file,
298
0
                    upload_size,
299
0
                )
300
0
                .await
301
0
                .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
302
0
            }
303
0
            return self
304
0
                .slow_store
305
0
                .update_with_whole_file(key, path, file, upload_size)
306
0
                .await;
307
0
        }
308
309
0
        slow_update_store_with_file(self, key, &mut file, upload_size)
310
0
            .await
311
0
            .err_tip(|| "In FastSlowStore::update_with_whole_file")?;
312
0
        Ok(Some(file))
313
10
    }
314
315
    async fn get_part(
316
        self: Pin<&Self>,
317
        key: StoreKey<'_>,
318
        writer: &mut DropCloserWriteHalf,
319
        offset: u64,
320
        length: Option<u64>,
321
140
    ) -> Result<(), Error> {
322
        // TODO(palfrey) Investigate if we should maybe ignore errors here instead of
323
        // forwarding the up.
324
70
        if self.fast_store.has(key.borrow()).await
?0
.is_some() {
  Branch (324:12): [True: 58, False: 12]
  Branch (324:12): [Folded - Ignored]
325
58
            self.metrics
326
58
                .fast_store_hit_count
327
58
                .fetch_add(1, Ordering::Acquire);
328
58
            self.fast_store
329
58
                .get_part(key, writer.borrow_mut(), offset, length)
330
58
                .await
?0
;
331
58
            self.metrics
332
58
                .fast_store_downloaded_bytes
333
58
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
334
58
            return Ok(());
335
12
        }
336
337
12
        let sz = self
338
12
            .slow_store
339
12
            .has(key.borrow())
340
12
            .await
341
12
            .err_tip(|| "Failed to run has() on slow store")
?0
342
12
            .ok_or_else(|| 
{0
343
0
                make_err!(
344
0
                    Code::NotFound,
345
                    "Object {} not found in either fast or slow store",
346
0
                    key.as_str()
347
                )
348
0
            })?;
349
12
        self.metrics
350
12
            .slow_store_hit_count
351
12
            .fetch_add(1, Ordering::Acquire);
352
353
12
        let send_range = offset..length.map_or(u64::MAX, |length| 
length7
+
offset7
);
354
12
        let mut bytes_received: u64 = 0;
355
356
12
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
357
12
        let (slow_tx, mut slow_rx) = make_buf_channel_pair();
358
12
        let data_stream_fut = async move {
359
12
            let mut writer_pin = Pin::new(writer);
360
            loop {
361
23
                let output_buf = slow_rx
362
23
                    .recv()
363
23
                    .await
364
23
                    .err_tip(|| "Failed to read data data buffer from slow store")
?0
;
365
23
                if output_buf.is_empty() {
  Branch (365:20): [True: 12, False: 11]
  Branch (365:20): [Folded - Ignored]
366
                    // Write out our EOF.
367
                    // We are dropped as soon as we send_eof to writer_pin, so
368
                    // we wait until we've finished all of our joins to do that.
369
12
                    let fast_res = fast_tx.send_eof();
370
12
                    return Ok::<_, Error>((fast_res, writer_pin));
371
11
                }
372
11
                let output_buf_len = u64::try_from(output_buf.len())
373
11
                    .err_tip(|| "Could not output_buf.len() to u64")
?0
;
374
11
                self.metrics
375
11
                    .slow_store_downloaded_bytes
376
11
                    .fetch_add(output_buf_len, Ordering::Acquire);
377
378
11
                let writer_fut = Self::calculate_range(
379
11
                    &(bytes_received..bytes_received + output_buf_len),
380
11
                    &send_range,
381
0
                )?
382
11
                .map_or_else(
383
0
                    || futures::future::ready(Ok(())).left_future(),
384
11
                    |range| writer_pin.send(output_buf.slice(range)).right_future(),
385
                );
386
387
11
                bytes_received += output_buf_len;
388
389
11
                let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut);
390
11
                fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")
?0
;
391
11
                writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")
?0
;
392
            }
393
12
        };
394
395
12
        let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx);
396
12
        let fast_store_fut =
397
12
            self.fast_store
398
12
                .update(key.borrow(), fast_rx, UploadSizeInfo::ExactSize(sz));
399
400
12
        let (data_stream_res, slow_res, fast_res) =
401
12
            join!(data_stream_fut, slow_store_fut, fast_store_fut);
402
12
        match data_stream_res {
403
12
            Ok((fast_eof_res, mut writer_pin)) =>
404
            // Sending the EOF will drop us almost immediately in bytestream_server
405
            // so we perform it as the very last action in this method.
406
            {
407
12
                fast_eof_res
408
12
                    .merge(fast_res)
409
12
                    .merge(slow_res)
410
12
                    .merge(writer_pin.send_eof())
411
            }
412
0
            Err(err) => fast_res.merge(slow_res).merge(Err(err)),
413
        }
414
140
    }
415
416
2
    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
417
2
        self
418
2
    }
419
420
2
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
421
2
        self
422
2
    }
423
424
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
425
0
        self
426
0
    }
427
}
428
429
/// Counters exported for `FastSlowStore`; all of them are updated in `get_part`.
#[derive(Debug, Default, MetricsComponent)]
struct FastSlowStoreMetrics {
    // Incremented once per `get_part` served from the fast store.
    #[metric(help = "Hit count for the fast store")]
    fast_store_hit_count: AtomicU64,
    // Derived from the writer's bytes-written counter after a fast-store read.
    #[metric(help = "Downloaded bytes from the fast store")]
    fast_store_downloaded_bytes: AtomicU64,
    // Incremented once per `get_part` that falls back to the slow store.
    #[metric(help = "Hit count for the slow store")]
    slow_store_hit_count: AtomicU64,
    // Counts every chunk received from the slow store, including bytes outside the
    // requested range (the whole object is streamed to populate the fast store).
    #[metric(help = "Downloaded bytes from the slow store")]
    slow_store_downloaded_bytes: AtomicU64,
}
440
441
// Use nativelink_util's default `HealthStatusIndicator` implementation for this store.
// NOTE(review): the exact health semantics live in the macro definition — confirm there.
default_health_status_indicator!(FastSlowStore);