Coverage Report

Created: 2025-11-05 08:49

/build/source/nativelink-store/src/fast_slow_store.rs
Line
Count
Source
1
// Copyright 2024-2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::BorrowMut;
16
use core::cmp::{max, min};
17
use core::ops::Range;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicU64, Ordering};
20
use std::collections::HashMap;
21
use std::ffi::OsString;
22
use std::sync::{Arc, Weak};
23
24
use async_trait::async_trait;
25
use futures::{FutureExt, join};
26
use nativelink_config::stores::{FastSlowSpec, StoreDirection};
27
use nativelink_error::{Code, Error, ResultExt, make_err};
28
use nativelink_metric::MetricsComponent;
29
use nativelink_util::buf_channel::{
30
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
31
};
32
use nativelink_util::fs;
33
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
34
use nativelink_util::store_trait::{
35
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
36
    UploadSizeInfo, slow_update_store_with_file,
37
};
38
use parking_lot::Mutex;
39
use tokio::sync::OnceCell;
40
41
// TODO(palfrey) This store needs to be evaluated for more efficient memory usage;
42
// there are many copies happening internally.
43
44
type Loader = Arc<OnceCell<()>>;
45
46
// TODO(palfrey) We should consider copying the data in the background to allow the
47
// client to hang up while the data is buffered. An alternative is to possibly make a
48
// "BufferedStore" that could be placed on the "slow" store that would hang up early
49
// if data is in the buffer.
50
#[derive(Debug, MetricsComponent)]
51
pub struct FastSlowStore {
52
    #[metric(group = "fast_store")]
53
    fast_store: Store,
54
    fast_direction: StoreDirection,
55
    #[metric(group = "slow_store")]
56
    slow_store: Store,
57
    slow_direction: StoreDirection,
58
    weak_self: Weak<Self>,
59
    #[metric]
60
    metrics: FastSlowStoreMetrics,
61
    // De-duplicate requests for the fast store, only the first streams, others
62
    // are blocked.  This may feel like it's causing a slowdown of tasks, but
63
    // actually it's faster because we're not downloading the file multiple
64
    // times or doing loads of duplicate IO.
65
    populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
66
}
67
68
// This guard ensures that the populating_digests entry is cleared even if the
69
// future is dropped; it is cancel-safe.
70
struct LoaderGuard<'a> {
71
    weak_store: Weak<FastSlowStore>,
72
    key: StoreKey<'a>,
73
    loader: Option<Loader>,
74
}
75
76
impl LoaderGuard<'_> {
77
12
    async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<(), E>
78
12
    where
79
12
        F: FnOnce() -> Fut,
80
12
        Fut: Future<Output = Result<(), E>>,
81
12
    {
82
12
        if let Some(loader) = &self.loader {
  Branch (82:16): [True: 8, False: 0]
  Branch (82:16): [True: 4, False: 0]
  Branch (82:16): [Folded - Ignored]
83
12
            loader.get_or_try_init(f).await.map(|&()| ())
84
        } else {
85
            // This is impossible, but we do it anyway.
86
0
            f().await
87
        }
88
12
    }
89
}
90
91
impl Drop for LoaderGuard<'_> {
92
12
    fn drop(&mut self) {
93
12
        let Some(store) = self.weak_store.upgrade() else {
  Branch (93:13): [True: 12, False: 0]
  Branch (93:13): [Folded - Ignored]
94
            // The store has already gone away, nothing to remove from.
95
0
            return;
96
        };
97
12
        let Some(loader) = self.loader.take() else {
  Branch (97:13): [True: 12, False: 0]
  Branch (97:13): [Folded - Ignored]
98
            // This should never happen, but we do it to be safe.
99
0
            return;
100
        };
101
102
12
        let mut guard = store.populating_digests.lock();
103
12
        if let std::collections::hash_map::Entry::Occupied(occupied_entry) =
  Branch (103:16): [True: 12, False: 0]
  Branch (103:16): [Folded - Ignored]
104
12
            guard.entry(self.key.borrow().into_owned())
105
        {
106
12
            if Arc::ptr_eq(occupied_entry.get(), &loader) {
  Branch (106:16): [True: 12, False: 0]
  Branch (106:16): [Folded - Ignored]
107
12
                drop(loader);
108
12
                if Arc::strong_count(occupied_entry.get()) == 1 {
  Branch (108:20): [True: 12, False: 0]
  Branch (108:20): [Folded - Ignored]
109
12
                    // This is the last loader, so remove it.
110
12
                    occupied_entry.remove();
111
12
                }
112
0
            }
113
0
        }
114
12
    }
115
}
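
Aside (illustrative only, not part of the covered source): the de-duplication pattern that populating_digests and LoaderGuard implement can be reduced to the standalone sketch below. The names DedupMap and populate_once are hypothetical; every concurrent caller for the same key shares one Arc<OnceCell<()>>, so the populate closure runs at most once.

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::Mutex;
use tokio::sync::OnceCell;

// Hypothetical, self-contained model of `populating_digests`.
struct DedupMap {
    inflight: Mutex<HashMap<String, Arc<OnceCell<()>>>>,
}

impl DedupMap {
    async fn populate_once<F, Fut>(&self, key: &str, populate: F) -> Result<(), String>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<(), String>>,
    {
        // First caller inserts the cell; later callers clone the same Arc.
        let cell = self
            .inflight
            .lock()
            .entry(key.to_owned())
            .or_insert_with(|| Arc::new(OnceCell::new()))
            .clone();
        // Only one closure actually executes; an error lets a later caller retry.
        let result = cell.get_or_try_init(populate).await.map(|&()| ());
        // Inline cleanup: remove the entry once no other caller still holds it.
        // Note this is NOT cancel-safe; the real code relies on LoaderGuard's
        // Drop impl so the entry is removed even if the future is dropped mid-await.
        let mut map = self.inflight.lock();
        if let Some(existing) = map.get(key) {
            if Arc::ptr_eq(existing, &cell) && Arc::strong_count(existing) <= 2 {
                map.remove(key);
            }
        }
        result
    }
}
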
116
117
impl FastSlowStore {
118
39
    pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
119
39
        Arc::new_cyclic(|weak_self| Self {
120
39
            fast_store,
121
39
            fast_direction: spec.fast_direction,
122
39
            slow_store,
123
39
            slow_direction: spec.slow_direction,
124
39
            weak_self: weak_self.clone(),
125
39
            metrics: FastSlowStoreMetrics::default(),
126
39
            populating_digests: Mutex::new(HashMap::new()),
127
39
        })
128
39
    }
129
130
0
    pub const fn fast_store(&self) -> &Store {
131
0
        &self.fast_store
132
0
    }
133
134
0
    pub const fn slow_store(&self) -> &Store {
135
0
        &self.slow_store
136
0
    }
137
138
2
    pub fn get_arc(&self) -> Option<Arc<Self>> {
139
2
        self.weak_self.upgrade()
140
2
    }
141
142
12
    fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> {
143
        // Get a single loader instance that's used to populate the fast store
144
        // for this digest.  If another request comes in then it's de-duplicated.
145
12
        let loader = match self
146
12
            .populating_digests
147
12
            .lock()
148
12
            .entry(key.borrow().into_owned())
149
        {
150
0
            std::collections::hash_map::Entry::Occupied(occupied_entry) => {
151
0
                occupied_entry.get().clone()
152
            }
153
12
            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
154
12
                vacant_entry.insert(Arc::new(OnceCell::new())).clone()
155
            }
156
        };
157
12
        LoaderGuard {
158
12
            weak_store: self.weak_self.clone(),
159
12
            key,
160
12
            loader: Some(loader),
161
12
        }
162
12
    }
163
164
12
    async fn populate_and_maybe_stream(
165
12
        self: Pin<&Self>,
166
12
        key: StoreKey<'_>,
167
12
        maybe_writer: Option<&mut DropCloserWriteHalf>,
168
12
        offset: u64,
169
12
        length: Option<u64>,
170
12
    ) -> Result<(), Error> {
171
12
        let sz = self
172
12
            .slow_store
173
12
            .has(key.borrow())
174
12
            .await
175
12
            .err_tip(|| "Failed to run has() on slow store")
?0
176
12
            .ok_or_else(|| {
177
0
                make_err!(
178
0
                    Code::NotFound,
179
                    "Object {} not found in either fast or slow store. \
180
                    If using multiple workers, ensure all workers share the same CAS storage path.",
181
0
                    key.as_str()
182
                )
183
0
            })?;
184
185
12
        self.metrics
186
12
            .slow_store_hit_count
187
12
            .fetch_add(1, Ordering::Acquire);
188
189
12
        let send_range = offset..length.map_or(u64::MAX, |length| 
length7
+
offset7
);
190
12
        let mut bytes_received: u64 = 0;
191
192
12
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
193
12
        let (slow_tx, mut slow_rx) = make_buf_channel_pair();
194
12
        let data_stream_fut = async move {
195
12
            let mut maybe_writer_pin = maybe_writer.map(Pin::new);
196
            loop {
197
23
                let output_buf = slow_rx
198
23
                    .recv()
199
23
                    .await
200
23
                    .err_tip(|| "Failed to read data data buffer from slow store")
?0
;
201
23
                if output_buf.is_empty() {
  Branch (201:20): [True: 8, False: 7]
  Branch (201:20): [True: 4, False: 4]
  Branch (201:20): [Folded - Ignored]
202
                    // Write out our EOF.
203
                    // We are dropped as soon as we send_eof to writer_pin, so
204
                    // we wait until we've finished all of our joins to do that.
205
12
                    let fast_res = fast_tx.send_eof();
206
12
                    return Ok::<_, Error>((fast_res, maybe_writer_pin));
207
11
                }
208
11
                let output_buf_len = u64::try_from(output_buf.len())
209
11
                    .err_tip(|| "Could not output_buf.len() to u64")
?0
;
210
11
                self.metrics
211
11
                    .slow_store_downloaded_bytes
212
11
                    .fetch_add(output_buf_len, Ordering::Acquire);
213
214
11
                let writer_fut = Self::calculate_range(
215
11
                    &(bytes_received..bytes_received + output_buf_len),
216
11
                    &send_range,
217
0
                )?
218
11
                .zip(maybe_writer_pin.as_mut())
219
11
                .map_or_else(
220
4
                    || futures::future::ready(Ok(())).left_future(),
221
7
                    |(range, writer_pin)| writer_pin.send(output_buf.slice(range)).right_future(),
222
                );
223
224
11
                bytes_received += output_buf_len;
225
226
11
                let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut);
227
11
                fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?;
228
11
                writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")?;
229
            }
230
12
        };
231
232
12
        let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx);
233
12
        let fast_store_fut =
234
12
            self.fast_store
235
12
                .update(key.borrow(), fast_rx, UploadSizeInfo::ExactSize(sz));
236
237
12
        let (data_stream_res, slow_res, fast_res) =
238
12
            join!(data_stream_fut, slow_store_fut, fast_store_fut);
239
12
        match data_stream_res {
240
12
            Ok((fast_eof_res, maybe_writer_pin)) =>
241
            // Sending the EOF will drop us almost immediately in bytestream_server
242
            // so we perform it as the very last action in this method.
243
            {
244
12
                fast_eof_res.merge(fast_res).merge(slow_res).merge(
245
12
                    if let Some(mut writer_pin) = maybe_writer_pin {
  Branch (245:28): [True: 8, False: 0]
  Branch (245:28): [True: 0, False: 4]
  Branch (245:28): [Folded - Ignored]
246
8
                        writer_pin.send_eof()
247
                    } else {
248
4
                        Ok(())
249
                    },
250
                )
251
            }
252
0
            Err(err) => fast_res.merge(slow_res).merge(Err(err)),
253
        }
254
12
    }
255
256
    /// Ensure our fast store is populated. This should be kept as a low
257
    /// cost function. Since the data itself is shared and not copied it should be fairly
258
    /// low cost to just discard the data, but does cost a few mutex locks while
259
    /// streaming.
260
4
    pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> {
261
4
        let maybe_size_info = self
262
4
            .fast_store
263
4
            .has(key.borrow())
264
4
            .await
265
4
            .err_tip(|| "While querying in populate_fast_store")
?0
;
266
4
        if maybe_size_info.is_some() {
  Branch (266:12): [Folded - Ignored]
  Branch (266:12): [True: 0, False: 4]
  Branch (266:12): [Folded - Ignored]
267
0
            return Ok(());
268
4
        }
269
270
        // If the fast store is noop or read only or update only then this is an error.
271
4
        if self
  Branch (271:12): [Folded - Ignored]
  Branch (271:12): [True: 0, False: 4]
  Branch (271:12): [Folded - Ignored]
272
4
            .fast_store
273
4
            .inner_store(Some(key.borrow()))
274
4
            .optimized_for(StoreOptimizations::NoopUpdates)
275
4
            || self.fast_direction == StoreDirection::ReadOnly
  Branch (275:16): [Folded - Ignored]
  Branch (275:16): [True: 0, False: 4]
  Branch (275:16): [Folded - Ignored]
276
4
            || self.fast_direction == StoreDirection::Update
  Branch (276:16): [Folded - Ignored]
  Branch (276:16): [True: 0, False: 4]
  Branch (276:16): [Folded - Ignored]
277
        {
278
0
            return Err(make_err!(
279
0
                Code::Internal,
280
0
                "Attempt to populate fast store that is read only or noop"
281
0
            ));
282
4
        }
283
284
4
        self.get_loader(key.borrow())
285
4
            .get_or_try_init(|| {
286
4
                Pin::new(self).populate_and_maybe_stream(key.borrow(), None, 0, None)
287
4
            })
288
4
            .await
289
4
            .err_tip(|| "Failed to populate()")
290
4
    }
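
Usage sketch (assumed context, not from the report: a `store: Arc<FastSlowStore>` and a `key: StoreKey<'_>` already in scope). A worker that wants the blob available locally before handing out file paths would warm the fast store first:

// Warm the fast store so later reads are served locally; concurrent calls for
// the same key are de-duplicated by the loader machinery shown above.
store.populate_fast_store(key).await?;
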
291
292
    /// Returns the range of bytes that should be sent for the given slice bounds,
293
    /// offset so that the output range maps `received_range.start` to 0.
294
    // TODO(palfrey) This should be put into utils, as this logic is used
295
    // elsewhere in the code.
296
21
    pub fn calculate_range(
297
21
        received_range: &Range<u64>,
298
21
        send_range: &Range<u64>,
299
21
    ) -> Result<Option<Range<usize>>, Error> {
300
        // Protect against subtraction overflow.
301
21
        if received_range.start >= received_range.end {
  Branch (301:12): [True: 0, False: 21]
  Branch (301:12): [Folded - Ignored]
302
0
            return Ok(None);
303
21
        }
304
305
21
        let start = max(received_range.start, send_range.start);
306
21
        let end = min(received_range.end, send_range.end);
307
21
        if received_range.contains(&start) && received_range.contains(&(end - 1)) {
  Branch (307:12): [True: 19, False: 2]
  Branch (307:47): [True: 17, False: 2]
  Branch (307:12): [Folded - Ignored]
  Branch (307:47): [Folded - Ignored]
308
            // Offset both to the start of the received_range.
309
17
            let calculated_range_start = usize::try_from(start - received_range.start)
310
17
                .err_tip(|| "Could not convert (start - received_range.start) to usize")
?0
;
311
17
            let calculated_range_end = usize::try_from(end - received_range.start)
312
17
                .err_tip(|| "Could not convert (end - received_range.start) to usize")
?0
;
313
17
            Ok(Some(calculated_range_start..calculated_range_end))
314
        } else {
315
4
            Ok(None)
316
        }
317
21
    }
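
Worked example (illustrative values, not taken from the report): a chunk covering absolute bytes 4096..5120 of the object arrives while the client asked for bytes 4600..5000; the overlap is mapped back to indices within the received chunk.

let received = 4096u64..5120;
let requested = 4600u64..5000;
// start = max(4096, 4600) = 4600 and end = min(5120, 5000) = 5000, so the
// slice of the received chunk to forward is 504..904.
assert_eq!(
    FastSlowStore::calculate_range(&received, &requested).unwrap(),
    Some(504..904)
);
// A chunk that ends before the requested window yields None and is skipped.
assert_eq!(
    FastSlowStore::calculate_range(&(0u64..4096), &requested).unwrap(),
    None
);
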
318
}
319
320
#[async_trait]
321
impl StoreDriver for FastSlowStore {
322
    async fn has_with_results(
323
        self: Pin<&Self>,
324
        key: &[StoreKey<'_>],
325
        results: &mut [Option<u64>],
326
12
    ) -> Result<(), Error> {
327
        // If our slow store is a noop store, it'll always return a 404,
328
        // so only check the fast store in that case.
329
        let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None);
330
        if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
331
            return self.fast_store.has_with_results(key, results).await;
332
        }
333
        // Only check the slow store because if it's not there, then something
334
        // downstream might be unable to get it.  This should not affect
335
        // workers as they only use get() and a CAS can use an
336
        // ExistenceCacheStore to avoid the bottleneck.
337
        self.slow_store.has_with_results(key, results).await
338
12
    }
339
340
    async fn update(
341
        self: Pin<&Self>,
342
        key: StoreKey<'_>,
343
        mut reader: DropCloserReadHalf,
344
        size_info: UploadSizeInfo,
345
88
    ) -> Result<(), Error> {
346
        // If either one of our stores is a noop store, bypass the multiplexing
347
        // and just use the store that is not a noop store.
348
        let ignore_slow = self
349
            .slow_store
350
            .inner_store(Some(key.borrow()))
351
            .optimized_for(StoreOptimizations::NoopUpdates)
352
            || self.slow_direction == StoreDirection::ReadOnly
353
            || self.slow_direction == StoreDirection::Get;
354
        let ignore_fast = self
355
            .fast_store
356
            .inner_store(Some(key.borrow()))
357
            .optimized_for(StoreOptimizations::NoopUpdates)
358
            || self.fast_direction == StoreDirection::ReadOnly
359
            || self.fast_direction == StoreDirection::Get;
360
        if ignore_slow && ignore_fast {
361
            // We need to drain the reader to avoid the writer complaining that we dropped
362
            // the connection prematurely.
363
            reader
364
                .drain()
365
                .await
366
                .err_tip(|| "In FastFlowStore::update")?;
367
            return Ok(());
368
        }
369
        if ignore_slow {
370
            return self.fast_store.update(key, reader, size_info).await;
371
        }
372
        if ignore_fast {
373
            return self.slow_store.update(key, reader, size_info).await;
374
        }
375
376
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
377
        let (mut slow_tx, slow_rx) = make_buf_channel_pair();
378
379
84
        let data_stream_fut = async move {
380
            loop {
381
136
                let buffer = reader
382
136
                    .recv()
383
136
                    .await
384
136
                    .err_tip(|| "Failed to read buffer in fastslow store")
?0
;
385
136
                if buffer.is_empty() {
  Branch (385:20): [True: 84, False: 52]
  Branch (385:20): [Folded - Ignored]
386
                    // EOF received.
387
84
                    fast_tx.send_eof().err_tip(
388
                        || "Failed to write eof to fast store in fast_slow store update",
389
0
                    )?;
390
84
                    slow_tx
391
84
                        .send_eof()
392
84
                        .err_tip(|| "Failed to write eof to writer in fast_slow store update")
?0
;
393
84
                    return Result::<(), Error>::Ok(());
394
52
                }
395
396
52
                let (fast_result, slow_result) =
397
52
                    join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
398
52
                fast_result
399
52
                    .map_err(|e| {
400
0
                        make_err!(
401
0
                            Code::Internal,
402
                            "Failed to send message to fast_store in fast_slow_store {:?}",
403
                            e
404
                        )
405
0
                    })
406
52
                    .merge(slow_result.map_err(|e| {
407
0
                        make_err!(
408
0
                            Code::Internal,
409
                            "Failed to send message to slow_store in fast_slow store {:?}",
410
                            e
411
                        )
412
0
                    }))?;
413
            }
414
84
        };
415
416
        let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info);
417
        let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info);
418
419
        let (data_stream_res, fast_res, slow_res) =
420
            join!(data_stream_fut, fast_store_fut, slow_store_fut);
421
        data_stream_res.merge(fast_res).merge(slow_res)?;
422
        Ok(())
423
88
    }
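
Aside (hypothetical stand-in, not the buf_channel API): the shape of update above, one reader fanned out to two writers with all three futures joined so an error on any leg surfaces, can be illustrated with plain tokio channels. The names tee_update and consume are made up for the sketch.

use futures::join;
use tokio::sync::mpsc;

// Fan out every chunk from `input` to two consumers and join all three legs.
async fn tee_update(mut input: mpsc::Receiver<Vec<u8>>) -> Result<(), String> {
    let (fast_tx, fast_rx) = mpsc::channel::<Vec<u8>>(16);
    let (slow_tx, slow_rx) = mpsc::channel::<Vec<u8>>(16);

    let pump = async move {
        while let Some(chunk) = input.recv().await {
            // Send to both sinks; a failure on either leg aborts the upload.
            let (f, s) = join!(fast_tx.send(chunk.clone()), slow_tx.send(chunk));
            f.map_err(|_| "fast sink closed".to_string())?;
            s.map_err(|_| "slow sink closed".to_string())?;
        }
        // Dropping the senders signals EOF to both consumers.
        drop(fast_tx);
        drop(slow_tx);
        Ok::<(), String>(())
    };

    // Analogous to the join! of data_stream_fut, fast_store_fut and slow_store_fut.
    let (pump_res, fast_res, slow_res) =
        join!(pump, consume("fast", fast_rx), consume("slow", slow_rx));
    pump_res.and(fast_res).and(slow_res)
}

async fn consume(name: &str, mut rx: mpsc::Receiver<Vec<u8>>) -> Result<(), String> {
    let mut total = 0usize;
    while let Some(chunk) = rx.recv().await {
        total += chunk.len();
    }
    println!("{name} sink received {total} bytes");
    Ok(())
}
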
424
425
    /// FastSlowStore has optimizations for dealing with files.
426
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
427
0
        optimization == StoreOptimizations::FileUpdates
428
0
    }
429
430
    /// Optimized variation to consume the file if one of the stores is a
431
    /// filesystem store. This makes the operation a move instead of a copy
432
    /// dramatically increasing performance for large files.
433
    async fn update_with_whole_file(
434
        self: Pin<&Self>,
435
        key: StoreKey<'_>,
436
        path: OsString,
437
        mut file: fs::FileSlot,
438
        upload_size: UploadSizeInfo,
439
5
    ) -> Result<Option<fs::FileSlot>, Error> {
440
        if self
441
            .fast_store
442
            .optimized_for(StoreOptimizations::FileUpdates)
443
        {
444
            if !self
445
                .slow_store
446
                .inner_store(Some(key.borrow()))
447
                .optimized_for(StoreOptimizations::NoopUpdates)
448
                && self.slow_direction != StoreDirection::ReadOnly
449
                && self.slow_direction != StoreDirection::Get
450
            {
451
                slow_update_store_with_file(
452
                    self.slow_store.as_store_driver_pin(),
453
                    key.borrow(),
454
                    &mut file,
455
                    upload_size,
456
                )
457
                .await
458
                .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
459
            }
460
            if self.fast_direction == StoreDirection::ReadOnly
461
                || self.fast_direction == StoreDirection::Get
462
            {
463
                return Ok(Some(file));
464
            }
465
            return self
466
                .fast_store
467
                .update_with_whole_file(key, path, file, upload_size)
468
                .await;
469
        }
470
471
        if self
472
            .slow_store
473
            .optimized_for(StoreOptimizations::FileUpdates)
474
        {
475
            let ignore_fast = self
476
                .fast_store
477
                .inner_store(Some(key.borrow()))
478
                .optimized_for(StoreOptimizations::NoopUpdates)
479
                || self.fast_direction == StoreDirection::ReadOnly
480
                || self.fast_direction == StoreDirection::Get;
481
            if !ignore_fast {
482
                slow_update_store_with_file(
483
                    self.fast_store.as_store_driver_pin(),
484
                    key.borrow(),
485
                    &mut file,
486
                    upload_size,
487
                )
488
                .await
489
                .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
490
            }
491
            let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
492
                || self.slow_direction == StoreDirection::Get;
493
            if ignore_slow {
494
                return Ok(Some(file));
495
            }
496
            return self
497
                .slow_store
498
                .update_with_whole_file(key, path, file, upload_size)
499
                .await;
500
        }
501
502
        slow_update_store_with_file(self, key, &mut file, upload_size)
503
            .await
504
            .err_tip(|| "In FastSlowStore::update_with_whole_file")?;
505
        Ok(Some(file))
506
5
    }
507
508
    async fn get_part(
509
        self: Pin<&Self>,
510
        key: StoreKey<'_>,
511
        writer: &mut DropCloserWriteHalf,
512
        offset: u64,
513
        length: Option<u64>,
514
68
    ) -> Result<(), Error> {
515
        // TODO(palfrey) Investigate if we should maybe ignore errors here instead of
516
        // forwarding them up.
517
        if self.fast_store.has(key.borrow()).await?.is_some() {
518
            self.metrics
519
                .fast_store_hit_count
520
                .fetch_add(1, Ordering::Acquire);
521
            self.fast_store
522
                .get_part(key, writer.borrow_mut(), offset, length)
523
                .await?;
524
            self.metrics
525
                .fast_store_downloaded_bytes
526
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
527
            return Ok(());
528
        }
529
530
        // If the fast store is noop or read only or update only then bypass it.
531
        if self
532
            .fast_store
533
            .inner_store(Some(key.borrow()))
534
            .optimized_for(StoreOptimizations::NoopUpdates)
535
            || self.fast_direction == StoreDirection::ReadOnly
536
            || self.fast_direction == StoreDirection::Update
537
        {
538
            self.metrics
539
                .slow_store_hit_count
540
                .fetch_add(1, Ordering::Acquire);
541
            self.slow_store
542
                .get_part(key, writer.borrow_mut(), offset, length)
543
                .await?;
544
            self.metrics
545
                .slow_store_downloaded_bytes
546
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
547
            return Ok(());
548
        }
549
550
        let mut writer = Some(writer);
551
        self.get_loader(key.borrow())
552
8
            .get_or_try_init(|| {
553
8
                self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
554
8
            })
555
            .await?;
556
557
        // If we didn't stream then re-enter which will stream from the fast
558
        // store, or retry the download.  We should not get in a loop here
559
        // because OnceCell has the good sense to retry for all callers so in
560
        // order to get here the fast store will have been populated.  There's
561
        // an outside chance it was evicted, but that's slim.
562
        if let Some(writer) = writer.take() {
563
            self.get_part(key, writer, offset, length).await
564
        } else {
565
            // This was the thread that did the streaming already, lucky duck.
566
            Ok(())
567
        }
568
68
    }
569
570
2
    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
571
2
        self
572
2
    }
573
574
2
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
575
2
        self
576
2
    }
577
578
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
579
0
        self
580
0
    }
581
582
0
    fn register_remove_callback(
583
0
        self: Arc<Self>,
584
0
        callback: Arc<dyn RemoveItemCallback>,
585
0
    ) -> Result<(), Error> {
586
0
        self.fast_store.register_remove_callback(callback.clone())?;
587
0
        self.slow_store.register_remove_callback(callback)?;
588
0
        Ok(())
589
0
    }
590
}
591
592
#[derive(Debug, Default, MetricsComponent)]
593
struct FastSlowStoreMetrics {
594
    #[metric(help = "Hit count for the fast store")]
595
    fast_store_hit_count: AtomicU64,
596
    #[metric(help = "Downloaded bytes from the fast store")]
597
    fast_store_downloaded_bytes: AtomicU64,
598
    #[metric(help = "Hit count for the slow store")]
599
    slow_store_hit_count: AtomicU64,
600
    #[metric(help = "Downloaded bytes from the slow store")]
601
    slow_store_downloaded_bytes: AtomicU64,
602
}
603
604
default_health_status_indicator!(FastSlowStore);