Coverage Report

Created: 2025-12-04 21:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/fast_slow_store.rs
Line
Count
Source
1
// Copyright 2024-2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::BorrowMut;
16
use core::cmp::{max, min};
17
use core::ops::Range;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicU64, Ordering};
20
use std::collections::HashMap;
21
use std::ffi::OsString;
22
use std::sync::{Arc, Weak};
23
24
use async_trait::async_trait;
25
use futures::{FutureExt, join};
26
use nativelink_config::stores::{FastSlowSpec, StoreDirection};
27
use nativelink_error::{Code, Error, ResultExt, make_err};
28
use nativelink_metric::MetricsComponent;
29
use nativelink_util::buf_channel::{
30
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
31
};
32
use nativelink_util::fs;
33
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
34
use nativelink_util::store_trait::{
35
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
36
    UploadSizeInfo, slow_update_store_with_file,
37
};
38
use parking_lot::Mutex;
39
use tokio::sync::OnceCell;
40
use tracing::trace;
41
42
// TODO(palfrey) This store needs to be evaluated for more efficient memory usage,
43
// there are many copies happening internally.
44
45
type Loader = Arc<OnceCell<()>>;
46
47
// TODO(palfrey) We should consider copying the data in the background to allow the
48
// client to hang up while the data is buffered. An alternative is to possibly make a
49
// "BufferedStore" that could be placed on the "slow" store that would hang up early
50
// if data is in the buffer.
51
#[derive(Debug, MetricsComponent)]
52
pub struct FastSlowStore {
53
    #[metric(group = "fast_store")]
54
    fast_store: Store,
55
    fast_direction: StoreDirection,
56
    #[metric(group = "slow_store")]
57
    slow_store: Store,
58
    slow_direction: StoreDirection,
59
    weak_self: Weak<Self>,
60
    #[metric]
61
    metrics: FastSlowStoreMetrics,
62
    // De-duplicate requests for the fast store, only the first streams, others
63
    // are blocked.  This may feel like it's causing a slow down of tasks, but
64
    // actually it's faster because we're not downloading the file multiple
65
    // times are doing loads of duplicate IO.
66
    populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
67
}
68
69
// This guard ensures that the populating_digests is cleared even if the future
70
// is dropped, it is cancel safe.
71
struct LoaderGuard<'a> {
72
    weak_store: Weak<FastSlowStore>,
73
    key: StoreKey<'a>,
74
    loader: Option<Loader>,
75
}
76
77
impl LoaderGuard<'_> {
78
14
    async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<(), E>
79
14
    where
80
14
        F: FnOnce() -> Fut,
81
14
        Fut: Future<Output = Result<(), E>>,
82
14
    {
83
14
        if let Some(loader) = &self.loader {
  Branch (83:16): [True: 10, False: 0]
  Branch (83:16): [True: 4, False: 0]
  Branch (83:16): [Folded - Ignored]
84
14
            loader.get_or_try_init(f).await.map(|&()| ())
85
        } else {
86
            // This is impossible, but we do it anyway.
87
0
            f().await
88
        }
89
14
    }
90
}
91
92
impl Drop for LoaderGuard<'_> {
93
14
    fn drop(&mut self) {
94
14
        let Some(store) = self.weak_store.upgrade() else {
  Branch (94:13): [True: 14, False: 0]
  Branch (94:13): [Folded - Ignored]
95
            // The store has already gone away, nothing to remove from.
96
0
            return;
97
        };
98
14
        let Some(loader) = self.loader.take() else {
  Branch (98:13): [True: 14, False: 0]
  Branch (98:13): [Folded - Ignored]
99
            // This should never happen, but we do it to be safe.
100
0
            return;
101
        };
102
103
14
        let mut guard = store.populating_digests.lock();
104
14
        if let std::collections::hash_map::Entry::Occupied(occupied_entry) =
  Branch (104:16): [True: 14, False: 0]
  Branch (104:16): [Folded - Ignored]
105
14
            guard.entry(self.key.borrow().into_owned())
106
        {
107
14
            if Arc::ptr_eq(occupied_entry.get(), &loader) {
  Branch (107:16): [True: 14, False: 0]
  Branch (107:16): [Folded - Ignored]
108
14
                drop(loader);
109
14
                if Arc::strong_count(occupied_entry.get()) == 1 {
  Branch (109:20): [True: 14, False: 0]
  Branch (109:20): [Folded - Ignored]
110
14
                    // This is the last loader, so remove it.
111
14
                    occupied_entry.remove();
112
14
                
}0
113
0
            }
114
0
        }
115
14
    }
