Coverage Report

Created: 2026-05-23 21:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/fast_slow_store.rs
Line
Count
Source
1
// Copyright 2024-2025 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::BorrowMut;
16
use core::cmp::{max, min};
17
use core::ops::Range;
18
use core::pin::Pin;
19
use core::sync::atomic::{AtomicU64, Ordering};
20
use core::time::Duration;
21
use std::collections::HashMap;
22
use std::ffi::OsString;
23
use std::sync::{Arc, Weak};
24
25
use async_trait::async_trait;
26
use futures::{FutureExt, join};
27
use nativelink_config::stores::{FastSlowSpec, StoreDirection};
28
use nativelink_error::{Code, Error, ErrorContext, ResultExt, make_err};
29
use nativelink_metric::MetricsComponent;
30
use nativelink_util::buf_channel::{
31
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
32
};
33
use nativelink_util::fs;
34
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
35
use nativelink_util::store_trait::{
36
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
37
    UploadSizeInfo, slow_update_store_with_file,
38
};
39
use parking_lot::Mutex;
40
use tokio::sync::OnceCell;
41
use tracing::{debug, info, trace, warn};
42
43
// TODO(palfrey) This store needs to be evaluated for more efficient memory usage,
44
// there are many copies happening internally.
45
46
type Loader = Arc<OnceCell<()>>;
47
48
// TODO(palfrey) We should consider copying the data in the background to allow the
49
// client to hang up while the data is buffered. An alternative is to possibly make a
50
// "BufferedStore" that could be placed on the "slow" store that would hang up early
51
// if data is in the buffer.
52
/// Store that pairs a "fast" store with a "slow" store.
///
/// Writes are sent to both stores (subject to each store's configured
/// `StoreDirection`), and reads that miss the fast store are streamed from
/// the slow store while populating the fast store.
#[derive(Debug, MetricsComponent)]
pub struct FastSlowStore {
    // The store that is consulted first on reads.
    #[metric(group = "fast_store")]
    fast_store: Store,
    // Which operations are allowed to touch `fast_store`.
    fast_direction: StoreDirection,
    // The store used when the fast store does not have the data.
    #[metric(group = "slow_store")]
    slow_store: Store,
    // Which operations are allowed to touch `slow_store`.
    slow_direction: StoreDirection,
    // Weak handle to ourselves, handed to the RAII guards so they can clean
    // up map entries on drop without keeping the store alive.
    weak_self: Weak<Self>,
    #[metric]
    metrics: FastSlowStoreMetrics,
    // De-duplicate requests for the fast store: only the first request
    // streams, the others are blocked.  This may feel like it's causing a
    // slow down of tasks, but actually it's faster because we're not
    // downloading the file multiple times or doing loads of duplicate IO.
    populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
    // Tracks keys whose slow-store write is currently in flight, along with
    // the best-known size. Consulted by `has_with_results` so that a
    // concurrent writer that has not yet finished pushing to the slow store
    // is still visible to a concurrent existence check, preventing redundant
    // duplicate uploads of the same blob.
    //
    // NOTE(review): entries are not reference counted; if two writers for
    // the same key overlap, the first guard to drop removes the entry while
    // the second write is still in flight — confirm this is acceptable.
    in_flight_slow_writes: Mutex<HashMap<StoreKey<'static>, u64>>,
}
75
76
// This guard ensures that the entry in `populating_digests` is cleared even
// if the future is dropped, so it is cancel safe.
struct LoaderGuard<'a> {
    // Weak handle to the owning store; upgraded in `Drop` to reach the map.
    weak_store: Weak<FastSlowStore>,
    // Key of the `populating_digests` entry this guard refers to.
    key: StoreKey<'a>,
    // Shared once-cell for this key. Always `Some` when built by
    // `get_loader`; taken (set to `None`) while dropping.
    loader: Option<Loader>,
    // True when the `get_loader` call that built this guard inserted the
    // map entry, false when it reused an existing one.
    is_leader: bool,
}
84
85
impl LoaderGuard<'_> {
86
35
    async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<(), E>
87
35
    where
88
35
        F: FnOnce() -> Fut,
89
35
        Fut: Future<Output = Result<(), E>>,
90
35
    {
91
35
        if let Some(loader) = &self.loader {
92
35
            loader.get_or_try_init(f).await.
map34
(|&()| ())
93
        } else {
94
            // This is impossible, but we do it anyway.
95
0
            f().await
96
        }
97
34
    }
