Coverage Report

Created: 2026-06-19 15:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/fast_slow_store.rs
Line
Count
Source
1
// Copyright 2024-2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::BorrowMut;
16
use core::cmp::{max, min};
17
use core::ops::Range;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicU64, Ordering};
20
use core::time::Duration;
21
use std::collections::HashMap;
22
use std::ffi::OsString;
23
use std::sync::{Arc, Weak};
24
25
use async_trait::async_trait;
26
use futures::stream::{FuturesUnordered, StreamExt};
27
use futures::{FutureExt, join, try_join};
28
use nativelink_config::stores::{FastSlowSpec, StoreDirection};
29
use nativelink_error::{Code, Error, ErrorContext, ResultExt, make_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::buf_channel::{
32
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
33
};
34
use nativelink_util::fs;
35
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
36
use nativelink_util::store_trait::{
37
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
38
    UploadSizeInfo, slow_update_store_with_file,
39
};
40
use parking_lot::Mutex;
41
use tokio::sync::OnceCell;
42
use tracing::{debug, info, trace, warn};
43
44
// TODO(palfrey) This store needs to be evaluated for more efficient memory usage,
45
// there are many copies happening internally.
46
47
type Loader = Arc<OnceCell<()>>;
48
type MaybeSize = Option<u64>;
49
50
// TODO(palfrey) We should consider copying the data in the background to allow the
51
// client to hang up while the data is buffered. An alternative is to possibly make a
52
// "BufferedStore" that could be placed on the "slow" store that would hang up early
53
// if data is in the buffer.
54
#[derive(Debug, MetricsComponent)]
55
pub struct FastSlowStore {
56
    #[metric(group = "fast_store")]
57
    fast_store: Store,
58
    fast_direction: StoreDirection,
59
    #[metric(group = "slow_store")]
60
    slow_store: Store,
61
    slow_direction: StoreDirection,
62
    /// See [`FastSlowSpec::bypass_dedup_threshold_bytes`].
63
    bypass_dedup_threshold_bytes: u64,
64
    weak_self: Weak<Self>,
65
    #[metric]
66
    metrics: FastSlowStoreMetrics,
67
    // De-duplicate requests for the fast store, only the first streams, others
68
    // are blocked.  This may feel like it's causing a slow down of tasks, but
69
    // actually it's faster because we're not downloading the file multiple
70
    // times are doing loads of duplicate IO.
71
    populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
72
    // Tracks keys whose slow-store write is currently in flight, along with
73
    // the best-known size. Consulted by `has_with_results` so that a
74
    // concurrent writer that has not yet finished pushing to the slow store
75
    // is still visible to a concurrent existence check, preventing redundant
76
    // duplicate uploads of the same blob.
77
    in_flight_slow_writes: Mutex<HashMap<StoreKey<'static>, Arc<tokio::sync::Mutex<MaybeSize>>>>,
78
}
79
80
// This guard ensures that the populating_digests is cleared even if the future
81
// is dropped, it is cancel safe.
82
struct LoaderGuard<'a> {
83
    weak_store: Weak<FastSlowStore>,
84
    key: StoreKey<'a>,
85
    loader: Option<Loader>,
86
    is_leader: bool,
87
}
88
89
impl LoaderGuard<'_> {
90
75
    async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<(), E>
91
75
    where
92
75
        F: FnOnce() -> Fut,
93
75
        Fut: Future<Output = Result<(), E>>,
94
75
    {
95
75
        if let Some(loader) = &self.loader {
96
75
            loader.get_or_try_init(f).await.
map74
(|&()| ())
97
        } else {
98
            // This is impossible, but we do it anyway.
99
0
            f().await
100
        }
101
74
    }
102
}
103
104
/// Cancel-safe RAII guard that removes an entry from
105
/// `FastSlowStore::in_flight_slow_writes` when dropped. This ensures the
106
/// map does not leak entries if the surrounding `update()` future is
107
/// cancelled before the slow-store write completes.
108
struct InFlightSlowWriteGuard {
109
    weak_store: Weak<FastSlowStore>,
110
    key: Option<StoreKey<'static>>,
111
    write_complete_guard: tokio::sync::OwnedMutexGuard<MaybeSize>,
112
}
113
114
impl InFlightSlowWriteGuard {
115
130
    fn complete(mut self, size: MaybeSize) {
116
130
        *self.write_complete_guard = size;
117
130
    }
118
}
119
120
impl Drop for InFlightSlowWriteGuard {
121
131
    fn drop(&mut self) {
122
131
        let Some(store) = self.weak_store.upgrade() else {
123
0
            return;
124
        };
125
131
        let Some(key) = self.key.take() else {
126
0
            return;
127
        };
128
131
        store.in_flight_slow_writes.lock().remove(&key);
129
131
    }
130
}
131
132
impl Drop for LoaderGuard<'_> {
133
75
    fn drop(&mut self) {
134
75
        let Some(store) = self.weak_store.upgrade() else {
135
            // The store has already gone away, nothing to remove from.
136
0
            return;
137
        };
138
75
        let Some(loader) = self.loader.take() else {
139
            // This should never happen, but we do it to be safe.
140
0
            return;
141
        };
142
143
75
        let mut guard = store.populating_digests.lock();
144
75
        if let std::collections::hash_map::Entry::Occupied(occupied_entry) =
145
75
            guard.entry(self.key.borrow().into_owned())
146
75
            && Arc::ptr_eq(occupied_entry.get(), &loader)
147
        {
148
75
            drop(loader);
149
75
            if Arc::strong_count(occupied_entry.get()) == 1 {
150
52
                // This is the last loader, so remove it.
151
52
                occupied_entry.remove();
152
52
            
}23
153
0
        }
154
75
    }