116
}
117
118
impl FastSlowStore {
119
41
    pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
120
41
        Arc::new_cyclic(|weak_self| Self {
121
41
            fast_store,
122
41
            fast_direction: spec.fast_direction,
123
41
            slow_store,
124
41
            slow_direction: spec.slow_direction,
125
41
            weak_self: weak_self.clone(),
126
41
            metrics: FastSlowStoreMetrics::default(),
127
41
            populating_digests: Mutex::new(HashMap::new()),
128
41
        })
129
41
    }
130
131
0
    /// Borrows the store configured as the fast side.
    pub const fn fast_store(&self) -> &Store {
        &self.fast_store
    }
134
135
0
    /// Borrows the store configured as the slow side.
    pub const fn slow_store(&self) -> &Store {
        &self.slow_store
    }
138
139
2
    pub fn get_arc(&self) -> Option<Arc<Self>> {
140
2
        self.weak_self.upgrade()
141
2
    }
142
143
14
    fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> {
144
        // Get a single loader instance that's used to populate the fast store
145
        // for this digest.  If another request comes in then it's de-duplicated.
146
14
        let loader = match self
147
14
            .populating_digests
148
14
            .lock()
149
14
            .entry(key.borrow().into_owned())
150
        {
151
0
            std::collections::hash_map::Entry::Occupied(occupied_entry) => {
152
0
                occupied_entry.get().clone()
153
            }
154
14
            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
155
14
                vacant_entry.insert(Arc::new(OnceCell::new())).clone()
156
            }
157
        };
158
14
        LoaderGuard {
159
14
            weak_store: self.weak_self.clone(),
160
14
            key,
161
14
            loader: Some(loader),
162
14
        }
163
14
    }
164
165
14
    async fn populate_and_maybe_stream(
166
14
        self: Pin<&Self>,
167
14
        key: StoreKey<'_>,
168
14
        maybe_writer: Option<&mut DropCloserWriteHalf>,
169
14
        offset: u64,
170
14
        length: Option<u64>,
171
14
    ) -> Result<(), Error> {
172
14
        let reader_stream_size = if self
  Branch (172:37): [True: 2, False: 8]
  Branch (172:37): [True: 0, False: 4]
  Branch (172:37): [Folded - Ignored]
173
14
            .slow_store
174
14
            .inner_store(Some(key.borrow()))
175
14
            .optimized_for(StoreOptimizations::LazyExistenceOnSync)
176
        {
177
2
            trace!(
178
                %key,
179
2
                store_name = %self.slow_store.inner_store(Some(key.borrow())).get_name(),
180
2
                "Skipping .has() check due to LazyExistenceOnSync optimization"
181
            );
182
2
            UploadSizeInfo::MaxSize(u64::MAX)
183
        } else {
184
12
            UploadSizeInfo::ExactSize(self
185
12
                    .slow_store
186
12
                    .has(key.borrow())
187
12
                    .await
188
12
                    .err_tip(|| "Failed to run has() on slow store")
?0
189
12
                    .ok_or_else(|| 
{0
190
0
                        make_err!(
191
0
                            Code::NotFound,
192
                            "Object {} not found in either fast or slow store. \
193
                                If using multiple workers, ensure all workers share the same CAS storage path.",
194
0
                            key.as_str()
195
                        )
196
0
                    })?
197
            )
198
        };
199
200
14
        let send_range = offset..length.map_or(u64::MAX, |length| 
length7
+
offset7
);
201
14
        let mut bytes_received: u64 = 0;
202
14
        let mut counted_hit = false;
203
204
14
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
205
14
        let (slow_tx, mut slow_rx) = make_buf_channel_pair();
206
14
        let data_stream_fut = async move {
207
14
            let mut maybe_writer_pin = maybe_writer.map(Pin::new);
208
            loop {
209
26
                let 
output_buf25
= slow_rx
210
26
                    .recv()
211
26
                    .await
212
26
                    .err_tip(|| "Failed to read data data buffer from slow store")
?1
;
213
25
                if output_buf.is_empty() {
  Branch (213:20): [True: 9, False: 8]
  Branch (213:20): [True: 4, False: 4]
  Branch (213:20): [Folded - Ignored]
214
                    // Write out our EOF.
215
                    // We are dropped as soon as we send_eof to writer_pin, so
216
                    // we wait until we've finished all of our joins to do that.
217
13
                    let fast_res = fast_tx.send_eof();
218
13
                    return Ok::<_, Error>((fast_res, maybe_writer_pin));
219
12
                }
220
221
12
                if !counted_hit {
  Branch (221:20): [True: 8, False: 0]
  Branch (221:20): [True: 4, False: 0]
  Branch (221:20): [Folded - Ignored]
222
12
                    self.metrics
223
12
                        .slow_store_hit_count
224
12
                        .fetch_add(1, Ordering::Acquire);
225
12
                    counted_hit = true;
226
12
                
}0
227
228
12
                let output_buf_len = u64::try_from(output_buf.len())
229
12
                    .err_tip(|| "Could not output_buf.len() to u64")
?0
;
230
12
                self.metrics
231
12
                    .slow_store_downloaded_bytes
232
12
                    .fetch_add(output_buf_len, Ordering::Acquire);
233
234
12
                let writer_fut = Self::calculate_range(
235
12
                    &(bytes_received..bytes_received + output_buf_len),
236
12
                    &send_range,
237
0
                )?
238
12
                .zip(maybe_writer_pin.as_mut())
239
12
                .map_or_else(
240
4
                    || futures::future::ready(Ok(())).left_future(),
241
8
                    |(range, writer_pin)| writer_pin.send(output_buf.slice(range)).right_future(),
242
                );
243
244
12
                bytes_received += output_buf_len;
245
246
12
                let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut);
247
12
                fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")
?0
;
248
12
                writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")
?0
;
249
            }
250
14
        };