98
}
99
100
/// Cancel-safe RAII guard that removes an entry from
/// `FastSlowStore::in_flight_slow_writes` when dropped. This ensures the
/// map does not leak entries if the surrounding `update()` future is
/// cancelled before the slow-store write completes.
struct InFlightSlowWriteGuard {
    // Weak handle to the owning store; if it can no longer be upgraded the
    // map is gone too and there is nothing to clean up.
    weak_store: Weak<FastSlowStore>,
    // Key to remove on drop. Wrapped in `Option` so `Drop` can move it out
    // of `&mut self`.
    key: Option<StoreKey<'static>>,
}
108
109
impl Drop for InFlightSlowWriteGuard {
110
131
    fn drop(&mut self) {
111
131
        let Some(store) = self.weak_store.upgrade() else {
112
0
            return;
113
        };
114
131
        let Some(key) = self.key.take() else {
115
0
            return;
116
        };
117
131
        store.in_flight_slow_writes.lock().remove(&key);
118
131
    }
119
}
120
121
impl Drop for LoaderGuard<'_> {
122
35
    fn drop(&mut self) {
123
35
        let Some(store) = self.weak_store.upgrade() else {
124
            // The store has already gone away, nothing to remove from.
125
0
            return;
126
        };
127
35
        let Some(loader) = self.loader.take() else {
128
            // This should never happen, but we do it to be safe.
129
0
            return;
130
        };
131
132
35
        let mut guard = store.populating_digests.lock();
133
35
        if let std::collections::hash_map::Entry::Occupied(occupied_entry) =
134
35
            guard.entry(self.key.borrow().into_owned())
135
35
            && Arc::ptr_eq(occupied_entry.get(), &loader)
136
        {
137
35
            drop(loader);
138
35
            if Arc::strong_count(occupied_entry.get()) == 1 {
139
19
                // This is the last loader, so remove it.
140
19
                occupied_entry.remove();
141
19
            
}16
142
0
        }
143
35
    }
144
}
145
146
impl FastSlowStore {
147
54
    pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
148
54
        Arc::new_cyclic(|weak_self| Self {
149
54
            fast_store,
150
54
            fast_direction: spec.fast_direction,
151
54
            slow_store,
152
54
            slow_direction: spec.slow_direction,
153
54
            weak_self: weak_self.clone(),
154
54
            metrics: FastSlowStoreMetrics::default(),
155
54
            populating_digests: Mutex::new(HashMap::new()),
156
54
            in_flight_slow_writes: Mutex::new(HashMap::new()),
157
54
        })
158
54
    }
159
160
    /// Best-effort size for tracking in-flight slow writes. Falls back to 0
161
    /// when neither the upload size nor the key carries an exact size
162
    /// (e.g. `MaxSize` for a string key). Callers of `has_with_results` only
163
    /// rely on `Some(_)` vs `None`, so a size of 0 still correctly signals
164
    /// "this blob exists".
165
131
    const fn track_size(key: &StoreKey<'_>, size_info: UploadSizeInfo) -> u64 {
166
131
        match size_info {
167
131
            UploadSizeInfo::ExactSize(s) => s,
168
0
            UploadSizeInfo::MaxSize(_) => match key {
169
0
                StoreKey::Digest(d) => d.size_bytes(),
170
0
                StoreKey::Str(_) => 0,
171
            },
172
        }
173
131
    }
174
175
131
    fn register_in_flight_slow_write(
176
131
        &self,
177
131
        key: StoreKey<'_>,
178
131
        size: u64,
179
131
    ) -> InFlightSlowWriteGuard {
180
131
        let owned = key.into_owned();
181
131
        self.in_flight_slow_writes
182
131
            .lock()
183
131
            .insert(owned.borrow().into_owned(), size);
184
131
        InFlightSlowWriteGuard {
185
131
            weak_store: self.weak_self.clone(),
186
131
            key: Some(owned),
187
131
        }
188
131
    }
189
190
0
    /// Returns a reference to the fast store.
    pub const fn fast_store(&self) -> &Store {
        &self.fast_store
    }
193
194
0
    /// Returns a reference to the slow store.
    pub const fn slow_store(&self) -> &Store {
        &self.slow_store
    }
197
198
2
    /// Upgrades the internal weak self-reference, returning `None` if the
    /// owning `Arc` has already been dropped.
    pub fn get_arc(&self) -> Option<Arc<Self>> {
        self.weak_self.upgrade()
    }
201
202
35
    fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> {
203
        // Get a single loader instance that's used to populate the fast store
204
        // for this digest.  If another request comes in then it's de-duplicated.
205
35
        let mut is_leader = false;
206
35
        let loader = match self
207
35
            .populating_digests
208
35
            .lock()
209
35
            .entry(key.borrow().into_owned())
210
        {
211
16
            std::collections::hash_map::Entry::Occupied(occupied_entry) => {
212
16
                occupied_entry.get().clone()
213
            }
214
19
            std::collections::hash_map::Entry::Vacant(vacant_entry) => {
215
19
                is_leader = true;
216
19
                vacant_entry.insert(Arc::new(OnceCell::new())).clone()
217
            }
218
        };
219
35
        LoaderGuard {
220
35
            weak_store: self.weak_self.clone(),
221
35
            key,
222
35
            loader: Some(loader),
223
35
            is_leader,
224
35
        }
225
35
    }