155
}
156
157
impl FastSlowStore {
158
75
    pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
159
75
        Arc::new_cyclic(|weak_self| Self {
160
75
            fast_store,
161
75
            fast_direction: spec.fast_direction,
162
75
            slow_store,
163
75
            slow_direction: spec.slow_direction,
164
            // 0 (default) disables the bypass entirely (always dedup).
165
75
            bypass_dedup_threshold_bytes: spec.bypass_dedup_threshold_bytes,
166
75
            weak_self: weak_self.clone(),
167
75
            metrics: FastSlowStoreMetrics::default(),
168
75
            populating_digests: Mutex::new(HashMap::new()),
169
75
            in_flight_slow_writes: Mutex::new(HashMap::new()),
170
75
        })
171
75
    }
172
173
131
    fn register_in_flight_slow_write(&self, key: StoreKey<'_>) -> InFlightSlowWriteGuard {
174
131
        let owned = key.into_owned();
175
131
        let write_complete = Arc::new(tokio::sync::Mutex::new(None));
176
131
        let write_complete_guard = write_complete
177
131
            .clone()
178
131
            .try_lock_owned()
179
131
            .expect("Newly created mutex is locked");
180
131
        self.in_flight_slow_writes
181
131
            .lock()
182
131
            .insert(owned.borrow().into_owned(), write_complete);
183
131
        InFlightSlowWriteGuard {
184
131
            weak_store: self.weak_self.clone(),
185
131
            key: Some(owned),
186
131
            write_complete_guard,
187
131
        }
188
131
    }
189
190
    /// Digest size in bytes, or `None` for non-digest keys.
191
28
    const fn digest_size_bytes(key: &StoreKey<'_>) -> Option<u64> {
192
28
        match key {
193
28
            StoreKey::Digest(d) => Some(d.size_bytes()),
194
0
            StoreKey::Str(_) => None,
195
        }
196
28
    }
197
198
    /// Whether a read should skip dedup and hit the slow store directly. A
199
    /// threshold of 0 disables the bypass so every read goes through dedup.
200
79
    fn should_bypass_dedup(&self, key: &StoreKey<'_>) -> bool {
201
79
        self.bypass_dedup_threshold_bytes != 0
202
28
            && Self::digest_size_bytes(key)
203
28
                .is_some_and(|size| size >= self.bypass_dedup_threshold_bytes)
204
79
    }
205
206
0
    pub const fn fast_store(&self) -> &Store {
207
0
        &self.fast_store
208
0
    }
209
210
0
    pub const fn slow_store(&self) -> &Store {
211
0
        &self.slow_store
212
0
    }
213
214
2
    pub fn get_arc(&self) -> Option<Arc<Self>> {
215
2
        self.weak_self.upgrade()
216
2
    }
217
218
75
    fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> {
219
        // Get a single loader instance that's used to populate the fast store
220
        // for this digest.  If another request comes in then it's de-duplicated.
221
75
        let mut is_leader = false;
222
75
        let loader = match self
223
75
            .populating_digests
224
75
            .lock()
225
75
            .entry(key.borrow().into_owned())
226
        {
227
23
            std::collections::hash_map::Entry::Occupied(occupied_entry) => {
228
23
                occupied_entry.get().clone()
229
            }
230
52
            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
231
52
                is_leader = true;
232
52
                vacant_entry.insert(Arc::new(OnceCell::new())).clone()
233
            }
234
        };
235
75
        LoaderGuard {
236
75
            weak_store: self.weak_self.clone(),
237
75
            key,
238
75
            loader: Some(loader),
239
75
            is_leader,
240
75
        }
241
75
    }