251
252
14
        let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx);
253
14
        let fast_store_fut = self
254
14
            .fast_store
255
14
            .update(key.borrow(), fast_rx, reader_stream_size);
256
257
14
        let (data_stream_res, slow_res, fast_res) =
258
14
            join!(data_stream_fut, slow_store_fut, fast_store_fut);
259
14
        match data_stream_res {
260
13
            Ok((fast_eof_res, maybe_writer_pin)) =>
261
            // Sending the EOF will drop us almost immediately in bytestream_server
262
            // so we perform it as the very last action in this method.
263
            {
264
13
                fast_eof_res.merge(fast_res).merge(slow_res).merge(
265
13
                    if let Some(
mut writer_pin9
) = maybe_writer_pin {
  Branch (265:28): [True: 9, False: 0]
  Branch (265:28): [True: 0, False: 4]
  Branch (265:28): [Folded - Ignored]
266
9
                        writer_pin.send_eof()
267
                    } else {
268
4
                        Ok(())
269
                    },
270
                )
271
            }
272
1
            Err(err) => match slow_res {
273
1
                Err(slow_err) if slow_err.code == Code::NotFound => Err(slow_err),
  Branch (273:34): [True: 1, False: 0]
  Branch (273:34): [True: 0, False: 0]
  Branch (273:34): [Folded - Ignored]
274
0
                _ => fast_res.merge(slow_res).merge(Err(err)),
275
            },
276
        }
277
14
    }
278
279
    /// Ensure our fast store is populated. This should be kept as a low
280
    /// cost function. Since the data itself is shared and not copied it should be fairly
281
    /// low cost to just discard the data, but does cost a few mutex locks while
282
    /// streaming.