226
227
19
    async fn populate_and_maybe_stream(
228
19
        self: Pin<&Self>,
229
19
        key: StoreKey<'_>,
230
19
        maybe_writer: Option<&mut DropCloserWriteHalf>,
231
19
        offset: u64,
232
19
        length: Option<u64>,
233
19
    ) -> Result<(), Error> {
234
19
        let reader_stream_size = if self
235
19
            .slow_store
236
19
            .inner_store(Some(key.borrow()))
237
19
            .optimized_for(StoreOptimizations::LazyExistenceOnSync)
238
        {
239
2
            trace!(
240
                %key,
241
2
                store_name = %self.slow_store.inner_store(Some(key.borrow())).get_name(),
242
                "Skipping .has() check due to LazyExistenceOnSync optimization"
243
            );
244
2
            UploadSizeInfo::MaxSize(u64::MAX)
245
        } else {
246
17
            UploadSizeInfo::ExactSize(self
247
17
                    .slow_store
248
17
                    .has(key.borrow())
249
17
                    .await
250
17
                    .err_tip(|| "Failed to run has() on slow store")
?0
251
17
                    .ok_or_else(|| 
{0
252
0
                        let err = make_err!(
253
0
                            Code::NotFound,
254
                            "Object {} not found in either fast or slow store. \
255
                                If using multiple workers, ensure all workers share the same CAS storage path.",
256
0
                            key.as_str()
257
                        );
258
0
                        if let StoreKey::Digest(d) = key.borrow() {
259
0
                            err.with_context(ErrorContext::MissingDigest {
260
0
                                hash: d.packed_hash().to_string(),
261
0
                                size: d.size_bytes() as i64,
262
0
                            })
263
                        } else {
264
0
                            err
265
                        }
266
0
                    })?
267
            )
268
        };
269
270
19
        let send_range = offset..length.map_or(u64::MAX, |length| 
length9
+
offset9
);
271
19
        let mut bytes_received: u64 = 0;
272
19
        let mut counted_hit = false;
273
274
19
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
275
19
        let (slow_tx, mut slow_rx) = make_buf_channel_pair();
276
19
        let data_stream_fut = async move {
277
19
            let mut maybe_writer_pin = maybe_writer.map(Pin::new);
278
            loop {
279
36
                let 
output_buf35
= slow_rx
280
36
                    .recv()
281
36
                    .await
282
36
                    .err_tip(|| "Failed to read data data buffer from slow store")
?1
;
283
35
                if output_buf.is_empty() {
284
                    // Write out our EOF.
285
                    // We are dropped as soon as we send_eof to writer_pin, so
286
                    // we wait until we've finished all of our joins to do that.
287
18
                    let fast_res = fast_tx.send_eof();
288
18
                    return Ok::<_, Error>((fast_res, maybe_writer_pin));
289
17
                }
290
291
17
                if !counted_hit {
292
17
                    self.metrics
293
17
                        .slow_store_hit_count
294
17
                        .fetch_add(1, Ordering::Acquire);
295
17
                    counted_hit = true;
296
17
                
}0
297
298
17
                let output_buf_len = u64::try_from(output_buf.len())
299
17
                    .err_tip(|| "Could not output_buf.len() to u64")
?0
;
300
17
                self.metrics
301
17
                    .slow_store_downloaded_bytes
302
17
                    .fetch_add(output_buf_len, Ordering::Acquire);
303
304
17
                let writer_fut = Self::calculate_range(
305
17
                    &(bytes_received..bytes_received + output_buf_len),
306
17
                    &send_range,
307
0
                )?
308
17
                .zip(maybe_writer_pin.as_mut())
309
17
                .map_or_else(
310
5
                    || futures::future::ready(Ok(())).left_future(),
311
12
                    |(range, writer_pin)| writer_pin.send(output_buf.slice(range)).right_future(),
312
                );
313
314
17
                bytes_received += output_buf_len;
315
316
17
                let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut);
317
17
                fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")
?0
;
318
17
                writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")
?0
;
319
            }
320
19
        };
321
322
19
        let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx);
323
19
        let fast_store_fut = self
324
19
            .fast_store
325
19
            .update(key.borrow(), fast_rx, reader_stream_size);
326
327
19
        let (data_stream_res, slow_res, fast_res) =
328
19
            join!(data_stream_fut, slow_store_fut, fast_store_fut);