242
243
52
    async fn populate_and_maybe_stream(
244
52
        self: Pin<&Self>,
245
52
        key: StoreKey<'_>,
246
52
        maybe_writer: Option<&mut DropCloserWriteHalf>,
247
52
        offset: u64,
248
52
        length: Option<u64>,
249
52
    ) -> Result<(), Error> {
250
52
        let 
reader_stream_size51
= if self
251
52
            .slow_store
252
52
            .inner_store(Some(key.borrow()))
253
52
            .optimized_for(StoreOptimizations::LazyExistenceOnSync)
254
        {
255
2
            trace!(
256
                %key,
257
2
                store_name = %self.slow_store.inner_store(Some(key.borrow())).get_name(),
258
                "Skipping .has() check due to LazyExistenceOnSync optimization"
259
            );
260
2
            UploadSizeInfo::MaxSize(u64::MAX)
261
        } else {
262
50
            UploadSizeInfo::ExactSize(self
263
50
                    .slow_store
264
50
                    .has(key.borrow())
265
50
                    .await
266
50
                    .err_tip(|| "Failed to run has() on slow store")
?0
267
50
                    .ok_or_else(|| 
{1
268
1
                        let err = make_err!(
269
1
                            Code::NotFound,
270
                            "Object {} not found in either fast or slow store. \
271
                                If using multiple workers, ensure all workers share the same CAS storage path.",
272
1
                            key.as_str()
273
                        );
274
1
                        if let StoreKey::Digest(d) = key.borrow() {
275
1
                            err.with_context(ErrorContext::MissingDigest {
276
1
                                hash: d.packed_hash().to_string(),
277
1
                                size: d.size_bytes() as i64,
278
1
                            })
279
                        } else {
280
0
                            err
281
                        }
282
1
                    })?
283
            )
284
        };
285
286
51
        let send_range = offset..length.map_or(u64::MAX, |length| 
length28
+
offset28
);
287
51
        let mut bytes_received: u64 = 0;
288
51
        let mut counted_hit = false;
289
290
51
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
291
51
        let (slow_tx, mut slow_rx) = make_buf_channel_pair();
292
51
        let data_stream_fut = async move {
293
51
            let mut maybe_writer_pin = maybe_writer.map(Pin::new);
294
            loop {
295
94
                let 
output_buf93
= slow_rx
296
94
                    .recv()
297
94
                    .await
298
94
                    .err_tip(|| "Failed to read data data buffer from slow store")
?1
;
299
93
                if output_buf.is_empty() {
300
                    // Write out our EOF.
301
                    // We are dropped as soon as we send_eof to writer_pin, so
302
                    // we wait until we've finished all of our joins to do that.
303
50
                    let fast_res = fast_tx.send_eof();
304
50
                    return Ok::<_, Error>((fast_res, maybe_writer_pin));
305
43
                }
306
307
43
                if !counted_hit {
308
43
                    self.metrics
309
43
                        .slow_store_hit_count
310
43
                        .fetch_add(1, Ordering::Acquire);
311
43
                    counted_hit = true;
312
43
                
}0
313
314
43
                let output_buf_len = u64::try_from(output_buf.len())
315
43
                    .err_tip(|| "Could not output_buf.len() to u64")
?0
;
316
43
                self.metrics
317
43
                    .slow_store_downloaded_bytes
318
43
                    .fetch_add(output_buf_len, Ordering::Acquire);
319
320
43
                let writer_fut = Self::calculate_range(
321
43
                    &(bytes_received..bytes_received + output_buf_len),
322
43
                    &send_range,
323
0
                )?
324
43
                .zip(maybe_writer_pin.as_mut())
325
43
                .map_or_else(
326
15
                    || futures::future::ready(Ok(())).left_future(),
327
28
                    |(range, writer_pin)| writer_pin.send(output_buf.slice(range)).right_future(),
328
                );
329
330
43
                bytes_received += output_buf_len;
331
332
43
                let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut);
333
43
                fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")
?0
;
334
43
                writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")
?0
;
335
            }
336
51
        };
337
338
51
        let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx);
339
51
        let fast_store_fut = self
340
51
            .fast_store
341
51
            .update(key.borrow(), fast_rx, reader_stream_size);
342
343
51
        let (data_stream_res, slow_res, fast_res) =
344
51
            join!(data_stream_fut, slow_store_fut, fast_store_fut);
345
51
        match data_stream_res {
346
50
            Ok((fast_eof_res, maybe_writer_pin)) =>
347
            // Sending the EOF will drop us almost immediately in bytestream_server
348
            // so we perform it as the very last action in this method.
349
            {
350
50
                fast_eof_res.merge(fast_res).merge(slow_res).merge(
351
50
                    if let Some(
mut writer_pin34
) = maybe_writer_pin {
352
34
                        writer_pin.send_eof()
353
                    } else {
354
16
                        Ok(())
355
                    },
356
                )
357
            }
358
1
            Err(err) => match slow_res {
359
1
                Err(slow_err) if slow_err.code == Code::NotFound => Err(slow_err),
360
0
                _ => fast_res.merge(slow_res).merge(Err(err)),
361
            },
362
        }
363
52
    }
364
365
    /// Ensure our fast store is populated. This should be kept as a low
366
    /// cost function. Since the data itself is shared and not copied it should be fairly
367
    /// low cost to just discard the data, but does cost a few mutex locks while
368
    /// streaming.