283
4
    pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> {
284
4
        let maybe_size_info = self
285
4
            .fast_store
286
4
            .has(key.borrow())
287
4
            .await
288
4
            .err_tip(|| "While querying in populate_fast_store")
?0
;
289
4
        if maybe_size_info.is_some() {
  Branch (289:12): [Folded - Ignored]
  Branch (289:12): [True: 0, False: 4]
  Branch (289:12): [Folded - Ignored]
290
0
            return Ok(());
291
4
        }
292
293
        // If the fast store is noop or read only or update only then this is an error.
294
4
        if self
  Branch (294:12): [Folded - Ignored]
  Branch (294:12): [True: 0, False: 4]
  Branch (294:12): [Folded - Ignored]
295
4
            .fast_store
296
4
            .inner_store(Some(key.borrow()))
297
4
            .optimized_for(StoreOptimizations::NoopUpdates)
298
4
            || self.fast_direction == StoreDirection::ReadOnly
  Branch (298:16): [Folded - Ignored]
  Branch (298:16): [True: 0, False: 4]
  Branch (298:16): [Folded - Ignored]
299
4
            || self.fast_direction == StoreDirection::Update
  Branch (299:16): [Folded - Ignored]
  Branch (299:16): [True: 0, False: 4]
  Branch (299:16): [Folded - Ignored]
300
        {
301
0
            return Err(make_err!(
302
0
                Code::Internal,
303
0
                "Attempt to populate fast store that is read only or noop"
304
0
            ));
305
4
        }
306
307
4
        self.get_loader(key.borrow())
308
4
            .get_or_try_init(|| {
309
4
                Pin::new(self).populate_and_maybe_stream(key.borrow(), None, 0, None)
310
4
            })
311
4
            .await
312
4
            .err_tip(|| "Failed to populate()")
313
4
    }
314
315
    /// Returns the range of bytes that should be sent given a slice bounds
316
    /// offset so the output range maps the `received_range.start` to 0.
317
    // TODO(palfrey) This should be put into utils, as this logic is used
318
    // elsewhere in the code.
319
22
    pub fn calculate_range(
320
22
        received_range: &Range<u64>,
321
22
        send_range: &Range<u64>,
322
22
    ) -> Result<Option<Range<usize>>, Error> {
323
        // Protect against subtraction overflow.
324
22
        if received_range.start >= received_range.end {
  Branch (324:12): [True: 0, False: 22]
  Branch (324:12): [Folded - Ignored]
325
0
            return Ok(None);
326
22
        }
327
328
22
        let start = max(received_range.start, send_range.start);
329
22
        let end = min(received_range.end, send_range.end);
330
22
        if received_range.contains(&start) && 
received_range20
.
contains20
(
&(end - 1)20
) {
  Branch (330:12): [True: 20, False: 2]
  Branch (330:47): [True: 18, False: 2]
  Branch (330:12): [Folded - Ignored]
  Branch (330:47): [Folded - Ignored]
331
            // Offset both to the start of the received_range.
332
18
            let calculated_range_start = usize::try_from(start - received_range.start)
333
18
                .err_tip(|| "Could not convert (start - received_range.start) to usize")
?0
;
334
18
            let calculated_range_end = usize::try_from(end - received_range.start)
335
18
                .err_tip(|| "Could not convert (end - received_range.start) to usize")
?0
;
336
18
            Ok(Some(calculated_range_start..calculated_range_end))
337
        } else {
338
4
            Ok(None)
339
        }
340
22
    }
341
}
342
343
#[async_trait]
344
impl StoreDriver for FastSlowStore {
345
    async fn has_with_results(
346
        self: Pin<&Self>,
347
        key: &[StoreKey<'_>],
348
        results: &mut [Option<u64>],
349
12
    ) -> Result<(), Error> {
350
        // If our slow store is a noop store, it'll always return a 404,
351
        // so only check the fast store in such case.
352
        let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None);
353
        if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
354
            return self.fast_store.has_with_results(key, results).await;
355
        }
356
        // Only check the slow store because if it's not there, then something
357
        // down stream might be unable to get it.  This should not affect
358
        // workers as they only use get() and a CAS can use an
359
        // ExistenceCacheStore to avoid the bottleneck.
360
        self.slow_store.has_with_results(key, results).await
361
12
    }