329
19
        match data_stream_res {
330
18
            Ok((fast_eof_res, maybe_writer_pin)) =>
331
            // Sending the EOF will drop us almost immediately in bytestream_server
332
            // so we perform it as the very last action in this method.
333
            {
334
18
                fast_eof_res.merge(fast_res).merge(slow_res).merge(
335
18
                    if let Some(
mut writer_pin13
) = maybe_writer_pin {
336
13
                        writer_pin.send_eof()
337
                    } else {
338
5
                        Ok(())
339
                    },
340
                )
341
            }
342
1
            Err(err) => match slow_res {
343
1
                Err(slow_err) if slow_err.code == Code::NotFound => Err(slow_err),
344
0
                _ => fast_res.merge(slow_res).merge(Err(err)),
345
            },
346
        }
347
19
    }
348
349
    /// Ensure our fast store is populated. This should be kept as a low
350
    /// cost function. Since the data itself is shared and not copied it should be fairly
351
    /// low cost to just discard the data, but does cost a few mutex locks while
352
    /// streaming.
353
12
    pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> {
354
12
        let maybe_size_info = self
355
12
            .fast_store
356
12
            .has(key.borrow())
357
12
            .await
358
12
            .err_tip(|| "While querying in populate_fast_store")
?0
;
359
12
        if maybe_size_info.is_some() {
360
7
            return Ok(());
361
5
        }
362
363
        // If the fast store is noop or read only or update only then this is an error.
364
5
        if self
365
5
            .fast_store
366
5
            .inner_store(Some(key.borrow()))
367
5
            .optimized_for(StoreOptimizations::NoopUpdates)
368
5
            || self.fast_direction == StoreDirection::ReadOnly
369
5
            || self.fast_direction == StoreDirection::Update
370
        {
371
0
            return Err(make_err!(
372
0
                Code::Internal,
373
0
                "Attempt to populate fast store that is read only or noop"
374
0
            ));
375
5
        }
376
377
5
        self.get_loader(key.borrow())
378
5
            .get_or_try_init(|| {
379
5
                Pin::new(self).populate_and_maybe_stream(key.borrow(), None, 0, None)
380
5
            })
381
5
            .await
382
5
            .err_tip(|| "Failed to populate()")
383
12
    }
384
385
    /// Returns the range of bytes that should be sent given a slice bounds
386
    /// offset so the output range maps the `received_range.start` to 0.
387
    // TODO(palfrey) This should be put into utils, as this logic is used
388
    // elsewhere in the code.
389
27
    pub fn calculate_range(
390
27
        received_range: &Range<u64>,
391
27
        send_range: &Range<u64>,
392
27
    ) -> Result<Option<Range<usize>>, Error> {
393
        // Protect against subtraction overflow.
394
27
        if received_range.start >= received_range.end {
395
0
            return Ok(None);
396
27
        }
397
398
27
        let start = max(received_range.start, send_range.start);
399
27
        let end = min(received_range.end, send_range.end);
400
27
        if received_range.contains(&start) && 
received_range25
.
contains25
(
&(end - 1)25
) {
401
            // Offset both to the start of the received_range.
402
23
            let calculated_range_start = usize::try_from(start - received_range.start)
403
23
                .err_tip(|| "Could not convert (start - received_range.start) to usize")
?0
;
404
23
            let calculated_range_end = usize::try_from(end - received_range.start)
405
23
                .err_tip(|| "Could not convert (end - received_range.start) to usize")
?0
;
406
23
            Ok(Some(calculated_range_start..calculated_range_end))
407
        } else {
408
4
            Ok(None)
409
        }
410
27
    }