369
26
    
pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error>0
{
370
26
        let maybe_size_info = self
371
26
            .fast_store
372
26
            .has(key.borrow())
373
26
            .await
374
26
            .err_tip(|| "While querying in populate_fast_store")
?0
;
375
26
        if maybe_size_info.is_some() {
376
10
            return Ok(());
377
16
        }
378
379
        // If the fast store is noop or read only or update only then this is an error.
380
16
        if self
381
16
            .fast_store
382
16
            .inner_store(Some(key.borrow()))
383
16
            .optimized_for(StoreOptimizations::NoopUpdates)
384
16
            || self.fast_direction == StoreDirection::ReadOnly
385
16
            || self.fast_direction == StoreDirection::Update
386
        {
387
0
            return Err(make_err!(
388
0
                Code::Internal,
389
0
                "Attempt to populate fast store that is read only or noop"
390
0
            ));
391
16
        }
392
393
16
        self.get_loader(key.borrow())
394
16
            .get_or_try_init(|| {
395
16
                Pin::new(self).populate_and_maybe_stream(key.borrow(), None, 0, None)
396
16
            })
397
16
            .await
398
16
            .err_tip(|| "Failed to populate()")
399
26
    }
400
401
    /// Returns the range of bytes that should be sent given a slice bounds
402
    /// offset so the output range maps the `received_range.start` to 0.
403
    // TODO(palfrey) This should be put into utils, as this logic is used
404
    // elsewhere in the code.
405
53
    pub fn calculate_range(
406
53
        received_range: &Range<u64>,
407
53
        send_range: &Range<u64>,
408
53
    ) -> Result<Option<Range<usize>>, Error> {
409
        // Protect against subtraction overflow.
410
53
        if received_range.start >= received_range.end {
411
0
            return Ok(None);
412
53
        }
413
414
53
        let start = max(received_range.start, send_range.start);
415
53
        let end = min(received_range.end, send_range.end);
416
53
        if received_range.contains(&start) && 
received_range51
.
contains51
(
&(end - 1)51
) {
417
            // Offset both to the start of the received_range.
418
49
            let calculated_range_start = usize::try_from(start - received_range.start)
419
49
                .err_tip(|| "Could not convert (start - received_range.start) to usize")
?0
;
420
49
            let calculated_range_end = usize::try_from(end - received_range.start)
421
49
                .err_tip(|| "Could not convert (end - received_range.start) to usize")
?0
;
422
49
            Ok(Some(calculated_range_start..calculated_range_end))
423
        } else {
424
4
            Ok(None)
425
        }
426
53
    }
427
}
428
429
#[async_trait]
430
impl StoreDriver for FastSlowStore {
431
0
    async fn post_init(self: Arc<Self>) -> Result<(), Error> {
432
        try_join!(
433
            self.fast_store.clone().into_inner().post_init(),
434
            self.slow_store.clone().into_inner().post_init()
435
        )?;
436
        Ok(())
437
0
    }
438
439
    async fn has_with_results(
440
        self: Pin<&Self>,
441
        key: &[StoreKey<'_>],
442
        results: &mut [Option<u64>],
443
24
    ) -> Result<(), Error> {
444
        // If our slow store is a noop store, it'll always return a 404,
445
        // so only check the fast store in such case.
446
        let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None);
447
        if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
448
            return self.fast_store.has_with_results(key, results).await;
449
        }
450
451
        // Check with the slow store first.
452
        self.slow_store.has_with_results(key, results).await?;
453
454
        // Check for any in-flight requests to the slow store next.
455
        let mut in_flight_futs = FuturesUnordered::new();
456
        {
457
            let in_flight = self.in_flight_slow_writes.lock();
458
            if !in_flight.is_empty() {
459
                for (i, (k, result)) in key.iter().zip(results.iter_mut()).enumerate() {
460
                    if result.is_none() {
461
                        let owned = k.borrow().into_owned();
462
                        if let Some(maybe_size) = in_flight.get(&owned) {
463
                            let maybe_size = maybe_size.clone();
464
3
                            in_flight_futs.push(async move { (i, *maybe_size.lock().await) });
465
                        }
466
                    }
467
                }
468
            }
469
        }
470
        while let Some((i, size)) = in_flight_futs.next().await {
471
            results[i] = size;
472
        }
473
474
        // NOTE: We intentionally *NEVER* check the fast store, this is to
475
        // ensure that we re-upload data to the slow store if it only exists
476
        // in the fast store.  This does not affect workers as they do not
477
        // check existence through `has` and instead go direct to loading the
478
        // data which bypasses the check and will load from the fast store if
479
        // it does not exist in the slow store.
480
481
        Ok(())
482
24
    }