362
363
    async fn update(
364
        self: Pin<&Self>,
365
        key: StoreKey<'_>,
366
        mut reader: DropCloserReadHalf,
367
        size_info: UploadSizeInfo,
368
88
    ) -> Result<(), Error> {
369
        // If either one of our stores is a noop store, bypass the multiplexing
370
        // and just use the store that is not a noop store.
371
        let ignore_slow = self
372
            .slow_store
373
            .inner_store(Some(key.borrow()))
374
            .optimized_for(StoreOptimizations::NoopUpdates)
375
            || self.slow_direction == StoreDirection::ReadOnly
376
            || self.slow_direction == StoreDirection::Get;
377
        let ignore_fast = self
378
            .fast_store
379
            .inner_store(Some(key.borrow()))
380
            .optimized_for(StoreOptimizations::NoopUpdates)
381
            || self.fast_direction == StoreDirection::ReadOnly
382
            || self.fast_direction == StoreDirection::Get;
383
        if ignore_slow && ignore_fast {
384
            // We need to drain the reader to avoid the writer complaining that we dropped
385
            // the connection prematurely.
386
            reader
387
                .drain()
388
                .await
389
                .err_tip(|| "In FastFlowStore::update")?;
390
            return Ok(());
391
        }
392
        if ignore_slow {
393
            return self.fast_store.update(key, reader, size_info).await;
394
        }
395
        if ignore_fast {
396
            return self.slow_store.update(key, reader, size_info).await;
397
        }
398
399
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
400
        let (mut slow_tx, slow_rx) = make_buf_channel_pair();
401
402
84
        let data_stream_fut = async move {
403
            loop {
404
136
                let buffer = reader
405
136
                    .recv()
406
136
                    .await
407
136
                    .err_tip(|| "Failed to read buffer in fastslow store")
?0
;
408
136
                if buffer.is_empty() {
  Branch (408:20): [True: 84, False: 52]
  Branch (408:20): [Folded - Ignored]
409
                    // EOF received.
410
84
                    fast_tx.send_eof().err_tip(
411
                        || "Failed to write eof to fast store in fast_slow store update",
412
0
                    )?;
413
84
                    slow_tx
414
84
                        .send_eof()
415
84
                        .err_tip(|| "Failed to write eof to writer in fast_slow store update")
?0
;
416
84
                    return Result::<(), Error>::Ok(());
417
52
                }
418
419
52
                let (fast_result, slow_result) =
420
52
                    join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
421
52
                fast_result
422
52
                    .map_err(|e| 
{0
423
0
                        make_err!(
424
0
                            Code::Internal,
425
                            "Failed to send message to fast_store in fast_slow_store {:?}",
426
                            e
427
                        )
428
0
                    })
429
52
                    .merge(slow_result.map_err(|e| 
{0
430
0
                        make_err!(
431
0
                            Code::Internal,
432
                            "Failed to send message to slow_store in fast_slow store {:?}",
433
                            e
434
                        )
435
0
                    }))?;
436
            }
437
84
        };
438
439
        let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info);
440
        let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info);
441
442
        let (data_stream_res, fast_res, slow_res) =
443
            join!(data_stream_fut, fast_store_fut, slow_store_fut);
444
        data_stream_res.merge(fast_res).merge(slow_res)?;
445
        Ok(())
446
88
    }
447
448
    /// `FastSlowStore` has optimizations for dealing with files.
449
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
450
0
        optimization == StoreOptimizations::FileUpdates
451
0
    }
452
453
    /// Optimized variation to consume the file if one of the stores is a
454
    /// filesystem store. This makes the operation a move instead of a copy
455
    /// dramatically increasing performance for large files.
456
    async fn update_with_whole_file(
457
        self: Pin<&Self>,
458
        key: StoreKey<'_>,
459
        path: OsString,
460
        mut file: fs::FileSlot,
461
        upload_size: UploadSizeInfo,
462
5
    ) -> Result<Option<fs::FileSlot>, Error> {
463
        if self
464
            .fast_store
465
            .optimized_for(StoreOptimizations::FileUpdates)
466
        {
467
            if !self
468
                .slow_store
469
                .inner_store(Some(key.borrow()))
470
                .optimized_for(StoreOptimizations::NoopUpdates)
471
                && self.slow_direction != StoreDirection::ReadOnly
472
                && self.slow_direction != StoreDirection::Get
473
            {
474
                slow_update_store_with_file(
475
                    self.slow_store.as_store_driver_pin(),
476
                    key.borrow(),
477
                    &mut file,
478
                    upload_size,
479
                )
480
                .await
481
                .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
482
            }
483
            if self.fast_direction == StoreDirection::ReadOnly
484
                || self.fast_direction == StoreDirection::Get
485
            {
486
                return Ok(Some(file));
487
            }
488
            return self
489
                .fast_store
490
                .update_with_whole_file(key, path, file, upload_size)
491
                .await;
492
        }
493
494
        if self
495
            .slow_store
496
            .optimized_for(StoreOptimizations::FileUpdates)
497
        {
498
            let ignore_fast = self
499
                .fast_store
500
                .inner_store(Some(key.borrow()))
501
                .optimized_for(StoreOptimizations::NoopUpdates)
502
                || self.fast_direction == StoreDirection::ReadOnly
503
                || self.fast_direction == StoreDirection::Get;
504
            if !ignore_fast {
505
                slow_update_store_with_file(
506
                    self.fast_store.as_store_driver_pin(),
507
                    key.borrow(),
508
                    &mut file,
509
                    upload_size,
510
                )
511
                .await
512
                .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
513
            }
514
            let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
515
                || self.slow_direction == StoreDirection::Get;
516
            if ignore_slow {
517
                return Ok(Some(file));
518
            }
519
            return self
520
                .slow_store
521
                .update_with_whole_file(key, path, file, upload_size)
522
                .await;
523
        }
524
525
        slow_update_store_with_file(self, key, &mut file, upload_size)
526
            .await
527
            .err_tip(|| "In FastSlowStore::update_with_whole_file")?;
528
        Ok(Some(file))
529
5
    }