411
}
412
413
#[async_trait]
414
impl StoreDriver for FastSlowStore {
415
    async fn has_with_results(
416
        self: Pin<&Self>,
417
        key: &[StoreKey<'_>],
418
        results: &mut [Option<u64>],
419
24
    ) -> Result<(), Error> {
420
        // If our slow store is a noop store, it'll always return a 404,
421
        // so only check the fast store in such case.
422
        let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None);
423
        if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
424
            return self.fast_store.has_with_results(key, results).await;
425
        }
426
        // Primary lookup is the slow store because that's authoritative for
427
        // downstream consumers that fetch from there. But the slow store
428
        // alone can miss two important cases:
429
        //   1. A concurrent writer's slow-store write is still in flight.
430
        //   2. The blob is present in the fast (local) store — either
431
        //      fast-only by configuration, or because the slow write has
432
        //      not yet started/completed.
433
        // Reporting NotFound in those cases causes redundant duplicate
434
        // uploads or unnecessary slow-store fetches.
435
        self.slow_store.has_with_results(key, results).await?;
436
437
        // Fill in any blobs whose slow-store write is currently in flight.
438
        // Cheap when the map is empty (the common case).
439
        {
440
            let in_flight = self.in_flight_slow_writes.lock();
441
            if !in_flight.is_empty() {
442
                for (k, result) in key.iter().zip(results.iter_mut()) {
443
                    if result.is_none() {
444
                        let owned = k.borrow().into_owned();
445
                        if let Some(size) = in_flight.get(&owned) {
446
                            *result = Some(*size);
447
                        }
448
                    }
449
                }
450
            }
451
        }
452
453
        // Fall back to the fast store for anything still missing. This
454
        // covers fast-only writes and the brief window between fast-store
455
        // insertion and slow-store write start.
456
        let missing_indices: Vec<usize> = results
457
            .iter()
458
            .enumerate()
459
25
            .filter_map(|(i, r)| if r.is_none() { 
Some(i)16
} else {
None9
})
460
            .collect();
461
        if !missing_indices.is_empty() {
462
            let missing_keys: Vec<StoreKey<'_>> =
463
16
                missing_indices.iter().map(|&i| key[i].borrow()).collect();
464
            let mut fast_results = vec![None; missing_keys.len()];
465
            self.fast_store
466
                .has_with_results(&missing_keys, &mut fast_results)
467
                .await?;
468
            for (j, &orig_idx) in missing_indices.iter().enumerate() {
469
                if fast_results[j].is_some() {
470
                    results[orig_idx] = fast_results[j];
471
                }
472
            }
473
        }
474
        Ok(())
475
24
    }
476
477
    async fn update(
478
        self: Pin<&Self>,
479
        key: StoreKey<'_>,
480
        mut reader: DropCloserReadHalf,
481
        size_info: UploadSizeInfo,
482
123
    ) -> Result<(), Error> {
483
        // If either one of our stores is a noop store, bypass the multiplexing
484
        // and just use the store that is not a noop store.
485
        let ignore_slow = self
486
            .slow_store
487
            .inner_store(Some(key.borrow()))
488
            .optimized_for(StoreOptimizations::NoopUpdates)
489
            || self.slow_direction == StoreDirection::ReadOnly
490
            || self.slow_direction == StoreDirection::Get;
491
        let ignore_fast = self
492
            .fast_store
493
            .inner_store(Some(key.borrow()))
494
            .optimized_for(StoreOptimizations::NoopUpdates)
495
            || self.fast_direction == StoreDirection::ReadOnly
496
            || self.fast_direction == StoreDirection::Get;
497
        if ignore_slow && ignore_fast {
498
            // We need to drain the reader to avoid the writer complaining that we dropped
499
            // the connection prematurely.
500
            reader
501
                .drain()
502
                .await
503
                .err_tip(|| "In FastFlowStore::update")?;
504
            return Ok(());
505
        }
506
        if ignore_slow {
507
            return self.fast_store.update(key, reader, size_info).await;
508
        }
509
        if ignore_fast {
510
            let _guard =
511
                self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info));
512
            return self.slow_store.update(key, reader, size_info).await;
513
        }
514
515
        let _slow_in_flight_guard =
516
            self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info));
517
518
        let (mut fast_tx, fast_rx) = make_buf_channel_pair();
519
        let (mut slow_tx, slow_rx) = make_buf_channel_pair();
520
521
        let key_debug = format!("{key:?}");
522
        trace!(
523
            key = %key_debug,
524
            "FastSlowStore::update: starting dual-store upload",
525
        );
526
        let update_start = std::time::Instant::now();
527
        let mut bytes_sent: u64 = 0;
528
529
117
        let data_stream_fut = async move {
530
            loop {
531
189
                let buffer = reader
532
189
                    .recv()
533
189
                    .await
534
189
                    .err_tip(|| "Failed to read buffer in fastslow store")
?0
;
535
189
                if buffer.is_empty() {
536
                    // EOF received.
537
117
                    fast_tx.send_eof().err_tip(
538
                        || "Failed to write eof to fast store in fast_slow store update",
539
0
                    )?;
540
117
                    slow_tx
541
117
                        .send_eof()
542
117
                        .err_tip(|| "Failed to write eof to writer in fast_slow store update")
?0
;
543
117
                    debug!(
544
                        total_bytes = bytes_sent,
545
                        "FastSlowStore::update: data_stream sent EOF to both stores",
546
                    );
547
117
                    return Result::<(), Error>::Ok(());
548
72
                }
549
550
72
                let chunk_len = buffer.len();
551
72
                let send_start = std::time::Instant::now();
552
72
                let (fast_result, slow_result) =
553
72
                    join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
554
72
                let send_elapsed = send_start.elapsed();
555
72
                if send_elapsed.as_secs() >= 5 {
556
0
                    warn!(
557
                        chunk_len,
558
0
                        send_elapsed_ms = send_elapsed.as_millis(),
559
                        total_bytes = bytes_sent,
560
                        "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging",
561
                    );
562
72
                }
563
72
                bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX);
564
72
                fast_result
565
72
                    .map_err(|e| 
{0
566
0
                        make_err!(
567
0
                            Code::Internal,
568
                            "Failed to send message to fast_store in fast_slow_store {:?}",
569
                            e
570
                        )
571
0
                    })
572
72
                    .merge(slow_result.map_err(|e| 
{0
573
0
                        make_err!(
574
0
                            Code::Internal,
575
                            "Failed to send message to slow_store in fast_slow store {:?}",
576
                            e
577
                        )
578
0
                    }))?;
579
            }