483
484
    async fn update(
485
        self: Pin<&Self>,
486
        key: StoreKey<'_>,
487
        mut reader: DropCloserReadHalf,
488
        size_info: UploadSizeInfo,
489
123
    ) -> Result<u64, Error> {
490
        // If either one of our stores is a noop store, bypass the multiplexing
491
        // and just use the store that is not a noop store.
492
        let ignore_slow = self
493
            .slow_store
494
            .inner_store(Some(key.borrow()))
495
            .optimized_for(StoreOptimizations::NoopUpdates)
496
            || self.slow_direction == StoreDirection::ReadOnly
497
            || self.slow_direction == StoreDirection::Get;
498
        let ignore_fast = self
499
            .fast_store
500
            .inner_store(Some(key.borrow()))
501
            .optimized_for(StoreOptimizations::NoopUpdates)
502
            || self.fast_direction == StoreDirection::ReadOnly
503
            || self.fast_direction == StoreDirection::Get;
504
        if ignore_slow && ignore_fast {
505
            // We need to drain the reader to avoid the writer complaining that we dropped
506
            // the connection prematurely.
507
            return reader.drain().await.err_tip(|| "In FastFlowStore::update");
508
        }
509
        if ignore_slow {
510
            return self.fast_store.update(key, reader, size_info).await;
511
        }
512
        let slow_in_flight_guard = self.register_in_flight_slow_write(key.borrow());
513
        if ignore_fast {
514
            let result = self
515
                .slow_store
516
                .update(key.borrow(), reader, size_info)
517
                .await;
518
            if let Ok(size) = &result {
519
                slow_in_flight_guard.complete(Some(*size));
520
            }
521
            return result;
522
        }
523
524
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
525
        let (mut slow_tx, slow_rx) = make_buf_channel_pair();
526
527
        let key_debug = format!("{key:?}");
528
        trace!(
529
            key = %key_debug,
530
            "FastSlowStore::update: starting dual-store upload",
531
        );
532
        let update_start = std::time::Instant::now();
533
534
117
        let data_stream_fut = async move {
535
117
            let mut bytes_sent: u64 = 0;
536
            loop {
537
189
                let buffer = reader
538
189
                    .recv()
539
189
                    .await
540
189
                    .err_tip(|| "Failed to read buffer in fastslow store")
?0
;
541
189
                if buffer.is_empty() {
542
                    // EOF received.
543
117
                    fast_tx.send_eof().err_tip(
544
                        || "Failed to write eof to fast store in fast_slow store update",
545
0
                    )?;
546
117
                    slow_tx
547
117
                        .send_eof()
548
117
                        .err_tip(|| "Failed to write eof to writer in fast_slow store update")
?0
;
549
117
                    debug!(
550
                        total_bytes = bytes_sent,
551
                        "FastSlowStore::update: data_stream sent EOF to both stores",
552
                    );
553
117
                    return Result::<u64, Error>::Ok(bytes_sent);
554
72
                }
555
556
72
                let chunk_len = buffer.len();
557
72
                let send_start = std::time::Instant::now();
558
72
                let (fast_result, slow_result) =
559
72
                    join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
560
72
                let send_elapsed = send_start.elapsed();
561
72
                if send_elapsed.as_secs() >= 5 {
562
0
                    warn!(
563
                        chunk_len,
564
0
                        send_elapsed_ms = send_elapsed.as_millis(),
565
                        total_bytes = bytes_sent,
566
                        "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging",
567
                    );
568
72
                }
569
72
                bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX);
570
72
                fast_result
571
72
                    .map_err(|e| 
{0
572
0
                        make_err!(
573
0
                            Code::Internal,
574
                            "Failed to send message to fast_store in fast_slow_store {:?}",
575
                            e
576
                        )
577
0
                    })
578
72
                    .merge(slow_result.map_err(|e| 
{0
579
0
                        make_err!(
580
0
                            Code::Internal,
581
                            "Failed to send message to slow_store in fast_slow store {:?}",
582
                            e
583
                        )
584
0
                    }))?;
585
            }
586
117
        };
587
588
        let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info);
589
        let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info);
590
591
        let (data_stream_res, fast_res, slow_res) =
592
            join!(data_stream_fut, fast_store_fut, slow_store_fut);
593
594
        if let Ok(size) = slow_res {
595
            slow_in_flight_guard.complete(Some(size));
596
        } else {
597
            drop(slow_in_flight_guard);
598
        }
599
600
        let total_elapsed = update_start.elapsed();