530
531
    async fn get_part(
532
        self: Pin<&Self>,
533
        key: StoreKey<'_>,
534
        writer: &mut DropCloserWriteHalf,
535
        offset: u64,
536
        length: Option<u64>,
537
70
    ) -> Result<(), Error> {
538
        // TODO(palfrey) Investigate if we should maybe ignore errors here instead of
539
        // forwarding them up.
540
        if self.fast_store.has(key.borrow()).await?.is_some() {
541
            self.metrics
542
                .fast_store_hit_count
543
                .fetch_add(1, Ordering::Acquire);
544
            self.fast_store
545
                .get_part(key, writer.borrow_mut(), offset, length)
546
                .await?;
547
            self.metrics
548
                .fast_store_downloaded_bytes
549
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
550
            return Ok(());
551
        }
552
553
        // If the fast store is noop or read only or update only then bypass it.
554
        if self
555
            .fast_store
556
            .inner_store(Some(key.borrow()))
557
            .optimized_for(StoreOptimizations::NoopUpdates)
558
            || self.fast_direction == StoreDirection::ReadOnly
559
            || self.fast_direction == StoreDirection::Update
560
        {
561
            self.metrics
562
                .slow_store_hit_count
563
                .fetch_add(1, Ordering::Acquire);
564
            self.slow_store
565
                .get_part(key, writer.borrow_mut(), offset, length)
566
                .await?;
567
            self.metrics
568
                .slow_store_downloaded_bytes
569
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
570
            return Ok(());
571
        }
572
573
        let mut writer = Some(writer);
574
        self.get_loader(key.borrow())
575
10
            .get_or_try_init(|| {
576
10
                self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
577
10
            })
578
            .await?;
579
580
        // If we didn't stream then re-enter which will stream from the fast
581
        // store, or retry the download.  We should not get in a loop here
582
        // because OnceCell has the good sense to retry for all callers so in
583
        // order to get here the fast store will have been populated.  There's
584
        // an outside chance it was evicted, but that's slim.
585
        if let Some(writer) = writer.take() {
586
            self.get_part(key, writer, offset, length).await
587
        } else {
588
            // This was the thread that did the streaming already, lucky duck.
589
            Ok(())
590
        }
591
70
    }
592
593
2
    /// Returns this store itself as the driver; the key is not consulted.
    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
596
597
2
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
598
2
        self
599
2
    }
600
601
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
602
0
        self
603
0
    }
604
605
0
    fn register_remove_callback(
606
0
        self: Arc<Self>,
607
0
        callback: Arc<dyn RemoveItemCallback>,
608
0
    ) -> Result<(), Error> {
609
0
        self.fast_store.register_remove_callback(callback.clone())?;
610
0
        self.slow_store.register_remove_callback(callback)?;
611
0
        Ok(())
612
0
    }
613
}
614
615
#[derive(Debug, Default, MetricsComponent)]
616
struct FastSlowStoreMetrics {
617
    #[metric(help = "Hit count for the fast store")]
618
    fast_store_hit_count: AtomicU64,
619
    #[metric(help = "Downloaded bytes from the fast store")]
620
    fast_store_downloaded_bytes: AtomicU64,
621
    #[metric(help = "Hit count for the slow store")]
622
    slow_store_hit_count: AtomicU64,
623
    #[metric(help = "Downloaded bytes from the slow store")]
624
    slow_store_downloaded_bytes: AtomicU64,
625
}
626
627
default_health_status_indicator!(FastSlowStore);