580
117
        };
581
582
        let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info);
583
        let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info);
584
585
        let (data_stream_res, fast_res, slow_res) =
586
            join!(data_stream_fut, fast_store_fut, slow_store_fut);
587
588
        let total_elapsed = update_start.elapsed();
589
        if data_stream_res.is_err() || fast_res.is_err() || slow_res.is_err() {
590
            let all_not_found = [&data_stream_res, &fast_res, &slow_res]
591
                .iter()
592
0
                .all(|r| match r {
593
0
                    Ok(()) => true,
594
0
                    Err(e) => e.code == Code::NotFound,
595
0
                });
596
            if all_not_found {
597
                info!(
598
                    key = %key_debug,
599
                    elapsed_ms = total_elapsed.as_millis(),
600
                    data_stream_ok = data_stream_res.is_ok(),
601
                    fast_store_ok = fast_res.is_ok(),
602
                    slow_store_ok = slow_res.is_ok(),
603
                    "FastSlowStore::update: completed with NotFound error(s)",
604
                );
605
            } else {
606
                warn!(
607
                    key = %key_debug,
608
                    elapsed_ms = total_elapsed.as_millis(),
609
                    data_stream_ok = data_stream_res.is_ok(),
610
                    fast_store_ok = fast_res.is_ok(),
611
                    slow_store_ok = slow_res.is_ok(),
612
                    "FastSlowStore::update: completed with error(s)",
613
                );
614
            }
615
        } else {
616
            trace!(
617
                key = %key_debug,
618
                elapsed_ms = total_elapsed.as_millis(),
619
                "FastSlowStore::update: completed successfully",
620
            );
621
        }
622
        data_stream_res.merge(fast_res).merge(slow_res)?;
623
        Ok(())
624
123
    }
625
626
    /// `FastSlowStore` has optimizations for dealing with files.
627
0
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
628
0
        optimization == StoreOptimizations::FileUpdates
629
0
    }
630
631
    /// Optimized variation to consume the file if one of the stores is a
632
    /// filesystem store. This makes the operation a move instead of a copy
633
    /// dramatically increasing performance for large files.
634
    async fn update_with_whole_file(
635
        self: Pin<&Self>,
636
        key: StoreKey<'_>,
637
        path: OsString,
638
        mut file: fs::FileSlot,
639
        upload_size: UploadSizeInfo,
640
10
    ) -> Result<Option<fs::FileSlot>, Error> {
641
        trace!(
642
            key = ?key,
643
            ?upload_size,
644
            "FastSlowStore::update_with_whole_file: starting",
645
        );
646
        if self
647
            .fast_store
648
            .optimized_for(StoreOptimizations::FileUpdates)
649
        {
650
            if !self
651
                .slow_store
652
                .inner_store(Some(key.borrow()))
653
                .optimized_for(StoreOptimizations::NoopUpdates)
654
                && self.slow_direction != StoreDirection::ReadOnly
655
                && self.slow_direction != StoreDirection::Get
656
            {
657
                trace!("FastSlowStore::update_with_whole_file: uploading to slow_store");
658
                let slow_start = std::time::Instant::now();
659
                let _guard = self.register_in_flight_slow_write(
660
                    key.borrow(),
661
                    Self::track_size(&key, upload_size),
662
                );
663
                slow_update_store_with_file(
664
                    self.slow_store.as_store_driver_pin(),
665
                    key.borrow(),
666
                    &mut file,
667
                    upload_size,
668
                )
669
                .await
670
                .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
671
                trace!(
672
                    elapsed_ms = slow_start.elapsed().as_millis(),
673
                    "FastSlowStore::update_with_whole_file: slow_store upload completed",
674
                );
675
            }
676
            if self.fast_direction == StoreDirection::ReadOnly
677
                || self.fast_direction == StoreDirection::Get
678
            {
679
                return Ok(Some(file));
680
            }
681
            return self
682
                .fast_store
683
                .update_with_whole_file(key, path, file, upload_size)
684
                .await;
685
        }
686
687
        if self
688
            .slow_store
689
            .optimized_for(StoreOptimizations::FileUpdates)
690
        {
691
            let ignore_fast = self
692
                .fast_store
693
                .inner_store(Some(key.borrow()))
694
                .optimized_for(StoreOptimizations::NoopUpdates)
695
                || self.fast_direction == StoreDirection::ReadOnly
696
                || self.fast_direction == StoreDirection::Get;
697
            if !ignore_fast {
698
                slow_update_store_with_file(
699
                    self.fast_store.as_store_driver_pin(),
700
                    key.borrow(),
701
                    &mut file,
702
                    upload_size,
703
                )
704
                .await
705
                .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
706
            }
707
            let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
708
                || self.slow_direction == StoreDirection::Get;
709
            if ignore_slow {
710
                return Ok(Some(file));
711
            }
712
            let _guard = self
713
                .register_in_flight_slow_write(key.borrow(), Self::track_size(&key, upload_size));
714
            return self
715
                .slow_store
716
                .update_with_whole_file(key, path, file, upload_size)
717
                .await;
718
        }
719
720
        slow_update_store_with_file(self, key, &mut file, upload_size)
721
            .await
722
            .err_tip(|| "In FastSlowStore::update_with_whole_file")?;
723
        Ok(Some(file))
724
10
    }