601
        if data_stream_res.is_err() || fast_res.is_err() || slow_res.is_err() {
602
            let all_not_found = [&data_stream_res, &fast_res, &slow_res]
603
                .iter()
604
0
                .all(|r| match r {
605
0
                    Ok(_size) => true,
606
0
                    Err(e) => e.code == Code::NotFound,
607
0
                });
608
            if all_not_found {
609
                info!(
610
                    key = %key_debug,
611
                    elapsed_ms = total_elapsed.as_millis(),
612
                    data_stream_ok = data_stream_res.is_ok(),
613
                    fast_store_ok = fast_res.is_ok(),
614
                    slow_store_ok = slow_res.is_ok(),
615
                    "FastSlowStore::update: completed with NotFound error(s)",
616
                );
617
            } else {
618
                warn!(
619
                    key = %key_debug,
620
                    elapsed_ms = total_elapsed.as_millis(),
621
                    data_stream_ok = data_stream_res.is_ok(),
622
                    fast_store_ok = fast_res.is_ok(),
623
                    slow_store_ok = slow_res.is_ok(),
624
                    "FastSlowStore::update: completed with error(s)",
625
                );
626
            }
627
        } else {
628
            trace!(
629
                key = %key_debug,
630
                elapsed_ms = total_elapsed.as_millis(),
631
                "FastSlowStore::update: completed successfully",
632
            );
633
        }
634
        data_stream_res.merge(fast_res).merge(slow_res)
635
123
    }
636
637
    /// `FastSlowStore` has optimizations for dealing with files.
638
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
639
0
        optimization == StoreOptimizations::FileUpdates
640
0
    }
641
642
    /// Optimized variation to consume the file if one of the stores is a
643
    /// filesystem store. This makes the operation a move instead of a copy
644
    /// dramatically increasing performance for large files.
645
    async fn update_with_whole_file(
646
        self: Pin<&Self>,
647
        key: StoreKey<'_>,
648
        path: OsString,
649
        mut file: fs::FileSlot,
650
        upload_size: UploadSizeInfo,
651
10
    ) -> Result<(u64, Option<fs::FileSlot>), Error> {
652
        trace!(
653
            key = ?key,
654
            ?upload_size,
655
            "FastSlowStore::update_with_whole_file: starting",
656
        );
657
        if self
658
            .fast_store
659
            .optimized_for(StoreOptimizations::FileUpdates)
660
        {
661
            if !self
662
                .slow_store
663
                .inner_store(Some(key.borrow()))
664
                .optimized_for(StoreOptimizations::NoopUpdates)
665
                && self.slow_direction != StoreDirection::ReadOnly
666
                && self.slow_direction != StoreDirection::Get
667
            {
668
                trace!("FastSlowStore::update_with_whole_file: uploading to slow_store");
669
                let slow_start = std::time::Instant::now();
670
                let slow_in_flight_guard = self.register_in_flight_slow_write(key.borrow());
671
                let size = slow_update_store_with_file(
672
                    self.slow_store.as_store_driver_pin(),
673
                    key.borrow(),
674
                    &mut file,
675
                    upload_size,
676
                )
677
                .await
678
                .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
679
                slow_in_flight_guard.complete(Some(size));
680
                trace!(
681
                    elapsed_ms = slow_start.elapsed().as_millis(),
682
                    "FastSlowStore::update_with_whole_file: slow_store upload completed",
683
                );
684
            }
685
            if self.fast_direction == StoreDirection::ReadOnly
686
                || self.fast_direction == StoreDirection::Get
687
            {
688
                let file_size = match upload_size {
689
                    UploadSizeInfo::ExactSize(size) => size,
690
                    UploadSizeInfo::MaxSize(_) => file
691
                        .as_ref()
692
                        .metadata()
693
                        .await
694
0
                        .err_tip(|| format!("While reading metadata for {}", path.display()))?
695
                        .len(),
696
                };
697
                return Ok((file_size, Some(file)));
698
            }
699
            return self
700
                .fast_store
701
                .update_with_whole_file(key, path, file, upload_size)
702
                .await;
703
        }
704
705
        if self
706
            .slow_store
707
            .optimized_for(StoreOptimizations::FileUpdates)
708
        {
709
            let ignore_fast = self
710
                .fast_store
711
                .inner_store(Some(key.borrow()))
712
                .optimized_for(StoreOptimizations::NoopUpdates)
713
                || self.fast_direction == StoreDirection::ReadOnly
714
                || self.fast_direction == StoreDirection::Get;
715
            let maybe_size = if ignore_fast {
716
                None
717
            } else {
718
                Some(
719
                    slow_update_store_with_file(
720
                        self.fast_store.as_store_driver_pin(),
721
                        key.borrow(),
722
                        &mut file,
723
                        upload_size,
724
                    )
725
                    .await
726
                    .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?,
727
                )
728
            };
729
            let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
730
                || self.slow_direction == StoreDirection::Get;
731
            if ignore_slow {
732
                let size = match maybe_size {
733
                    Some(size) => size,
734
                    None => match upload_size {
735
                        UploadSizeInfo::ExactSize(size) => size,
736
                        UploadSizeInfo::MaxSize(_) => file
737
                            .as_ref()
738
                            .metadata()
739
                            .await
740
0
                            .err_tip(|| format!("While reading metadata for {}", path.display()))?
741
                            .len(),
742
                    },
743
                };
744
                return Ok((size, Some(file)));
745
            }
746
            let slow_in_flight_guard: InFlightSlowWriteGuard =
747
                self.register_in_flight_slow_write(key.borrow());
748
            let (size, maybe_file_slot) = self
749
                .slow_store
750
                .update_with_whole_file(key.borrow(), path, file, upload_size)
751
                .await?;
752
            slow_in_flight_guard.complete(Some(size));
753
            return Ok((size, maybe_file_slot));
754
        }
755
756
        let size = slow_update_store_with_file(self, key, &mut file, upload_size)
757
            .await
758
            .err_tip(|| "In FastSlowStore::update_with_whole_file")?;
759
        Ok((size, Some(file)))
760
10
    }
