Coverage Report

Created: 2025-10-24 14:08

/build/source/nativelink-util/src/store_trait.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::borrow::{Borrow, BorrowMut};
16
use core::convert::Into;
17
use core::fmt::{self, Debug, Display};
18
use core::hash::{Hash, Hasher};
19
use core::ops::{Bound, RangeBounds};
20
use core::pin::Pin;
21
use core::ptr::addr_eq;
22
use std::borrow::Cow;
23
use std::collections::hash_map::DefaultHasher as StdHasher;
24
use std::ffi::OsString;
25
use std::sync::{Arc, OnceLock};
26
27
use async_trait::async_trait;
28
use bytes::{Bytes, BytesMut};
29
use futures::{Future, FutureExt, Stream, join, try_join};
30
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
31
use nativelink_metric::MetricsComponent;
32
use rand::rngs::StdRng;
33
use rand::{RngCore, SeedableRng};
34
use serde::{Deserialize, Serialize};
35
use tokio::io::{AsyncReadExt, AsyncSeekExt};
36
use tracing::warn;
37
38
use crate::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair};
39
use crate::common::DigestInfo;
40
use crate::digest_hasher::{DigestHasher, DigestHasherFunc, default_digest_hasher_func};
41
use crate::fs;
42
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
43
44
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
45
/// Default digest size for health check data. Any change in this value
46
/// changes the default contract. `GlobalConfig` should be updated to reflect
47
/// changes in this value.
48
pub const DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG: usize = 1024 * 1024;
49
50
// Get the default digest size for health check data; if the value is unset, a system-wide default is used.
51
1
pub fn default_digest_size_health_check() -> usize {
52
1
    *DEFAULT_DIGEST_SIZE_HEALTH_CHECK.get_or_init(|| DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG)
53
1
}
54
55
/// Set the default digest size for health check data; this should be called only once.
56
0
pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
57
0
    DEFAULT_DIGEST_SIZE_HEALTH_CHECK.set(size).map_err(|_| {
58
0
        make_err!(
59
0
            Code::Internal,
60
            "set_default_digest_size_health_check already set"
61
        )
62
0
    })
63
0
}
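
Editor's sketch (not part of the measured source): how the two helpers above are intended to be used together, assuming they are in scope. The OnceLock means only the first call to the setter can succeed; readers fall back to the compiled-in default.

    // Hypothetical startup wiring; `maybe_cfg_value` would come from configuration.
    fn configure_health_check_size(maybe_cfg_value: Option<usize>) {
        if let Some(size) = maybe_cfg_value {
            // Only the first call can succeed; a later call returns Code::Internal.
            set_default_digest_size_health_check(size)
                .expect("default digest size was already set");
        }
        // Unset -> DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG (1 MiB).
        let effective = default_digest_size_health_check();
        assert!(effective > 0);
    }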
64
65
#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
66
pub enum UploadSizeInfo {
67
    /// When the data transfer amount is known to be an exact size, this variant should be used.
68
    /// The receiver store can use this to better optimize the way the data is sent or stored.
69
    ExactSize(u64),
70
71
    /// When the data transfer amount is not known to be exact, the caller should use this variant
72
    /// to provide the maximum size that could possibly be sent. This will bypass the exact size
73
    /// checks, but still provide useful information to the underlying store about the data being
74
    /// sent that it can then use to optimize the upload process.
75
    MaxSize(u64),
76
}
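
Editor's sketch (not part of the measured source): choosing between the two variants. The helper below is hypothetical; the assumption is that the caller either holds the full payload in memory or only knows an upper bound.

    // Exact when the whole payload is in memory, otherwise an upper bound
    // (e.g. the declared size cap of an incoming stream).
    fn size_info_for(data: Option<&[u8]>, max_possible: u64) -> UploadSizeInfo {
        match data {
            Some(bytes) => UploadSizeInfo::ExactSize(bytes.len() as u64),
            None => UploadSizeInfo::MaxSize(max_possible),
        }
    }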
77
78
/// Utility to send all the data to the store from a file.
79
// Note: This is not inlined because some code may want to bypass any underlying
80
// optimizations that may be present in the inner store.
81
6
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
82
6
    store: Pin<&S>,
83
6
    digest: impl Into<StoreKey<'_>>,
84
6
    file: &mut fs::FileSlot,
85
6
    upload_size: UploadSizeInfo,
86
6
) -> Result<(), Error> {
87
6
    file.rewind()
88
6
        .await
89
6
        .err_tip(|| "Failed to rewind in upload_file_to_store")
?0
;
90
6
    let (mut tx, rx) = make_buf_channel_pair();
91
92
6
    let update_fut = store
93
6
        .update(digest.into(), rx, upload_size)
94
6
        .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
95
6
    let read_data_fut = async move {
96
        loop {
97
75
            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
98
75
            let read = file
99
75
                .read_buf(&mut buf)
100
75
                .await
101
75
                .err_tip(|| "Failed to read in upload_file_to_store")
?0
;
102
75
            if read == 0 {
  Branch (102:16): [True: 5, False: 5]
  Branch (102:16): [True: 0, False: 0] (×18 instantiations)
  Branch (102:16): [Folded - Ignored]
  Branch (102:16): [True: 1, False: 64]
  Branch (102:16): [True: 0, False: 0] (×17 instantiations)
  Branch (102:16): [Folded - Ignored]
  Branch (102:16): [True: 0, False: 0] (×14 instantiations)
103
6
                break;
104
69
            }
105
69
            tx.send(buf.freeze())
106
69
                .await
107
69
                .err_tip(|| "Failed to send in upload_file_to_store")
?0
;
108
        }
109
6
        tx.send_eof()
110
6
            .err_tip(|| "Could not send EOF to store in upload_file_to_store")
111
6
    };
112
6
    tokio::pin!(read_data_fut);
113
6
    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
114
6
    update_res.merge(read_res)
115
6
}
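
Editor's sketch (not part of the measured source): a hypothetical caller of the utility above, assuming it already holds a pinned store driver, a digest, and an open file of known size.

    async fn upload_file_example<S: StoreDriver + ?Sized>(
        store: Pin<&S>,
        digest: DigestInfo,
        file: &mut fs::FileSlot,
        file_size: u64,
    ) -> Result<(), Error> {
        // Deliberately bypasses any whole-file fast path and streams the file
        // through a buffer channel into `update()`.
        slow_update_store_with_file(store, digest, file, UploadSizeInfo::ExactSize(file_size)).await
    }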
116
117
/// Optimizations that stores may want to expose to the callers.
118
/// This is useful for specific cases when the store can optimize the processing
119
/// of the data.
120
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
121
pub enum StoreOptimizations {
122
    /// The store can optimize the upload process when it knows the data is coming from a file.
123
    FileUpdates,
124
125
    /// If the store will ignore the data uploads.
126
    NoopUpdates,
127
128
    /// If the store will never serve downloads.
129
    NoopDownloads,
130
}
131
132
/// A wrapper struct for [`StoreKey`] to work around
133
/// lifetime limitations in `HashMap::get()` as described in
134
/// <https://github.com/rust-lang/rust/issues/80389>
135
///
136
/// As such this is a wrapper type that is stored in the
137
/// maps using the workaround as described in
138
/// <https://blinsay.com/blog/compound-keys/>
139
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
140
#[repr(transparent)]
141
pub struct StoreKeyBorrow(StoreKey<'static>);
142
143
impl From<StoreKey<'static>> for StoreKeyBorrow {
144
5.87k
    fn from(key: StoreKey<'static>) -> Self {
145
5.87k
        Self(key)
146
5.87k
    }
147
}
148
149
impl From<StoreKeyBorrow> for StoreKey<'static> {
150
0
    fn from(key_borrow: StoreKeyBorrow) -> Self {
151
0
        key_borrow.0
152
0
    }
153
}
154
155
impl<'a> Borrow<StoreKey<'a>> for StoreKeyBorrow {
156
4.70k
    fn borrow(&self) -> &StoreKey<'a> {
157
4.70k
        &self.0
158
4.70k
    }
159
}
160
161
impl<'a> Borrow<StoreKey<'a>> for &StoreKeyBorrow {
162
0
    fn borrow(&self) -> &StoreKey<'a> {
163
0
        &self.0
164
0
    }
165
}
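
Editor's sketch (not part of the measured source): why the Borrow impls above exist. A map keyed by StoreKeyBorrow can be queried with a short-lived StoreKey<'_> without first allocating an owned key; the function and map below are illustrative only.

    fn lookup_example(
        map: &std::collections::HashMap<StoreKeyBorrow, u64>,
        digest: DigestInfo,
    ) -> Option<u64> {
        let key: StoreKey<'_> = digest.into();
        // `HashMap::get` accepts `&StoreKey<'_>` because `StoreKeyBorrow: Borrow<StoreKey<'a>>`.
        map.get(&key).copied()
    }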
166
167
/// Holds something that can be converted into a key the
168
/// store API can understand. Generally this is a digest
169
/// but it can also be a string if the caller wishes to
170
/// store the data directly and reference it by a string
171
/// directly.
172
#[derive(Debug, Eq)]
173
pub enum StoreKey<'a> {
174
    /// A string key.
175
    Str(Cow<'a, str>),
176
177
    /// A key that is a digest.
178
    Digest(DigestInfo),
179
}
180
181
impl<'a> StoreKey<'a> {
182
    /// Creates a new store key from a string.
183
22
    pub const fn new_str(s: &'a str) -> Self {
184
22
        StoreKey::Str(Cow::Borrowed(s))
185
22
    }
186
187
    /// Returns a shallow clone of the key.
188
    /// This is extremely cheap and should be used when a clone
189
    /// is needed but the key is not going to be modified.
190
    #[must_use]
191
    #[allow(
192
        clippy::missing_const_for_fn,
193
        reason = "False positive on stable, but not on nightly"
194
    )]
195
2.90k
    pub fn borrow(&'a self) -> Self {
196
53
        match self {
197
15
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Borrowed(s)),
198
38
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Borrowed(s)),
199
2.85k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
200
        }
201
2.90k
    }
202
203
    /// Converts the key into an owned version. This is useful
204
    /// when the caller needs an owned version of the key.
205
12.0k
    pub fn into_owned(self) -> StoreKey<'static> {
206
68
        match self {
207
14
            StoreKey::Str(Cow::Owned(s)) => StoreKey::Str(Cow::Owned(s)),
208
54
            StoreKey::Str(Cow::Borrowed(s)) => StoreKey::Str(Cow::Owned(s.to_owned())),
209
11.9k
            StoreKey::Digest(d) => StoreKey::Digest(d),
210
        }
211
12.0k
    }
212
213
    /// Converts the key into a digest. This is useful when the caller
214
    /// must have a digest key. If the data is not a digest, it may
215
    /// hash the underlying key and return a digest of that hash.
216
161
    pub fn into_digest(self) -> DigestInfo {
217
161
        match self {
218
159
            StoreKey::Digest(digest) => digest,
219
2
            StoreKey::Str(s) => {
220
2
                let mut hasher = DigestHasherFunc::Blake3.hasher();
221
2
                hasher.update(s.as_bytes());
222
2
                hasher.finalize_digest()
223
            }
224
        }
225
161
    }
226
227
    /// Returns the key as a string. If the key is a digest, it will
228
    /// return a string representation of the digest. If the key is a string,
229
    /// it will return the string itself.
230
143
    pub fn as_str(&'a self) -> Cow<'a, str> {
231
73
        match self {
232
73
            StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
233
0
            StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
234
70
            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
235
        }
236
143
    }
237
}
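
Editor's sketch (not part of the measured source): constructing keys and the cheap borrow()/into_owned() round trip, using a hypothetical string key.

    fn key_examples(digest: DigestInfo) {
        let by_digest: StoreKey<'static> = digest.into();
        let by_name = StoreKey::new_str("configs/default.json"); // hypothetical key
        // borrow() is a shallow, allocation-free clone; into_owned() detaches the lifetime.
        let shallow = by_name.borrow();
        let owned: StoreKey<'static> = shallow.into_owned();
        assert_eq!(owned.as_str(), by_name.as_str());
        // Digest keys pass through into_digest() unchanged; string keys get hashed.
        let _ = by_digest.into_digest();
    }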
238
239
impl Clone for StoreKey<'static> {
240
10.2k
    fn clone(&self) -> Self {
241
10.2k
        match self {
242
27
            StoreKey::Str(s) => StoreKey::Str(s.clone()),
243
10.2k
            StoreKey::Digest(d) => StoreKey::Digest(*d),
244
        }
245
10.2k
    }
246
}
247
248
impl PartialOrd for StoreKey<'_> {
249
36
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
250
36
        Some(self.cmp(other))
251
36
    }
252
}
253
254
impl Ord for StoreKey<'_> {
255
53
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
256
53
        match (self, other) {
257
53
            (StoreKey::Str(a), StoreKey::Str(b)) => a.cmp(b),
258
0
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a.cmp(b),
259
0
            (StoreKey::Str(_), StoreKey::Digest(_)) => core::cmp::Ordering::Less,
260
0
            (StoreKey::Digest(_), StoreKey::Str(_)) => core::cmp::Ordering::Greater,
261
        }
262
53
    }
263
}
264
265
impl PartialEq for StoreKey<'_> {
266
5.61k
    fn eq(&self, other: &Self) -> bool {
267
5.61k
        match (self, other) {
268
50
            (StoreKey::Str(a), StoreKey::Str(b)) => a == b,
269
5.56k
            (StoreKey::Digest(a), StoreKey::Digest(b)) => a == b,
270
0
            _ => false,
271
        }
272
5.61k
    }
273
}
274
275
impl Hash for StoreKey<'_> {
276
25.2k
    fn hash<H: Hasher>(&self, state: &mut H) {
277
        /// Salts the hash with the enum value that represents
278
        /// the type of the key.
279
        #[repr(u8)]
280
        enum HashId {
281
            Str = 0,
282
            Digest = 1,
283
        }
284
25.2k
        match self {
285
42
            StoreKey::Str(s) => {
286
42
                (HashId::Str as u8).hash(state);
287
42
                s.hash(state);
288
42
            }
289
25.2k
            StoreKey::Digest(d) => {
290
25.2k
                (HashId::Digest as u8).hash(state);
291
25.2k
                d.hash(state);
292
25.2k
            }
293
        }
294
25.2k
    }
295
}
296
297
impl<'a> From<&'a str> for StoreKey<'a> {
298
1
    fn from(s: &'a str) -> Self {
299
1
        StoreKey::Str(Cow::Borrowed(s))
300
1
    }
301
}
302
303
impl From<String> for StoreKey<'static> {
304
0
    fn from(s: String) -> Self {
305
0
        StoreKey::Str(Cow::Owned(s))
306
0
    }
307
}
308
309
impl From<DigestInfo> for StoreKey<'_> {
310
10.5k
    fn from(d: DigestInfo) -> Self {
311
10.5k
        StoreKey::Digest(d)
312
10.5k
    }
313
}
314
315
impl From<&DigestInfo> for StoreKey<'_> {
316
41
    fn from(d: &DigestInfo) -> Self {
317
41
        StoreKey::Digest(*d)
318
41
    }
319
}
320
321
// mostly for use with tracing::Value
322
impl Display for StoreKey<'_> {
323
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324
0
        match self {
325
0
            StoreKey::Str(s) => {
326
0
                write!(f, "{s}")
327
            }
328
0
            StoreKey::Digest(d) => {
329
0
                write!(f, "Digest: {d}")
330
            }
331
        }
332
0
    }
333
}
334
335
#[derive(Clone, MetricsComponent)]
336
#[repr(transparent)]
337
pub struct Store {
338
    #[metric]
339
    inner: Arc<dyn StoreDriver>,
340
}
341
342
impl Debug for Store {
343
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344
0
        f.debug_struct("Store").finish_non_exhaustive()
345
0
    }
346
}
347
348
impl Store {
349
290
    pub fn new(inner: Arc<dyn StoreDriver>) -> Self {
350
290
        Self { inner }
351
290
    }
352
353
    /// Returns the immediate inner store driver.
354
    /// Note: This does not recursively try to resolve underlying store drivers
355
    /// like `.inner_store()` does.
356
    #[inline]
357
1
    pub fn into_inner(self) -> Arc<dyn StoreDriver> {
358
1
        self.inner
359
1
    }
360
361
    /// Gets the underlying store for the given digest.
362
    /// A caller might want to use this to obtain a reference to the "real" underlying store
363
    /// (if applicable) and check if it implements some special traits that allow optimizations.
364
    /// Note: If the store performs complex operations on the data, it should return itself.
365
    #[inline]
366
183
    pub fn inner_store<'a, K: Into<StoreKey<'a>>>(&self, digest: Option<K>) -> &dyn StoreDriver {
367
183
        self.inner.inner_store(digest.map(Into::into))
368
183
    }
369
370
    /// Tries to cast the underlying store to the given type.
371
    #[inline]
372
67
    pub fn downcast_ref<U: StoreDriver>(&self, maybe_digest: Option<StoreKey<'_>>) -> Option<&U> {
373
67
        self.inner.inner_store(maybe_digest).as_any().downcast_ref()
374
67
    }
375
376
    /// Register health checks used to monitor the store.
377
    #[inline]
378
0
    pub fn register_health(&self, registry: &mut HealthRegistryBuilder) {
379
0
        self.inner.clone().register_health(registry);
380
0
    }
381
382
    #[inline]
383
8
    pub fn register_remove_callback(
384
8
        &self,
385
8
        callback: Arc<dyn RemoveItemCallback>,
386
8
    ) -> Result<(), Error> {
387
8
        self.inner.clone().register_remove_callback(callback)
388
8
    }
389
}
390
391
impl StoreLike for Store {
392
    #[inline]
393
10.0k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
394
10.0k
        self.inner.as_ref()
395
10.0k
    }
396
397
7
    fn as_pin(&self) -> Pin<&Self> {
398
7
        Pin::new(self)
399
7
    }
400
}
401
402
impl<T> StoreLike for T
403
where
404
    T: StoreDriver + Sized,
405
{
406
    #[inline]
407
8.01k
    fn as_store_driver(&self) -> &'_ dyn StoreDriver {
408
8.01k
        self
409
8.01k
    }
410
411
90
    fn as_pin(&self) -> Pin<&Self> {
412
90
        Pin::new(self)
413
90
    }
414
}
415
416
pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
417
    /// Returns the immediate inner store driver.
418
    fn as_store_driver(&self) -> &'_ dyn StoreDriver;
419
420
    /// Utility function to return a pinned reference to self.
421
    fn as_pin(&self) -> Pin<&Self>;
422
423
    /// Utility function to return a pinned reference to the store driver.
424
    #[inline]
425
18.0k
    fn as_store_driver_pin(&self) -> Pin<&'_ dyn StoreDriver> {
426
18.0k
        Pin::new(self.as_store_driver())
427
18.0k
    }
428
429
    /// Look up a digest in the store and return None if it does not exist in
430
    /// the store, or Some(size) if it does.
431
    /// Note: On an AC store the size will be incorrect and should not be used!
432
    #[inline]
433
238
    fn has<'a>(
434
238
        &'a self,
435
238
        digest: impl Into<StoreKey<'a>>,
436
238
    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
437
238
        self.as_store_driver_pin().has(digest.into())
438
238
    }
439
440
    /// Look up a list of digests in the store and return a result for each in
441
    /// the same order as input.  The result will either be None if it does not
442
    /// exist in the store, or Some(size) if it does.
443
    /// Note: On an AC store the size will be incorrect and should not be used!
444
    #[inline]
445
19
    fn has_many<'a>(
446
19
        &'a self,
447
19
        digests: &'a [StoreKey<'a>],
448
19
    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
449
19
        self.as_store_driver_pin().has_many(digests)
450
19
    }
451
452
    /// The implementation of the above `has` and `has_many` functions. See their
453
    /// documentation for details.
454
    #[inline]
455
56
    fn has_with_results<'a>(
456
56
        &'a self,
457
56
        digests: &'a [StoreKey<'a>],
458
56
        results: &'a mut [Option<u64>],
459
56
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
460
56
        self.as_store_driver_pin()
461
56
            .has_with_results(digests, results)
462
56
    }
463
464
    /// List all the keys in the store that are within the given range.
465
    /// `handler` is called for each key in the range. If `handler` returns
466
    /// false, the listing is stopped.
467
    ///
468
    /// The number of keys passed through the handler is the return value.
469
    #[inline]
470
15
    fn list<'a, 'b>(
471
15
        &'a self,
472
15
        range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
473
15
        mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
474
15
    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
475
15
    where
476
15
        'b: 'a,
477
    {
478
        // Note: We use a manual async move, so the future can own the `range` and `handler`,
479
        // otherwise we'd require the caller to pass them in by reference making more borrow
480
        // checker noise.
481
15
        async move {
482
15
            self.as_store_driver_pin()
483
15
                .list(
484
15
                    (
485
15
                        range.start_bound().map(StoreKey::borrow),
486
15
                        range.end_bound().map(StoreKey::borrow),
487
15
                    ),
488
15
                    &mut handler,
489
15
                )
490
15
                .await
491
15
        }
492
15
    }
493
494
    /// Sends the data to the store.
495
    #[inline]
496
5.23k
    fn update<'a>(
497
5.23k
        &'a self,
498
5.23k
        digest: impl Into<StoreKey<'a>>,
499
5.23k
        reader: DropCloserReadHalf,
500
5.23k
        upload_size: UploadSizeInfo,
501
5.23k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
502
5.23k
        self.as_store_driver_pin()
503
5.23k
            .update(digest.into(), reader, upload_size)
504
5.23k
    }
505
506
    /// Any optimizations the store might want to expose to the callers.
507
    /// By default, no optimizations are exposed.
508
    #[inline]
509
10
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
510
10
        self.as_store_driver_pin().optimized_for(optimization)
511
10
    }
512
513
    /// Specialized version of `.update()` which takes a `FileSlot`.
514
    /// This is useful if the underlying store can optimize the upload process
515
    /// when it knows the data is coming from a file.
516
    #[inline]
517
8
    fn update_with_whole_file<'a>(
518
8
        &'a self,
519
8
        digest: impl Into<StoreKey<'a>>,
520
8
        path: OsString,
521
8
        file: fs::FileSlot,
522
8
        upload_size: UploadSizeInfo,
523
8
    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
524
8
        self.as_store_driver_pin()
525
8
            .update_with_whole_file(digest.into(), path, file, upload_size)
526
8
    }
527
528
    /// Utility to send all the data to the store when you have all the bytes.
529
    #[inline]
530
5.66k
    fn update_oneshot<'a>(
531
5.66k
        &'a self,
532
5.66k
        digest: impl Into<StoreKey<'a>>,
533
5.66k
        data: Bytes,
534
5.66k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
535
5.66k
        self.as_store_driver_pin()
536
5.66k
            .update_oneshot(digest.into(), data)
537
5.66k
    }
538
539
    /// Retrieves part of the data from the store and writes it to the given writer.
540
    #[inline]
541
1.31k
    fn get_part<'a>(
542
1.31k
        &'a self,
543
1.31k
        digest: impl Into<StoreKey<'a>>,
544
1.31k
        mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
545
1.31k
        offset: u64,
546
1.31k
        length: Option<u64>,
547
1.31k
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
548
1.31k
        let key = digest.into();
549
        // Note: We need to capture `writer` just in case the caller
550
        // expects the drop() method to be called on it when the future
551
        // is done due to the complex interaction between the DropCloserWriteHalf
552
        // and the DropCloserReadHalf during drop().
553
1.31k
        async move {
554
1.31k
            self.as_store_driver_pin()
555
1.31k
                .get_part(key, writer.borrow_mut(), offset, length)
556
1.31k
                .await
557
1.31k
        }
558
1.31k
    }
559
560
    /// Utility that works the same as `.get_part()`, but writes all the data.
561
    #[inline]
562
15
    fn get<'a>(
563
15
        &'a self,
564
15
        key: impl Into<StoreKey<'a>>,
565
15
        writer: DropCloserWriteHalf,
566
15
    ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
567
15
        self.as_store_driver_pin().get(key.into(), writer)
568
15
    }
569
570
    /// Utility that will return all the bytes at once instead of in a streaming manner.
571
    #[inline]
572
5.25k
    fn get_part_unchunked<'a>(
573
5.25k
        &'a self,
574
5.25k
        key: impl Into<StoreKey<'a>>,
575
5.25k
        offset: u64,
576
5.25k
        length: Option<u64>,
577
5.25k
    ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
578
5.25k
        self.as_store_driver_pin()
579
5.25k
            .get_part_unchunked(key.into(), offset, length)
580
5.25k
    }
581
582
    /// Default implementation of the health check. Some stores may want to override this
583
    /// in situations where the default implementation is not sufficient.
584
    #[inline]
585
1
    fn check_health(
586
1
        &self,
587
1
        namespace: Cow<'static, str>,
588
1
    ) -> impl Future<Output = HealthStatus> + Send {
589
1
        self.as_store_driver_pin().check_health(namespace)
590
1
    }
591
}
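
Editor's sketch (not part of the measured source): a generic round trip written against StoreLike, so it works for both Store handles and concrete StoreDriver implementations. The assertions assume a CAS-like store; on an AC store the size from has() is not meaningful.

    async fn roundtrip_example(
        store: &impl StoreLike,
        digest: DigestInfo,
        payload: Bytes,
    ) -> Result<(), Error> {
        store.update_oneshot(digest, payload.clone()).await?;
        let size = store.has(digest).await?;
        assert_eq!(size, Some(payload.len() as u64));
        let fetched = store.get_part_unchunked(digest, 0, None).await?;
        assert_eq!(fetched, payload);
        Ok(())
    }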
592
593
#[async_trait]
594
pub trait StoreDriver:
595
    Sync + Send + Unpin + MetricsComponent + HealthStatusIndicator + 'static
596
{
597
    /// See: [`StoreLike::has`] for details.
598
    #[inline]
599
261
    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
600
        let mut result = [None];
601
        self.has_with_results(&[key], &mut result).await?;
602
        Ok(result[0])
603
261
    }
604
605
    /// See: [`StoreLike::has_many`] for details.
606
    #[inline]
607
    async fn has_many(
608
        self: Pin<&Self>,
609
        digests: &[StoreKey<'_>],
610
19
    ) -> Result<Vec<Option<u64>>, Error> {
611
        let mut results = vec![None; digests.len()];
612
        self.has_with_results(digests, &mut results).await?;
613
        Ok(results)
614
19
    }
615
616
    /// See: [`StoreLike::has_with_results`] for details.
617
    async fn has_with_results(
618
        self: Pin<&Self>,
619
        digests: &[StoreKey<'_>],
620
        results: &mut [Option<u64>],
621
    ) -> Result<(), Error>;
622
623
    /// See: [`StoreLike::list`] for details.
624
    async fn list(
625
        self: Pin<&Self>,
626
        _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
627
        _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
628
0
    ) -> Result<u64, Error> {
629
        // TODO(palfrey) We should force all stores to implement this function instead of
630
        // providing a default implementation.
631
        Err(make_err!(
632
            Code::Unimplemented,
633
            "Store::list() not implemented for this store"
634
        ))
635
0
    }
636
637
    /// See: [`StoreLike::update`] for details.
638
    async fn update(
639
        self: Pin<&Self>,
640
        key: StoreKey<'_>,
641
        reader: DropCloserReadHalf,
642
        upload_size: UploadSizeInfo,
643
    ) -> Result<(), Error>;
644
645
    /// See: [`StoreLike::optimized_for`] for details.
646
101
    fn optimized_for(&self, _optimization: StoreOptimizations) -> bool {
647
101
        false
648
101
    }
649
650
    /// See: [`StoreLike::update_with_whole_file`] for details.
651
    async fn update_with_whole_file(
652
        self: Pin<&Self>,
653
        key: StoreKey<'_>,
654
        path: OsString,
655
        mut file: fs::FileSlot,
656
        upload_size: UploadSizeInfo,
657
1
    ) -> Result<Option<fs::FileSlot>, Error> {
658
        let inner_store = self.inner_store(Some(key.borrow()));
659
        if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
660
            error_if!(
661
                addr_eq(inner_store, &raw const *self),
662
                "Store::inner_store() returned self when optimization present"
663
            );
664
            return Pin::new(inner_store)
665
                .update_with_whole_file(key, path, file, upload_size)
666
                .await;
667
        }
668
        slow_update_store_with_file(self, key, &mut file, upload_size).await?;
669
        Ok(Some(file))
670
1
    }
671
672
    /// See: [`StoreLike::update_oneshot`] for details.
673
5.77k
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
674
        // TODO(palfrey) This is extremely inefficient, since we have exactly
675
        // what we need here. Maybe we could instead make a version of the stream
676
        // that can take objects already fully in memory instead?
677
        let (mut tx, rx) = make_buf_channel_pair();
678
679
        let data_len =
680
            u64::try_from(data.len()).err_tip(|| "Could not convert data.len() to u64")?;
681
5.77k
        let send_fut = async move {
682
            // Only send if we are not EOF.
683
5.77k
            if !data.is_empty() {
  Branch (683:16): [True: 0, False: 0] (×11 instantiations)
  Branch (683:16): [True: 34, False: 0]
  Branch (683:16): [True: 0, False: 0] (×7 instantiations)
  Branch (683:16): [Folded - Ignored]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 1, False: 0]
  Branch (683:16): [True: 16, False: 40]
  Branch (683:16): [True: 8, False: 0]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 4, False: 1]
  Branch (683:16): [True: 7, False: 0]
  Branch (683:16): [True: 90, False: 0]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 3, False: 0]
  Branch (683:16): [True: 2, False: 0]
  Branch (683:16): [True: 0, False: 0] (×2 instantiations)
  Branch (683:16): [True: 4, False: 0]
  Branch (683:16): [True: 1, False: 0]
  Branch (683:16): [True: 2, False: 0] (×4 instantiations)
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 11, False: 0]
  Branch (683:16): [True: 0, False: 0] (×5 instantiations)
  Branch (683:16): [True: 406, False: 1]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 2, False: 0]
  Branch (683:16): [Folded - Ignored]
  Branch (683:16): [True: 0, False: 0] (×2 instantiations)
  Branch (683:16): [True: 1, False: 0]
  Branch (683:16): [True: 4, False: 2]
  Branch (683:16): [True: 2, False: 0]
  Branch (683:16): [True: 1, False: 0]
  Branch (683:16): [True: 13, False: 1]
  Branch (683:16): [True: 51, False: 32]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 1, False: 0]
  Branch (683:16): [True: 5.00k, False: 0]
  Branch (683:16): [True: 6, False: 0]
  Branch (683:16): [True: 4, False: 0]
  Branch (683:16): [True: 2, False: 0]
  Branch (683:16): [True: 0, False: 0]
  Branch (683:16): [True: 7, False: 0]
684
5.69k
                tx.send(data)
685
5.69k
                    .await
686
5.69k
                    .err_tip(|| "Failed to write data in update_oneshot")
?0
;
687
77
            }
688
5.77k
            tx.send_eof()
689
5.77k
                .err_tip(|| "Failed to write EOF in update_oneshot")
?0
;
690
5.77k
            Ok(())
691
5.77k
        };
692
        try_join!(
693
            send_fut,
694
            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
695
        )?;
696
        Ok(())
697
5.77k
    }
698
699
    /// See: [`StoreLike::get_part`] for details.
700
    async fn get_part(
701
        self: Pin<&Self>,
702
        key: StoreKey<'_>,
703
        writer: &mut DropCloserWriteHalf,
704
        offset: u64,
705
        length: Option<u64>,
706
    ) -> Result<(), Error>;
707
708
    /// See: [`StoreLike::get`] for details.
709
    #[inline]
710
    async fn get(
711
        self: Pin<&Self>,
712
        key: StoreKey<'_>,
713
        mut writer: DropCloserWriteHalf,
714
15
    ) -> Result<(), Error> {
715
        self.get_part(key, &mut writer, 0, None).await
716
15
    }
717
718
    /// See: [`StoreLike::get_part_unchunked`] for details.
719
    async fn get_part_unchunked(
720
        self: Pin<&Self>,
721
        key: StoreKey<'_>,
722
        offset: u64,
723
        length: Option<u64>,
724
5.36k
    ) -> Result<Bytes, Error> {
725
        let length_usize = length
726
2.24k
            .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize"))
727
            .transpose()?;
728
729
        // TODO(palfrey) This is extremely inefficient, since we have exactly
730
        // what we need here. Maybe we could instead make a version of the stream
731
        // that can take objects already fully in memory instead?
732
        let (mut tx, mut rx) = make_buf_channel_pair();
733
734
        let (data_res, get_part_res) = join!(
735
            rx.consume(length_usize),
736
            // We use a closure here to ensure that the `tx` is dropped when the
737
            // future is done.
738
5.36k
            async move { self.get_part(key, &mut tx, offset, length).await },
739
        );
740
        get_part_res
741
            .err_tip(|| "Failed to get_part in get_part_unchunked")
742
            .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked"))
743
5.36k
    }
744
745
    /// See: [`StoreLike::check_health`] for details.
746
1
    async fn check_health(self: Pin<&Self>, namespace: Cow<'static, str>) -> HealthStatus {
747
        let digest_data_size = default_digest_size_health_check();
748
        let mut digest_data = vec![0u8; digest_data_size];
749
750
        let mut namespace_hasher = StdHasher::new();
751
        namespace.hash(&mut namespace_hasher);
752
        self.get_name().hash(&mut namespace_hasher);
753
        let hash_seed = namespace_hasher.finish();
754
755
        // Fill the digest data with random data based on a stable
756
        // hash of the namespace and store name. Intention is to
757
        // have randomly filled data that is unique per store and
758
        // does not change between health checks. This is to ensure
759
        // we are not adding more data to the store on each health check.
760
        let mut rng: StdRng = StdRng::seed_from_u64(hash_seed);
761
        rng.fill_bytes(&mut digest_data);
762
763
        let mut digest_hasher = default_digest_hasher_func().hasher();
764
        digest_hasher.update(&digest_data);
765
        let digest_data_len = digest_data.len() as u64;
766
        let digest_info = StoreKey::from(digest_hasher.finalize_digest());
767
768
        let digest_bytes = Bytes::copy_from_slice(&digest_data);
769
770
        if let Err(e) = self
771
            .update_oneshot(digest_info.borrow(), digest_bytes.clone())
772
            .await
773
        {
774
            warn!(?e, "check_health Store.update_oneshot() failed");
775
            return HealthStatus::new_failed(
776
                self.get_ref(),
777
                format!("Store.update_oneshot() failed: {e}").into(),
778
            );
779
        }
780
781
        match self.has(digest_info.borrow()).await {
782
            Ok(Some(s)) => {
783
                if s != digest_data_len {
784
                    return HealthStatus::new_failed(
785
                        self.get_ref(),
786
                        format!("Store.has() size mismatch {s} != {digest_data_len}").into(),
787
                    );
788
                }
789
            }
790
            Ok(None) => {
791
                return HealthStatus::new_failed(
792
                    self.get_ref(),
793
                    "Store.has() size not found".into(),
794
                );
795
            }
796
            Err(e) => {
797
                return HealthStatus::new_failed(
798
                    self.get_ref(),
799
                    format!("Store.has() failed: {e}").into(),
800
                );
801
            }
802
        }
803
804
        match self
805
            .get_part_unchunked(digest_info, 0, Some(digest_data_len))
806
            .await
807
        {
808
            Ok(b) => {
809
                if b != digest_bytes {
810
                    return HealthStatus::new_failed(
811
                        self.get_ref(),
812
                        "Store.get_part_unchunked() data mismatch".into(),
813
                    );
814
                }
815
            }
816
            Err(e) => {
817
                return HealthStatus::new_failed(
818
                    self.get_ref(),
819
                    format!("Store.get_part_unchunked() failed: {e}").into(),
820
                );
821
            }
822
        }
823
824
        HealthStatus::new_ok(self.get_ref(), "Store health check succeeded".into())
825
1
    }
826
827
    /// See: [`Store::inner_store`] for details.
828
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver;
829
830
    /// Returns an Any variation of whatever Self is.
831
    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static);
832
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static>;
833
834
    // Register health checks used to monitor the store.
835
0
    fn register_health(self: Arc<Self>, _registry: &mut HealthRegistryBuilder) {}
836
837
    fn register_remove_callback(
838
        self: Arc<Self>,
839
        callback: Arc<dyn RemoveItemCallback>,
840
    ) -> Result<(), Error>;
841
}
842
843
// Callback to be called when a store deletes an item. This is used so
844
// compound stores can remove items from their internal state when their
845
// underlying stores remove items, e.g. caches.
846
pub trait RemoveItemCallback: Debug + Send + Sync {
847
    fn callback<'a>(
848
        &'a self,
849
        store_key: StoreKey<'a>,
850
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
851
}
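
Editor's sketch (not part of the measured source): a hypothetical RemoveItemCallback implementation that only logs evictions; a compound store would register it via Store::register_remove_callback(Arc::new(LogEvictions)).

    #[derive(Debug)]
    struct LogEvictions;

    impl RemoveItemCallback for LogEvictions {
        fn callback<'a>(
            &'a self,
            store_key: StoreKey<'a>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
            Box::pin(async move {
                // StoreKey implements Display, so it can be logged directly.
                tracing::info!(key = %store_key, "item removed from underlying store");
            })
        }
    }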
852
853
/// The instructions on how to decode a value from a Bytes & version into
854
/// the underlying type.
855
pub trait SchedulerStoreDecodeTo {
856
    type DecodeOutput;
857
    fn decode(version: i64, data: Bytes) -> Result<Self::DecodeOutput, Error>;
858
}
859
860
pub trait SchedulerSubscription: Send + Sync {
861
    fn changed(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
862
}
863
864
pub trait SchedulerSubscriptionManager: Send + Sync {
865
    type Subscription: SchedulerSubscription;
866
867
    fn notify_for_test(&self, value: String);
868
869
    fn subscribe<K>(&self, key: K) -> Result<Self::Subscription, Error>
870
    where
871
        K: SchedulerStoreKeyProvider;
872
873
    fn is_reliable() -> bool;
874
}
875
876
/// The API surface for a scheduler store.
877
pub trait SchedulerStore: Send + Sync + 'static {
878
    type SubscriptionManager: SchedulerSubscriptionManager;
879
880
    /// Returns the subscription manager for the scheduler store.
881
    fn subscription_manager(&self) -> Result<Arc<Self::SubscriptionManager>, Error>;
882
883
    /// Updates or inserts an entry into the underlying store.
884
    /// Metadata about the key is attached to the compile-time type.
885
    /// If `SchedulerStoreKeyProvider::Versioned` is `TrueValue`, the data will not
886
    /// be updated if the current version in the database does not match
887
    /// the version in the passed in data.
888
    /// No guarantees are made when `Versioned` is `FalseValue`.
889
    /// Indexes are guaranteed to be updated atomically with the data.
890
    fn update_data<T>(&self, data: T) -> impl Future<Output = Result<Option<i64>, Error>> + Send
891
    where
892
        T: SchedulerStoreDataProvider
893
            + SchedulerStoreKeyProvider
894
            + SchedulerCurrentVersionProvider
895
            + Send;
896
897
    /// Searches for all keys in the store that match the given index prefix.
898
    fn search_by_index_prefix<K>(
899
        &self,
900
        index: K,
901
    ) -> impl Future<
902
        Output = Result<
903
            impl Stream<Item = Result<<K as SchedulerStoreDecodeTo>::DecodeOutput, Error>> + Send,
904
            Error,
905
        >,
906
    > + Send
907
    where
908
        K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send;
909
910
    /// Returns data for the provided key with the given version if
911
    /// `StoreKeyProvider::Versioned` is `TrueValue`.
912
    fn get_and_decode<K>(
913
        &self,
914
        key: K,
915
    ) -> impl Future<Output = Result<Option<<K as SchedulerStoreDecodeTo>::DecodeOutput>, Error>> + Send
916
    where
917
        K: SchedulerStoreKeyProvider + SchedulerStoreDecodeTo + Send;
918
}
919
920
/// A type that is used to let the scheduler store know what
921
/// index is being requested.
922
pub trait SchedulerIndexProvider {
923
    /// Only keys inserted with this prefix will be indexed.
924
    const KEY_PREFIX: &'static str;
925
926
    /// The name of the index.
927
    const INDEX_NAME: &'static str;
928
929
    /// The sort key for the index (if any).
930
    const MAYBE_SORT_KEY: Option<&'static str> = None;
931
932
    /// If the data is versioned.
933
    type Versioned: BoolValue;
934
935
    /// The value of the index.
936
    fn index_value(&self) -> Cow<'_, str>;
937
}
938
939
/// Provides a key to lookup data in the store.
940
pub trait SchedulerStoreKeyProvider {
941
    /// If the data is versioned.
942
    type Versioned: BoolValue;
943
944
    /// Returns the key for the data.
945
    fn get_key(&self) -> StoreKey<'static>;
946
}
947
948
/// Provides data to be stored in the scheduler store.
949
pub trait SchedulerStoreDataProvider {
950
    /// Converts the data into bytes to be stored in the store.
951
    fn try_into_bytes(self) -> Result<Bytes, Error>;
952
953
    /// Returns the indexes for the data if any.
954
4
    fn get_indexes(&self) -> Result<Vec<(&'static str, Bytes)>, Error> {
955
4
        Ok(Vec::new())
956
4
    }
957
}
958
959
/// Provides the current version of the data in the store.
960
pub trait SchedulerCurrentVersionProvider {
961
    /// Returns the current version of the data in the store.
962
    fn current_version(&self) -> i64;
963
}
964
965
/// Default implementation for when we are not providing a version
966
/// for the data.
967
impl<T> SchedulerCurrentVersionProvider for T
968
where
969
    T: SchedulerStoreKeyProvider<Versioned = FalseValue>,
970
{
971
0
    fn current_version(&self) -> i64 {
972
0
        0
973
0
    }
974
}
975
976
/// Compile time types for booleans.
977
pub trait BoolValue {
978
    const VALUE: bool;
979
}
980
/// Compile time check if something is false.
981
pub trait IsFalse {}
982
/// Compile time check if something is true.
983
pub trait IsTrue {}
984
985
/// Compile time true value.
986
#[derive(Debug, Clone, Copy)]
987
pub struct TrueValue;
988
impl BoolValue for TrueValue {
989
    const VALUE: bool = true;
990
}
991
impl IsTrue for TrueValue {}
992
993
/// Compile time false value.
994
#[derive(Debug, Clone, Copy)]
995
pub struct FalseValue;
996
impl BoolValue for FalseValue {
997
    const VALUE: bool = false;
998
}
999
impl IsFalse for FalseValue {}
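
Editor's sketch (not part of the measured source): a hypothetical, unversioned scheduler-store entry wired through the provider traits above. Because Versioned = FalseValue, the blanket SchedulerCurrentVersionProvider impl applies, and a concrete SchedulerStore could accept such a value via update_data().

    struct QueuedOperation {
        operation_id: String, // hypothetical key material
        payload: Bytes,
    }

    impl SchedulerStoreKeyProvider for QueuedOperation {
        type Versioned = FalseValue;
        fn get_key(&self) -> StoreKey<'static> {
            StoreKey::Str(std::borrow::Cow::Owned(format!("op:{}", self.operation_id)))
        }
    }

    impl SchedulerStoreDataProvider for QueuedOperation {
        fn try_into_bytes(self) -> Result<Bytes, Error> {
            Ok(self.payload)
        }
        // get_indexes() keeps its default: no secondary indexes.
    }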