725
726
    async fn get_part(
727
        self: Pin<&Self>,
728
        key: StoreKey<'_>,
729
        writer: &mut DropCloserWriteHalf,
730
        offset: u64,
731
        length: Option<u64>,
732
124
    ) -> Result<(), Error> {
733
        // TODO(palfrey) Investigate if we should maybe ignore errors here instead of
734
        // forwarding them up.
735
        if self.fast_store.has(key.borrow()).await?.is_some() {
736
            self.metrics
737
                .fast_store_hit_count
738
                .fetch_add(1, Ordering::Acquire);
739
            self.fast_store
740
                .get_part(key, writer.borrow_mut(), offset, length)
741
                .await?;
742
            self.metrics
743
                .fast_store_downloaded_bytes
744
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
745
            return Ok(());
746
        }
747
748
        // If the fast store is noop or read only or update only then bypass it.
749
        if self
750
            .fast_store
751
            .inner_store(Some(key.borrow()))
752
            .optimized_for(StoreOptimizations::NoopUpdates)
753
            || self.fast_direction == StoreDirection::ReadOnly
754
            || self.fast_direction == StoreDirection::Update
755
        {
756
            self.metrics
757
                .slow_store_hit_count
758
                .fetch_add(1, Ordering::Acquire);
759
            self.slow_store
760
                .get_part(key, writer.borrow_mut(), offset, length)
761
                .await?;
762
            self.metrics
763
                .slow_store_downloaded_bytes
764
                .fetch_add(writer.get_bytes_written(), Ordering::Acquire);
765
            return Ok(());
766
        }
767
768
        let mut writer = Some(writer);
769
770
        // Drive the dedup loader. Two distinct paths:
771
        //
772
        //   * Leader (created the OnceCell entry): runs `populate` with
773
        //     OUR `writer`, streaming directly to the caller while
774
        //     filling the fast cache. No timeout: a multi-GB blob
775
        //     legitimately takes minutes to stream, and cancelling our
776
        //     own populate would propagate the failure to every other
777
        //     reader of this digest.
778
        //
779
        //   * Follower: bound the wait so a wedged leader does not pin
780
        //     us until the upstream `gRPC` deadline fires. The follower
781
        //     closure passes `None` for `writer`. This is critical: if
782
        //     the OnceCell ever promotes our follower closure to leader
783
        //     (because the original leader's future was dropped), and
784
        //     our `tokio::time::timeout` then cancels it, *no* caller
785
        //     `writer` was ever moved into the populate stream, so no
786
        //     `gRPC` sender is dropped without EOF. The follower then
787
        //     re-enters `get_part` below and reads from the now-warm
788
        //     fast cache, OR falls back to the slow store on timeout.
789
        let needs_slow_store_fallback: bool = {
790
            let loader_guard = self.get_loader(key.borrow());
791
            let is_leader = loader_guard.is_leader;
792
            if is_leader {
793
                loader_guard
794
14
                    .get_or_try_init(|| {
795
14
                        self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
796
14
                    })
797
                    .await?;
798
                false
799
            } else {
800
0
                let load_fut = loader_guard.get_or_try_init(|| {
801
0
                    self.populate_and_maybe_stream(key.borrow(), None, offset, length)
802
0
                });
803
                match tokio::time::timeout(LEADER_WAIT_TIMEOUT, load_fut).await {
804
                    Ok(result) => {
805
                        result?;
806
                        false
807
                    }
808
                    Err(_elapsed) => {
809
                        self.metrics
810
                            .leader_wait_timeouts
811
                            .fetch_add(1, Ordering::Acquire);
812
                        warn!(
813
                            %key,
814
                            timeout_secs = LEADER_WAIT_TIMEOUT.as_secs(),
815
                            "FastSlowStore::get_part: leader-wait exceeded timeout, bypassing dedup and reading slow store directly",
816
                        );
817
                        true
818
                    }
819
                }
820
            }
821
        };
822
823
        if needs_slow_store_fallback && let Some(writer) = writer.take() {
824
            return self
825
                .slow_store
826
                .get_part(key, writer, offset, length)
827
                .await
828
                .err_tip(
829
                    || "In FastSlowStore::get_part slow_store fallback after leader-wait timeout",
830
                );
831
        }
832
833
        // If we didn't stream then re-enter which will stream from the fast
834
        // store, or retry the download.  We should not get in a loop here
835
        // because OnceCell has the good sense to retry for all callers so in
836
        // order to get here the fast store will have been populated.  There's
837
        // an outside chance it was evicted, but that's slim.
838
        if let Some(writer) = writer.take() {
839
            self.get_part(key, writer, offset, length).await
840
        } else {
841
            // This was the thread that did the streaming already, lucky duck.
842
            Ok(())
843
        }
844
124
    }