761
762
    async fn get_part(
763
        self: Pin<&Self>,
764
        key: StoreKey<'_>,
765
        writer: &mut DropCloserWriteHalf,
766
        offset: u64,
767
        length: Option<u64>,
768
183
    ) -> Result<(), Error> {
769
        // `has()` can report a stale map entry whose file is gone, so
770
        // get_part may still return NotFound; fall through to the slow
771
        // store unless we have already streamed bytes to the caller.
772
        if self.fast_store.has(key.borrow()).await?.is_some() {
773
            let bytes_before = writer.get_bytes_written();
774
            match self
775
                .fast_store
776
                .get_part(key.borrow(), writer.borrow_mut(), offset, length)
777
                .await
778
            {
779
                Ok(()) => {
780
                    self.metrics
781
                        .fast_store_hit_count
782
                        .fetch_add(1, Ordering::Acquire);
783
                    self.metrics
784
                        .fast_store_downloaded_bytes
785
                        .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
786
                    return Ok(());
787
                }
788
                Err(e)
789
                    if e.code == Code::NotFound && writer.get_bytes_written() == bytes_before =>
790
                {
791
                    self.metrics
792
                        .fast_store_stale_map_falls_through
793
                        .fetch_add(1, Ordering::Acquire);
794
                    warn!(%key, ?e, "Stale fast-store map entry; falling through to slow store");
795
                    // fall through to populate path
796
                }
797
                Err(e) => return Err(e),
798
            }
799
        }
800
801
        // If the fast store is noop or read only or update only then bypass it.
802
        if self
803
            .fast_store
804
            .inner_store(Some(key.borrow()))
805
            .optimized_for(StoreOptimizations::NoopUpdates)
806
            || self.fast_direction == StoreDirection::ReadOnly
807
            || self.fast_direction == StoreDirection::Update
808
        {
809
            self.metrics
810
                .slow_store_hit_count
811
                .fetch_add(1, Ordering::Acquire);
812
            self.slow_store
813
                .get_part(key, writer.borrow_mut(), offset, length)
814
                .await?;
815
            self.metrics
816
                .slow_store_downloaded_bytes
817
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
818
            return Ok(());
819
        }
820
821
        // Huge blobs: dedup is a net loss (followers time out anyway and
822
        // the fast tier is evicted), so read straight from the slow store.
823
        if self.should_bypass_dedup(&key) {
824
            self.metrics
825
                .huge_blob_dedup_bypasses
826
                .fetch_add(1, Ordering::Acquire);
827
            self.metrics
828
                .slow_store_hit_count
829
                .fetch_add(1, Ordering::Acquire);
830
            debug!(%key, threshold_bytes = self.bypass_dedup_threshold_bytes, "Bypassing dedup for huge blob");
831
            self.slow_store
832
                .get_part(key, writer.borrow_mut(), offset, length)
833
                .await
834
                .err_tip(|| "In FastSlowStore::get_part huge-blob bypass")?;
835
            self.metrics
836
                .slow_store_downloaded_bytes
837
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
838
            return Ok(());
839
        }
840
841
        let mut writer = Some(writer);
842
843
        // Drive the dedup loader. Two distinct paths:
844
        //
845
        //   * Leader (created the OnceCell entry): runs `populate` with
846
        //     OUR `writer`, streaming directly to the caller while
847
        //     filling the fast cache. No timeout: a multi-GB blob
848
        //     legitimately takes minutes to stream, and cancelling our
849
        //     own populate would propagate the failure to every other
850
        //     reader of this digest.
851
        //
852
        //   * Follower: bound the wait so a wedged leader does not pin
853
        //     us until the upstream `gRPC` deadline fires. The follower
854
        //     closure passes `None` for `writer`. This is critical: if
855
        //     the OnceCell ever promotes our follower closure to leader
856
        //     (because the original leader's future was dropped), and
857
        //     our `tokio::time::timeout` then cancels it, *no* caller
858
        //     `writer` was ever moved into the populate stream, so no
859
        //     `gRPC` sender is dropped without EOF. The follower then
860
        //     re-enters `get_part` below and reads from the now-warm
861
        //     fast cache, OR falls back to the slow store on timeout.
862
        let needs_slow_store_fallback: bool = {
863
            let loader_guard = self.get_loader(key.borrow());
864
            let is_leader = loader_guard.is_leader;
865
            if is_leader {
866
                loader_guard
867
36
                    .get_or_try_init(|| {
868
36
                        self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
869
36
                    })
870
                    .await?;
871
                false
872
            } else {
873
0
                let load_fut = loader_guard.get_or_try_init(|| {
874
0
                    self.populate_and_maybe_stream(key.borrow(), None, offset, length)
875
0
                });
876
                match tokio::time::timeout(LEADER_WAIT_TIMEOUT, load_fut).await {
877
                    Ok(result) => {
878
                        result?;
879
                        false
880
                    }
881
                    Err(_elapsed) => {
882
                        self.metrics
883
                            .leader_wait_timeouts
884
                            .fetch_add(1, Ordering::Acquire);
885
                        warn!(
886
                            %key,
887
                            timeout_secs = LEADER_WAIT_TIMEOUT.as_secs(),
888
                            "FastSlowStore::get_part: leader-wait exceeded timeout, bypassing dedup and reading slow store directly",
889
                        );
890
                        true
891
                    }
892
                }
893
            }
894
        };
895
896
        if needs_slow_store_fallback && let Some(writer) = writer.take() {
897
            return self
898
                .slow_store
899
                .get_part(key, writer, offset, length)
900
                .await
901
                .err_tip(
902
                    || "In FastSlowStore::get_part slow_store fallback after leader-wait timeout",
903
                );
904
        }
905
906
        // If we didn't stream then re-enter which will stream from the fast
907
        // store, or retry the download.  We should not get in a loop here
908
        // because OnceCell has the good sense to retry for all callers so in
909
        // order to get here the fast store will have been populated.  There's
910
        // an outside chance it was evicted, but that's slim.
911
        if let Some(writer) = writer.take() {
912
            self.get_part(key, writer, offset, length).await
913
        } else {
914
            // This was the thread that did the streaming already, lucky duck.
915
            Ok(())
916
        }
917
183
    }