845
846
2
    /// Returns this store itself as the `StoreDriver`; the `_key` argument is
    /// not consulted.
    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
847
2
        self
848
2
    }
849
850
2
    /// Returns a borrowed view of this store as a `dyn Any + Sync + Send`
    /// trait object, tied to the lifetime of `self`.
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
851
2
        self
852
2
    }
853
854
0
    /// Converts an owned `Arc` of this store into an
    /// `Arc<dyn Any + Sync + Send>`, consuming the original handle.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
855
0
        self
856
0
    }
857
858
0
    fn register_remove_callback(
859
0
        self: Arc<Self>,
860
0
        callback: Arc<dyn RemoveItemCallback>,
861
0
    ) -> Result<(), Error> {
862
0
        self.fast_store.register_remove_callback(callback.clone())?;
863
0
        self.slow_store.register_remove_callback(callback)?;
864
0
        Ok(())
865
0
    }
866
}
867
868
/// Atomic counters describing read traffic through the fast and slow stores
/// and how often followers gave up waiting on a leader.
///
/// All fields are `AtomicU64` and start at zero via the derived `Default`.
#[derive(Debug, Default, MetricsComponent)]
869
struct FastSlowStoreMetrics {
870
    // Incremented by `get_part` each time a read is served from the fast store.
    #[metric(help = "Hit count for the fast store")]
871
    fast_store_hit_count: AtomicU64,
872
    // Accumulates `writer.get_bytes_written()` after a fast-store read in `get_part`.
    #[metric(help = "Downloaded bytes from the fast store")]
873
    fast_store_downloaded_bytes: AtomicU64,
874
    // Incremented by `get_part` when it bypasses the fast store and reads the
    // slow store; code outside this view may update it as well.
    #[metric(help = "Hit count for the slow store")]
875
    slow_store_hit_count: AtomicU64,
876
    // Accumulates `writer.get_bytes_written()` after such a slow-store read.
    #[metric(help = "Downloaded bytes from the slow store")]
877
    slow_store_downloaded_bytes: AtomicU64,
878
    // Incremented by `get_part` when a follower's wait on the leader hits
    // `LEADER_WAIT_TIMEOUT`.
    #[metric(
879
        help = "Number of times a follower bypassed the populating-digests dedup because the leader exceeded LEADER_WAIT_TIMEOUT"
880
    )]
881
    leader_wait_timeouts: AtomicU64,
882
}
883
884
/// Maximum time a follower will wait on the leader-populator before
885
/// bypassing the dedup map and reading directly from the slow store.
886
///
887
/// Without this bound a single wedged populator would block every
888
/// concurrent reader of the same digest until each one's own `gRPC`
889
/// deadline fired (e.g. Bazel's `--remote_timeout`), turning a
890
/// single slow read into a fan-out of `DEADLINE_EXCEEDED` errors.
///
/// Only the follower branch of `get_part` wraps its wait in this timeout;
/// the leader's own populate is awaited without one. Each expiry
/// increments the `leader_wait_timeouts` metric and logs a warning.
891
const LEADER_WAIT_TIMEOUT: Duration = Duration::from_secs(60);
892
893
// NOTE(review): presumably generates the default `HealthStatusIndicator`
// implementation for `FastSlowStore` — confirm against the macro definition
// in `nativelink_util::health_utils`.
default_health_status_indicator!(FastSlowStore);