918
919
2
    /// Returns this store itself as the innermost driver; the `_key`
    /// argument is ignored.
    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
920
2
        self
921
2
    }
922
923
2
    /// Exposes `self` as a borrowed `dyn Any`, which allows callers to
    /// attempt a downcast to the concrete store type.
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
924
2
        self
925
2
    }
926
927
0
    /// Converts an owning `Arc<Self>` into an `Arc<dyn Any>` without
    /// cloning the underlying store.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
928
0
        self
929
0
    }
930
931
0
    fn register_remove_callback(
932
0
        self: Arc<Self>,
933
0
        callback: Arc<dyn RemoveItemCallback>,
934
0
    ) -> Result<(), Error> {
935
0
        self.fast_store.register_remove_callback(callback.clone())?;
936
0
        self.slow_store.register_remove_callback(callback)?;
937
0
        Ok(())
938
0
    }
939
}
940
941
/// Atomic counters describing how `FastSlowStore` served reads: hits and
/// downloaded bytes per tier, plus counts for the follower timeout,
/// huge-blob dedup bypass and stale fast-store entry paths.
#[derive(Debug, Default, MetricsComponent)]
942
struct FastSlowStoreMetrics {
943
    #[metric(help = "Hit count for the fast store")]
944
    fast_store_hit_count: AtomicU64,
945
    #[metric(help = "Downloaded bytes from the fast store")]
946
    fast_store_downloaded_bytes: AtomicU64,
947
    #[metric(help = "Hit count for the slow store")]
948
    slow_store_hit_count: AtomicU64,
949
    #[metric(help = "Downloaded bytes from the slow store")]
950
    slow_store_downloaded_bytes: AtomicU64,
951
    #[metric(
952
        help = "Number of times a follower bypassed the populating-digests dedup because the leader exceeded LEADER_WAIT_TIMEOUT"
953
    )]
954
    leader_wait_timeouts: AtomicU64,
955
    #[metric(help = "get_part calls that bypassed dedup for huge blobs")]
956
    huge_blob_dedup_bypasses: AtomicU64,
957
    #[metric(help = "Stale fast-store map entries that fell through to the slow store")]
958
    fast_store_stale_map_falls_through: AtomicU64,
959
}
960
961
/// Maximum time a follower will wait on the leader-populator before
962
/// bypassing the dedup map and reading directly from the slow store.
963
///
964
/// Without this bound a single wedged populator would block every
965
/// concurrent reader of the same digest until each one's own `gRPC`
966
/// deadline fired (e.g. Bazel's `--remote_timeout`), turning a
967
/// single slow read into a fan-out of `DEADLINE_EXCEEDED` errors.
968
///
/// Only followers are bounded by this value; the leader's own populate
/// call in `get_part` runs without a timeout.
const LEADER_WAIT_TIMEOUT: Duration = Duration::from_mins(1);
969
970
// NOTE(review): presumably provides the default `HealthStatusIndicator`
// implementation for `FastSlowStore`; confirm against the macro definition
// in `nativelink_util::health_utils`.
default_health_status_indicator!(FastSlowStore);